YMLiang

Spark简介

三种资源管理器:Yarn、Standalone、Mesos(粗粒度、细粒度)
粗粒度管理模式:一次分配运行过程中的全部资源,且运行过程中要一直占用这些资源(即使不用),程序运行结束后,回收全部资源;
细粒度管理模式:资源按需分配。

术语解释:

  • application : spark应用程序,包含:driver(一个) + executor(多个);
  • application jar
  • cluster manager
  • worker node
  • driver program:管理应用程序。 main() + 初始化了SparkContext;
  • executor : 位于worker,task在这里执行;
  • Deploy mode : cluster(driver在集群中,看不见输出结果。用于生产环境)、client(driver在客户端,能看见结果。用于开发测试)
  • Task:任务调度的最小单位,Driver将Task发送到Executor上执行;数量由分区数决定
  • Job:Action触发Job。Job包含多个stage;
  • Stage:依据shuffle(宽依赖),将一个Job切分为多个Stage,Stage又称为TaskSets;

Spark部署运行模式:

本地模式:Spark所有进程都运行在一台机器的JVM中;
local、local[N]、local[*]、local[N,M]

伪分布式模式:在一台机器中模拟集群运行,相关的进程在同一台机器上;
local-cluster[N,cores,memory]

分布式模式包括:Standalone、Yarn、Mesos
spark://node1:7077

–deploy-mode:cluster、client(缺省值)
cluster:看不见返回结果,用于生产环境
client:能看见返回结果,用于测试环境
二者最主要的区别是:Driver运行在哪里。cluster模式,Driver运行在cluster上;client模式,Driver运行在客户机上;
standalone、Yarn都有cluster、client模式;

RDD

RDD是spark的核心概念,它是一个容错、可以并行执行的分布式数据集。
什么是RDD:

  1. 一个分区的列表
  2. 一个计算函数compute,对每个分区进行计算
  3. 对其他RDDs的依赖(宽依赖、窄依赖)列表
  4. 对key-value RDDs来说,存在一个分区器(Partitioner)【可选的】
  5. 对每个分区有一个优先位置的列表【可选的】

SparkContext

SparkContext是编写Spark程序用到的第一个类,是Spark的主要入口点,它负责和整个集群的交互;
SparkContext用于连接Spark集群、创建RDD、累加器、广播变量;

调用SparkContext的parallelize、makeRDD方法,从数组中创建RDD;
用textFile方法来从文件系统中加载数据创建RDD;

SparkContext的三大组件:DAGScheduler、TaskScheduler、SchedulerBackend
DAGScheduler : 将DAG划分成若干个Stage;
TaskScheduler : 将Stage划分成若干为Task,并对Task进行调度;
SchedulerBackend:定义了许多与Executor事件相关的处理。如:新的executor注册时记录executor的信息,增加全局的资源量(核数);executor更新状态,若任务完成的话,回收core;停止executor、remove executor等事件。

RDD的操作

可以分为:Transformation、Action
Transformation只是记录了RDD转换的轨迹,并不会发生真正的计算;
只有遇到Action操作时,才会发生真正的计算;

常见的Transformation:
会产生Shuffle的Transformation:
会产生action的Transformation:
常见的Action:

PairRDD常见操作:keys、values、sortByKey、reduceByKey、join、mapValues

RDD的持久化

cache、persist、checkpoint:都属于Transformation,即都是lazy的,需要action触发;
cache() = persist(StorageLevel.MEMORY_ONLY)
StorageLevel.MEMORY_ONLY:默认的存储级别
StorageLevel.MEMORY_AND_DISK_2:本地放一份,远程放一份;
persist:可以有更多存储级别的选择;
cache、persist:主要用做性能优化;

checkpoint:主要用来做容错:将RDD中的数据存储保存到HFDS;斩断依赖;

persist中的数据会被自动清除;checkpoint的数据需要手工清除;

RDD分区

RDD分区的原则:
尽可能使得分区的个数,等于集群核心数目;
尽可能使同一 RDD 不同分区内的记录的数量一致;

默认的分区数(并发数)(可在 spark-default.conf 配置):
spark.default.parallelism

对于 textFile 方法,默认情况下:
每个HDFS的分区文件(默认块大小128M),每个都会创建一个RDD分区;最小2个分区;
对于本地文件,默认分区个数等于 min(defaultParallelism, 2);

分区器

Hash分区、Range分区、自定义分区;
只有Key-Value类型的RDD才会有分区器,非Key-Value类型的RDD分区器的值是None。
分区器决定了:

  • RDD中分区的个数;
  • RDD中每条数据经过Shuffle过程属于哪个分区;
  • reduce的个数;

依赖

RDD的依赖分为两种:窄依赖(Narrow Dependencies)与宽依赖(Wide Dependencies,源码中称为Shuffle Dependencies)
依赖有2个作用:
其一用来解决数据容错;
其二用来划分stage。
窄依赖:父RDD的Partition 与 子RDDPartition对应的关系为 1:1 或 n:1;
宽依赖:父RDD的Partition 与 子RDDPartition对应的关系为 m:n ;
宽依赖对应着shuffle操作;
宽依赖中子RDD分区通常来自多个父RDD分区。如果子RDD的某个分区丢失,所有的父RDD分区可能都要进行重新计算;

Shuffle

Hash Shuffle => Hash Shuffle V2 => Sort Shuffle => tungsten-sort => 两个shuffle算法的合并 => Hash Shuffle去除
Hash Shuffle 算法存在的问题:生成海量的小文件(同时打开过多文件 及 低效的随机IO)
reduceByKey、grouByKey :都有shuffle,但reduceByKey在shuffle过程中传输的数据量小;
repartition、coalesce :coalesce没有shuffle;repartition有shuffle;
shuffle是划分stage的依据;

共享变量

广播变量、累加器;
广播变量,由driver广播到Executor上;在Executor中的Task之间进行共享(减少了数据的传输);共享变量只读;
累加器:则支持在所有不同节点之间进行累加计算,只能由driver读取;

作业调度

  1. SparkContext初始化;
  2. Driver向Master注册,并申请资源;
  3. Master命令Worker启动Executor;
  4. Executor启动,并向Driver注册;
  5. Driver向Executor发送Task;
  6. Executor向Driver汇报任务执行情况;
  7. 应用程序执行完毕,Driver向Master注销自己;

SparkSQL

SparkSession
RDD、Dataset、DataFrame

type DataFrame = Dataset[Row]
DataFrame = RDD[Row] + schema
Dataset = RDD + case class

spark.read.option(“”, true).csv(“”)
header、inferschema、delimiter(sep)

Action

show(n, false)
printSchema

Transformation

DSL(领域专用语言)
与RDD类似:map、flatMap、filter
缓存:cache、presist、checkpoint
select :与列有关
where
groupBy
orderBy
join
空值处理
时间日期

分析函数

分析函数名(参数) over (
partition by xxx
order by xxx
rows/range between … and …)

起始行 : unbounded preceding
终止行 : unbounded following
当前行 :current row
前n行 : n preceding
后n行 : n following

聚组函数:min、max、sum、count、avg
排名函数:row_number、dense_rank、rank
行函数:lag、lead、first_value、last_value

其他:cube、rollup
记录的合并与展开 : concat_ws、collect_set、collect_list、explode

半连接(left semi join):左半连接实现了类似in、exists的查询语义,输出符合条件的左表内容;
反连接(left anti join):两表关联,只返回主表的数据,并且只返回主表与子表没关联上的数据,这种连接就叫反连接。反连接一般就是指的 not in 和 not exists;

Kafka & Spark Streaming

 评论


博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议

本站使用 Material X 作为主题 , 总访问量为 次 。
Copyright 2018-2019 YMLiang'BLOG   |   京ICP备 - 19039949  |  载入天数...载入时分秒...