Spark简介
三种资源管理器:Yarn、Standalone、Mesos(粗粒度、细粒度)
粗粒度管理模式:一次分配运行过程中的全部资源,且运行过程中要一直占用这些资源(即使不用),程序运行结束后,回收全部资源;
细粒度管理模式:资源按需分配。
术语解释:
- application : spark应用程序,包含:driver(一个) + executor(多个);
- application jar
- cluster manager
- worker node
- driver program:管理应用程序。 main() + 初始化了SparkContext;
- executor : 位于worker,task在这里执行;
- Deploy mode : cluster(driver在集群中,看不见输出结果。用于生产环境)、client(driver在客户端,能看见结果。用于开发测试)
- Task:任务调度的最小单位,Driver将Task发送到Executor上执行;数量由分区数决定
- Job:Action触发Job。Job包含多个stage;
- Stage:依据shuffle(宽依赖),将一个Job切分为多个Stage,Stage又称为TaskSets;
Spark部署运行模式:
本地模式:Spark所有进程都运行在一台机器的JVM中;
local、local[N]、local[*]、local[N,M]
伪分布式模式:在一台机器中模拟集群运行,相关的进程在同一台机器上;
local-cluster[N,cores,memory]
分布式模式包括:Standalone、Yarn、Mesos
spark://node1:7077
–deploy-mode:cluster、client(缺省值)
cluster:看不见返回结果,用于生产环境
client:能看见返回结果,用于测试环境
二者最主要的区别是:Driver运行在哪里。cluster模式,Driver运行在cluster上;client模式,Driver运行在客户机上;
standalone、Yarn都有cluster、client模式;
RDD
RDD是spark的核心概念,它是一个容错、可以并行执行的分布式数据集。
什么是RDD:
- 一个分区的列表
- 一个计算函数compute,对每个分区进行计算
- 对其他RDDs的依赖(宽依赖、窄依赖)列表
- 对key-value RDDs来说,存在一个分区器(Partitioner)【可选的】
- 对每个分区有一个优先位置的列表【可选的】
SparkContext
SparkContext是编写Spark程序用到的第一个类,是Spark的主要入口点,它负责和整个集群的交互;
SparkContext用于连接Spark集群、创建RDD、累加器、广播变量;
调用SparkContext的parallelize、makeRDD方法,从数组中创建RDD;
用textFile方法来从文件系统中加载数据创建RDD;
SparkContext的三大组件:DAGScheduler、TaskScheduler、SchedulerBackend
DAGScheduler : 将DAG划分成若干个Stage;
TaskScheduler : 将Stage划分成若干为Task,并对Task进行调度;
SchedulerBackend:定义了许多与Executor事件相关的处理。如:新的executor注册时记录executor的信息,增加全局的资源量(核数);executor更新状态,若任务完成的话,回收core;停止executor、remove executor等事件。
RDD的操作
可以分为:Transformation、Action
Transformation只是记录了RDD转换的轨迹,并不会发生真正的计算;
只有遇到Action操作时,才会发生真正的计算;
常见的Transformation:
会产生Shuffle的Transformation:
会产生action的Transformation:
常见的Action:
PairRDD常见操作:keys、values、sortByKey、reduceByKey、join、mapValues
RDD的持久化
cache、persist、checkpoint:都属于Transformation,即都是lazy的,需要action触发;
cache() = persist(StorageLevel.MEMORY_ONLY)
StorageLevel.MEMORY_ONLY:默认的存储级别
StorageLevel.MEMORY_AND_DISK_2:本地放一份,远程放一份;
persist:可以有更多存储级别的选择;
cache、persist:主要用做性能优化;
checkpoint:主要用来做容错:将RDD中的数据存储保存到HFDS;斩断依赖;
persist中的数据会被自动清除;checkpoint的数据需要手工清除;
RDD分区
RDD分区的原则:
尽可能使得分区的个数,等于集群核心数目;
尽可能使同一 RDD 不同分区内的记录的数量一致;
默认的分区数(并发数)(可在 spark-default.conf 配置):
spark.default.parallelism
对于 textFile 方法,默认情况下:
每个HDFS的分区文件(默认块大小128M),每个都会创建一个RDD分区;最小2个分区;
对于本地文件,默认分区个数等于 min(defaultParallelism, 2);
分区器
Hash分区、Range分区、自定义分区;
只有Key-Value类型的RDD才会有分区器,非Key-Value类型的RDD分区器的值是None。
分区器决定了:
- RDD中分区的个数;
- RDD中每条数据经过Shuffle过程属于哪个分区;
- reduce的个数;
依赖
RDD的依赖分为两种:窄依赖(Narrow Dependencies)与宽依赖(Wide Dependencies,源码中称为Shuffle Dependencies)
依赖有2个作用:
其一用来解决数据容错;
其二用来划分stage。
窄依赖:父RDD的Partition 与 子RDDPartition对应的关系为 1:1 或 n:1;
宽依赖:父RDD的Partition 与 子RDDPartition对应的关系为 m:n ;
宽依赖对应着shuffle操作;
宽依赖中子RDD分区通常来自多个父RDD分区。如果子RDD的某个分区丢失,所有的父RDD分区可能都要进行重新计算;
Shuffle
Hash Shuffle => Hash Shuffle V2 => Sort Shuffle => tungsten-sort => 两个shuffle算法的合并 => Hash Shuffle去除
Hash Shuffle 算法存在的问题:生成海量的小文件(同时打开过多文件 及 低效的随机IO)
reduceByKey、grouByKey :都有shuffle,但reduceByKey在shuffle过程中传输的数据量小;
repartition、coalesce :coalesce没有shuffle;repartition有shuffle;
shuffle是划分stage的依据;
共享变量
广播变量、累加器;
广播变量,由driver广播到Executor上;在Executor中的Task之间进行共享(减少了数据的传输);共享变量只读;
累加器:则支持在所有不同节点之间进行累加计算,只能由driver读取;
作业调度
- SparkContext初始化;
- Driver向Master注册,并申请资源;
- Master命令Worker启动Executor;
- Executor启动,并向Driver注册;
- Driver向Executor发送Task;
- Executor向Driver汇报任务执行情况;
- 应用程序执行完毕,Driver向Master注销自己;
SparkSQL
SparkSession
RDD、Dataset、DataFrame
type DataFrame = Dataset[Row]
DataFrame = RDD[Row] + schema
Dataset = RDD + case class
spark.read.option(“”, true).csv(“”)
header、inferschema、delimiter(sep)
Action
show(n, false)
printSchema
Transformation
DSL(领域专用语言)
与RDD类似:map、flatMap、filter
缓存:cache、presist、checkpoint
select :与列有关
where
groupBy
orderBy
join
空值处理
时间日期
分析函数
分析函数名(参数) over (
partition by xxx
order by xxx
rows/range between … and …)
起始行 : unbounded preceding
终止行 : unbounded following
当前行 :current row
前n行 : n preceding
后n行 : n following
聚组函数:min、max、sum、count、avg
排名函数:row_number、dense_rank、rank
行函数:lag、lead、first_value、last_value
其他:cube、rollup
记录的合并与展开 : concat_ws、collect_set、collect_list、explode
半连接(left semi join):左半连接实现了类似in、exists的查询语义,输出符合条件的左表内容;
反连接(left anti join):两表关联,只返回主表的数据,并且只返回主表与子表没关联上的数据,这种连接就叫反连接。反连接一般就是指的 not in 和 not exists;