目录
一、spark简介
spark是什么
spark的特征
二、Spark RDD
RDD基本概念
RDD五大属性
HDFS与Partition
RDD流程图
Lineage血统
三、Spark 算子
转换算子
行动算子
控制算子
cache
persist
checkpoint
执行原理
四、任务提交方式
Standalone-client
Standalone-cluster
yarn-client
yarn-cluster
五、窄依赖和宽依赖
六、Stage
stage切割规则
stage计算模式
七、SparkShuffle
SparkShuffle概念
HashShuffle
普通机制
合并机制
SortShuffle
普通机制
bypass机制
八、Spark资源调度和任务调度
调度流程
流程图解
粗细粒度资源申请
粗粒度资源申请(Spark)
细粒度资源申请(MR)
九、广播变量和累加器
广播变量
累加器
十、Spark SQL
DataFrame
DataSet
DSL操作
action
查询
Limit
排序
组函数
去重
聚合
Union
Join
save
SparkSQL的数据源
SparkSQL底层架构
谓词下推
十一、Spark Streaming
概述
示例
DStream的输出操作
Spark Streamin运行架构
参考资料
一、spark简介
spark是什么
Apache Spark是专门为大数据处理而设计的通用的计算引擎。spark拥有MapReduce所具有的优点,但不同于Map Reduce的是Job中间输出结果可以缓存到内存中,从而不再需要读写HDFS,减少磁盘数据交互,因此Spark能更好的适应机器学习和数据挖掘等需要迭代的算法。
Spark提供了Spark RDD 、 Spark SQL 、 Spark Streaming 、 Spark MLlib 、 Spark GraphX等技术组件,可以一站式地完成大数据领域的离线批处理、交互式查询、流式计算、机器学习、图计算等常见的任务。这就是 spark 一站式开发的特点。
spark的特征
- 更快的速度
- 内存计算下,Spark 比 Hadoop 快100倍。
- 易用性
- Spark 提供了80多个高级运算符。
- 通用性
- Spark 提供了大量的库,包括Spark Core、Spark SQL、Spark Streaming、MLlib、GraphX。 开发者可以在同一个应用程序中无缝组合使用这些库。
- 支持多种资源管理器
- Spark 支持 Hadoop YARN,Apache Mesos,及其自带的独立集群管理器
- Spark生态系统
- Shark:Shark基本上就是在Spark的框架基础上提供和Hive一样的HiveQL命令接口,为了最大程度的保持和Hive的兼容性,Spark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替HadoopMapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。
- SparkR:SparkR是一个为R提供了轻量级的Spark前端的R包。 SparkR提供了一个分布式的data frame数据结构,解决了 R中的data frame只能在单机中使用的瓶颈,它和R中的data frame 一样支持许多操作,比如select,filter,aggregate等等。(类似dplyr包中的功能)这很好的解决了R的大数据级瓶颈问题。 SparkR也支持分布式的机器学习算法,比如使用MLib机器学习库。SparkR为Spark引入了R语言社区的活力,吸引了大量的数据科学家开始在Spark平台上直接开始数据分析之旅。
二、Spark RDD
RDD基本概念
- RDD(Resilient Distributed Datasets),弹性分布式数据集,是分布式内存的一个抽象概念。RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建,然而这些限制使得实现容错的开销很低。
- 对开发者而言,RDD可以看作是Spark的一个对象,它本身运行于内存中,如读文件是一个RDD,对文件计算是一个RDD,结果集也是一个RDD ,不同的分片、 数据之间的依赖 、key-value类型的map数据都可以看做RDD。
- RDD具备像MapReduce等数据流模型的容错特性,并且允许开发人员在大型集群上执行基于内存的计算。
- 现有的数据流系统对两种应用的处理并不高效:一是迭代式算法,这在图应用和机器学习领域很常见;二是交互式数据挖掘工具。这两种情况下,将数据保存在内存中能够极大地提高性能。
- 为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD是只读的,并且只能通过其他RDD上的批量操作来创建。尽管如此,RDD仍然足以表示很多类型的计算,包括MapReduce和专用的迭代编程模型(如Pregel)等。
- RDD是只读的、分区记录的集合。RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。这些确定性操作称之为转换,如map、filter、groupBy、join。
- RDD含有如何从其他RDD衍生(即计算)出本RDD的相关信息(即Lineage),据此可以从物理存储的数据计算出相应的RDD分区。
- RDD作为数据结构,本质上是一个只读的分区记录集合。一个RDD可以包含多个分区,每个分区就是一个dataset片段。
- RDD是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据和分区。逻辑上可以认为RDD是一个分布式的集合
- Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类实现,如MappedRDD、MapPartitionsRDD、ShuffledRDD、ReliableCheckpointRDD等子类。Spark将常用的大数据操作都转化成为RDD的子类。
RDD五大属性
- RDD是由一系列的Partition组成的
- 函数是作用在每一个 partition/split 上
- RDD之间有一系列的依赖关系
- 分区器是作用在 (K,V) 格式的 RDD 上
- RDD提供一系列最佳的计算位置
HDFS与Partition
- hdfs中的block是分布式存储的最小单元,类似于盛放文件的盒子,一个文件可能要占多个盒子,但一个盒子里的内容只可能来自同一份文件。假设block设置为128M,你的文件是260M,那么这 份文件占3个block(128+128+4)。这样的设计虽然会有一部分磁盘空间的浪费,但是整齐的block大小,便于快速找到、读取对应的内容。(p.s. 考虑到hdfs冗余设计,默认三份拷贝,实际上3*3=9个block的物理空间。)
- spark中的partition是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partition组成的。partition是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partition大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定的
- block位于存储空间、partition位于计算空间,block的大小是固定的、partition大小是不固定的, block是有冗余的、不会轻易丢失,partition(RDD)没有冗余设计、丢失之后重新计算得到.
- Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB,因此该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235。
RDD流程图
注意
- textFile 方法底层封装的是 MR 读取文件的方式,读取文件之前先进行 split 切片,默认 split大小是一个 block 大小
- RDD实际上不存储数据,这里方便理解,暂时理解为存储数据
- 什么是K,V格式的RDD ?
- 如果RDD 里面存储的数据都是二元组对象,那么这个 RDD 我们就叫做 K,V格式的RDD
- 哪里体现 RDD 的弹性(容错)?
- partition数量,大小没有限制,体现了 RDD 的弹性
- RDD之间依赖关系,可以基于上一个 RDD 重新计算出 RDD
- 哪里体现 RDD 的分布式?
- RDD是由 Partition 组成, partition 是分布在不同节点上的
- RDD提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念
Lineage血统
RDD的最重要的特性之一就是血缘关系(Lineage ),它描述了一个 RDD 是如何从父 RDD 计算得来的。如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来。
三、Spark 算子
Spark 记录了 RDD 之间的生成和依赖关系。但是只有当 F 进行行动操作时,Spark 才会根据 RDD的依赖关系生成 DAG,并从起点开始
转换算子
- Transformations 类算子叫做转换算子(本质就是函数), Transformations 算子是延迟执行,也叫懒加载执行。
- 常见Transformations类算子
- filter:过滤符合条件的记录数, true 保留, false 过滤掉。
- map:将一个 RDD 中的每个数据项,通过 map 中的函数映射变为一个新的元素。特点:输入 一条,输出一条数据。
- flatMap:先 map 后 flat 。与 map 类似,每个输入项可以映射为0到多个输出项。
- sample:随机抽样算子,根据传进去的小数按比例进行有放回或者无放回的抽样。
- reduceByKey 将相同的 Key 根据相应的逻辑进行处理。
- sortByKey / sortBy 作用在 K,V格式的RDD 上,对 key 进行升序或者降序排序。
package com.libing.spark.operator
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 转换算子是对rdd相互转换
* filter/map/flatMap/sample/reduceByKey
*
* @author liar
* @date 2022/9/4 10:31
* @version 1.0
*/
object TransformationsFun {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setMaster("local").setAppName("transformation_operator")
val sc: SparkContext = new SparkContext(conf)
//数据(rdd)的产生,rdd之间的相互转换
val file: String = TransformationsFun.getClass.getClassLoader.getResource("wc.txt").getFile
val lineRDD: RDD[String] = sc.textFile(file)
// println(sc.defaultMinPartitions)
// println(sc.defaultParallelism)
//
// /**
// * filter
// * filter不会改变数据的整体结构
// */
// val resultRDD: RDD[String] = lineRDD.filter(x => {
// if (x.contains("shanghai"))
// true
// else
// false
// })
// resultRDD.foreach(println)
// println("------------------------")
//
//
// /**
// * map & flatMap
// */
// val value: RDD[Array[String]] = lineRDD.map(x => x.split(" "))
// val value1: RDD[String] = lineRDD.flatMap(_.split(" "))
// value.foreach(println)
// value1.foreach(println)
// println("------------------------")
//
//
// /**
// * reduceByKey
// */
// lineRDD.map(x => (x, 1)).reduceByKey(_ + _).foreach(println)
/**
* sortBYKey & sortBy
* 排序,sortBy指定key再去排序
* 默认升序,false降序
*/
val result: RDD[(String, Int)] = lineRDD.flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey((x, y) => x + y)
result.sortBy(_._2).foreach(println)
result.sortBy(_._2, false).foreach(println)
result.sortBy(_._1, false).foreach(println)
result.sortByKey().foreach(println)
/**
* sample(抽样)
*/
lineRDD.sample(true, 0.5).foreach(println)
sc.stop()
}
}
wc.txt文件如下
行动算子
Action 类算子叫做行动算子, Action 类算子是触发执行。
一个application 应用程序中有几个 Action 类算子执行,就有几个 job 运行。
常见Action 类算子
count :返回数据集中的元素数。会在结果计算完成后回收到 Driver 端。
take(n) :返回一个包含数据集前 n 个元素的集合。
first:效果等同于 take(1) ,返回数据集中的第一个元素。
foreach:循环遍历数据集中的每个元素,运行相应的逻辑。
collect:将计算结果回收到 Driver 端。
package com.libing.spark.operator
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author liar
* @date 2022/9/4 12:48
* @version 1.0
*/
object ActionFun {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setMaster("local").setAppName("action_operator")
val sc: SparkContext = new SparkContext(conf)
val file: String = TransformationsFun.getClass.getClassLoader.getResource("wc.txt").getFile
val lineRDD: RDD[String] = sc.textFile(file)
/**
* 行动算子
* count foreach collect take first
*/
println("------行动算子开始-------")
println("======foreach开始=======")
lineRDD.foreach(println)
println("======foreach结束=======")
println("======count开始=======")
//计算数据集中存在的元素数量
println(lineRDD.count())
println("======count结束=======")
println("======collect开始=======")
lineRDD.collect().foreach(println)
//1.collect的作用
//Spark内有collect方法,是Action操作里边的一个算子,这个方法可以将RDD类型的数据转化为数组,同时会从远程集群里拉取数据到driver端。
//2.已知的弊端
//首先,collect是Action里边的,根据RDD的惰性机制,真正的计算发生在RDD的Action操作。那么,一次collect就会导致一次Shuffle,而一次Shuffle调度一次stage,然而一次stage包含很多个已分解的任务碎片Task。这么一来,会导致程序运行时间大大增加,属于比较耗时的操作,即使是在local模式下也同样耗时。
//其次,从环境上来讲,本机local模式下运行并无太大区别,可若放在分布式环境下运行,一次collect操作会将分布式各个节点上的数据汇聚到一个driver节点上,而这么一来,后续所执行的运算和操作就会脱离这个分布式环境而相当于单机环境下运行,这也与Spark的分布式理念不合。
//最后,将大量数据汇集到一个driver节点上,并且像这样val arr = data.collect(),将数据用数组存放,占用了jvm堆内存,可想而知,是有多么轻松就会内存溢出。
//3.如何规避
//若需要遍历RDD中元素,大可不必使用collect,可以使用foreach语句;
//若需要打印RDD中元素,可用take语句,返回数据集前n个元素,data.take(1000).foreach(println),这点官方文档里有说明;
//若需要查看其中内容,可用saveAsTextFile方法。
//总之,单机环境下使用collect问题并不大,但分布式环境下尽量规避,如有其他需要,手动编写代码实现相应功能就好。
//4.补充:
//collectPartitions:同样属于Action的一种操作,同样也会将数据汇集到Driver节点上,与collect区别并不是很大,唯一的区别是:collectPartitions产生数据类型不同于collect,collect是将所有RDD汇集到一个数组里,而collectPartitions是将各个分区内所有元素存储到一个数组里,再将这些数组汇集到driver端产生一个数组;collect产生一维数组,而collectPartitions产生二维数组。
println("======collect结束=======")
println("======take开始=======")
//返回现有数据集的前n个元素
lineRDD.take(2).foreach(println)
println("======take结束=======")
println("======first开始=======")
//在Spark中,First函数始终返回数据集的第一个元素。它类似于take(1)。
val str: String = lineRDD.first()
println(str)
println("======first结束=======")
println("------行动算子结束-------")
sc.stop()
}
}
运行结果如下:
------行动算子开始-------
======foreach开始=======
hello nihao
hello shanghai
hello beijing
hello cq
hello nihao
hello shanghai
hello nihao
======foreach结束=======
======count开始=======
7
======count结束=======
======collect开始=======
hello nihao
hello shanghai
hello beijing
hello cq
hello nihao
hello shanghai
hello nihao
======collect结束=======
======take开始=======
hello nihao
hello shanghai
======take结束=======
======first开始=======
hello nihao
======first结束=======
------行动算子结束-------
控制算子
将RDD 持久化,持久化的单位是 partition
控制算子有三种: cache , persist , checkpoint 。 cache 和 persist 都是懒执行的。必须有一个 action 类算子触发执行
checkpoint 算子不仅能将 RDD 持久化到磁盘,还能切断 RDD 之间的依赖关系
package com.libing.spark.operator
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* cache,persist,checkpoint
* @author liar
* @date 2022/9/4 20:33
* @version 1.0
*/
object ControlFun {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setMaster("local").setAppName("control_operator")
val sc: SparkContext = new SparkContext(conf)
val file: String = TransformationsFun.getClass.getClassLoader.getResource("test.txt").getFile
val lineRDD: RDD[String] = sc.textFile(file)
/**
* 控制算子(持久化算子)
*/
println("------控制算子开始-------")
/**
* cache 对rdd进行持久化到内存的操作(中间结果的持久化),能够提升性能
*/
// println("======cache开始=======")
// println("有cache的情况")
// //cache() = persist() 只放在内存
// lineRDD.cache()
// val startTime: Long = System.currentTimeMillis()
// val count = lineRDD.count()
// val endTime: Long = System.currentTimeMillis()
// println(s"此rdd共计$count 条数据,初始化数据以及cache用时为:${endTime - startTime}")
// println("----------------------")
// println("无cache的情况")
// val startTime2: Long = System.currentTimeMillis()
// val count2 = lineRDD.count()
// val endTime2: Long = System.currentTimeMillis()
// println(s"此rdd共计$count2 条数据,初始化数据以及cache用时为:${endTime2 - startTime2}")
// println("======cache结束=======")
// println("======persist开始=======")
// //DISK_ONLY会将读取的数据暂时存放进磁盘(AppDataLocalTemp)目录下,待app运行结束将会删除临时文件
// //删除文件的日志
// //22/09/04 21:32:58 INFO ShutdownHookManager: Deleting directory C:UsersliarAppDataLocalTempspark-bf14f82c-076a-429d-b958-1b373c8d7ed8
// lineRDD.persist(StorageLevel.DISK_ONLY)
// val startTime: Long = System.currentTimeMillis()
// val count = lineRDD.count()
// val endTime: Long = System.currentTimeMillis()
// println(s"此rdd共计$count 条数据,初始化数据以及cache用时为:${endTime - startTime}")
// println("----------------------")
// println("无persist的情况")
// val startTime2: Long = System.currentTimeMillis()
// val count2 = lineRDD.count()
// val endTime2: Long = System.currentTimeMillis()
// println(s"此rdd共计$count2 条数据,初始化数据以及cache用时为:${endTime2 - startTime2}")
// println("======persist结束=======")
/**
*将这个RDD标记为检查点。它将被保存到检查点目录内的一个文件,使用SparkContext#setCheckpointDir设置,所有对其父rdd的引用将被删除。该函数必须在该RDD上执行任何作业之前调用。强烈建议将该RDD持久化到内存中,否则将其保存到文件中将需要重新计算。
*
* 执行原理:
* 1.当rdd的job执行完毕后,会从finalrdd从后往前回溯
* 2.当回溯到某一个rdd调用了checkpoint方法,会对当前的rdd做一个标记
* 3.spark框架会自动启动一个新的job,重新计算这个rdd的数据,将数据持久化到hdfs或本地
*/
println("======checkpoint开始=======")
//checkpoint 使用需要设置数据存储路径
sc.setCheckpointDir("./checkpoint")//本地
//sc.setCheckpointDir("hdfs://node01:8020/checkpoint")//hdfs(node01为主节点)
lineRDD.persist(StorageLevel.DISK_ONLY)
lineRDD.checkpoint()
val startTime: Long = System.currentTimeMillis()
val count = lineRDD.count()
val endTime: Long = System.currentTimeMillis()
println(s"此rdd共计$count 条数据,初始化数据以及cache用时为:${endTime - startTime}")
println("----------------------")
println("无persist的情况")
val startTime2: Long = System.currentTimeMillis()
val count2 = lineRDD.count()
val endTime2: Long = System.currentTimeMillis()
println(s"此rdd共计$count2 条数据,初始化数据以及cache用时为:${endTime2 - startTime2}")
println("======checkpoint结束=======")
println("------控制算子结束-------")
sc.stop()
}
}
cache
- 默认将 RDD 的数据持久化到内存中。 cache 是懒执行。
- cache() = persist() = persist(StorageLevel.Memory_Only)
- rdd.cache().count() 返回的不是持久化的RDD,而是一个数值
persist
- 可以指定持久化的级别。最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK 。
checkpoint
- checkpoint 将 RDD 持久化到磁盘,还可以切断 RDD 之间的依赖关系,也是懒执行。
执行原理
- 当RDD 的 job 执行完毕后,会从 finalRDD 从后往前回溯。
- 当回溯到某一个 RDD 调用了 checkpoint 方法,会对当前的 RDD 做一个标记。
- Spark 框架会自动启动一个新的 job ,重新计算这个 RDD 的数据,将数据持久化到Checkpint目录 中。
- 使用checkpoint 时常用优化手段:
- 对RDD 执行 checkpoint 之前,最好对这个 RDD 先执行 cache
- 这样新启动的 job 只需要将内存中的数据拷贝到Checkpint目录中就可以,省去了重新计算这一步。
四、任务提交方式
Standalone-client
执行流程
- client模式提交任务后,会在客户端启动 Driver 进程。
- Driver会向 Master 申请启动 Application 启动的资源。资源申请成功,Driver 端将 task 分发到 worker 端执行,启动 executor 进程(任务的分发)。
- worker端( exectuor 进程)将 task 执行结果返回到 Driver 端(任务结果的回收)。
总结
- client模式适用于测试调试程序。
- Driver 进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在 Driver 端可以看到 task 执行的情况。生产环境下不能使用 client 模式,是因为:假设要提交100个 application 到集群运行, Driver 每次都会在 client 端启动,那么就会导致客户端100次网卡流量暴增的问题。
Standalone-cluster
执行流程
- cluster 模式提交应用程序后,会向 Master 请求启动 Driver 。
- Master 接受请求,随机在集群一台节点启动 Driver 进程。
- Driver 启动后为当前的应用程序申请资源。Driver 端发送 task 到 worker 节点上执行(任务的分发)。
- worker 上的 executor 进程将执行情况和执行结果返回给 Driver 端(任务结果的回收)。
总结
- Standalone-cluster 提交方式,应用程序使用的所有 jar 包和文件,必须保证所有的 worker 节点都要有,因为此种方式, spark 不会自动上传包。
- 将所有的依赖包和文件打到同一个包中,然后放在 hdfs 上。 将所有的依赖包和文件各放一份在 worker 节点上。
yarn-client
执行流程
- 客户端提交一个 Application ,在客户端启动一个 Driver 进程。
- 应用程序启动后会向 RS ( ResourceManager )(相当于 standalone 模式下的 master 进程) 发送请求,启动 AM ( ApplicationMaster )。
- RS收到请求,随机选择一台 NM ( NodeManager )启动 AM 。这里的 NM 相当于 Standalone 中的 Worker 进程。
- AM启动后,会向 RS 请求一批 container 资源,用于启动 Executor 。
- RM会找到一批 NM (包含 container )返回给 AM ,用于启动 Executor 。
- AM会向 NM 发送命令启动 Executor 。
- Executor 启动后,会反向注册给 Driver , Driver 发送 task 到 Executor ,执行情况和结果返回给 Driver 端。
总结
- Yarn-client 模式同样是适用于测试,因为 Driver 运行在本地, Driver 会与 yarn 集群中的 Executor 进行大量的通信
- ApplicationMaster (executorLauncher)的在此模式中的作用:
- 为当前的 Application 申请资源
- 给NodeManager 发送消息启动 Executor 。
- 注意: ApplicationMaster 在此种模式下没有作业调度的功能。
yarn-cluster
执行流程
- 客户机提交 Application 应用程序,发送请求到 RS ( ResourceManager ),请求启动AM ( ApplicationMaster )。
- RS收到请求后随机在一台 NM ( NodeManager )上启动 AM (相当于 Driver 端)。
- AM启动, AM 发送请求到 RS ,请求一批 container 用于启动 Excutor 。
- RS返回一批 NM 节点给 AM 。
- AM连接到 NM ,发送请求到 NM 启动 Excutor 。
- Excutor 反向注册到 AM 所在的节点的 Driver 。 Driver 发送 task 到 Excutor 。
总结
- Yarn-Cluster 主要用于生产环境中,因为 Driver 运行在 Yarn 集群中某一台 nodeManager中,每次提交任务的 Driver 所在的机器都是不再是提交任务的客户端机器,而是多个 NM 节点中的一台,不会产生某一台机器网卡流量激增的现象,但同样也有缺点,任务提交后不能看 到日志。只能通过 yarn 查看日志。
- ApplicationMaster 在此模式中的的作用:
- 为当前的 Application 申请资源;给 NodeManger 发送消息启动 Executor 。 任务调度。
五、窄依赖和宽依赖
RDD之间有一系列的依赖关系,依赖关系又分为窄依赖和宽依赖。
- 窄依赖
- 父RDD 和子 RDD 的 partition 之间的关系是一对一的。或者父 RDD 和子 RDD 的 partition 关系是多对一的。不会有 shuffle 的产生。
- 宽依赖
- 父RDD 与子 RDD 的 partition 之间的关系是一对多。会有 shuffle 的产生。
六、Stage
Spark任务会根据 RDD 之间的依赖关系,形成一个 DAG 有向无环图, DAG 会提交给 DAGScheduler , DAGScheduler 会把 DAG 划分成相互依赖的多个 stage ,划分 stage 的依据就是 RDD 之间的宽窄依赖。遇到宽依赖就划分 stage ,每个 stage 包含一个或多个 task 任务。然后将这些 task 以 taskSet 的形式提交给 TaskScheduler 运行。
stage 是由一组并行的 task 组成。
stage切割规则
切割规则:从后往前,遇到宽依赖就切割 stage 。
- 从后向前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到Stage中;
- 每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition数量决定的;
- 最后一个Stage里面的任务的类型是ResultTask,前面所有其他Stage里面的任务类型都是ShuffleMapTask;
- 代表当前Stage的算子一定是该Stage的最后一个计算步骤;
总结:由于spark中stage的划分是根据shuffle来划分的,而宽依赖必然有shuffle过程,因此可以说spark是根据宽窄依赖来划分stage的。
stage计算模式
- pipeline 管道计算模式, pipeline 只是一种计算思想、模式。
- 在spark中pipeline是一个partition对应一个partition,所以在stage内部只有窄依赖。
- 数据一直在管道里面什么时候数据会落地?
- 对RDD 进行持久化( cache , persist )。
- shuffle write 的时候。
- Stage 的 task 并行度是由 stage 的最后一个 RDD 的分区数来决定的 。
- 如何改变 RDD 的分区数?
- reduceByKey(XXX,3)、GroupByKey(4)、sc.textFile(path,numpartition)
- 使用算子时传递 分区num参数 就是分区 partition 的数量。
七、SparkShuffle
SparkShuffle概念
- reduceByKey会将上一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新 的RDD,元素类型是对的形式,这样每一个key对应一个聚合起来的value
- 问题:
- 聚合之前,每一个key对应的value不一定都是在一个partition中,也不太可能在同一个节点上,因为RDD是分布式的弹性的数据集,RDD的partition极有可能分布在各个节点上。
- 如何聚合?
- Shuffle Write:上一个stage的每个map task就必须保证将自己处理的当前分区的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件中。
- Shuffle Read:reduce task就会从上一个stage的所有task所在的机器上寻找属于自己的那些分区文件,这样就可以保证每一个key所对应的value都会汇聚到同一个节点上去处理和聚合。
HashShuffle
普通机制
执行流程:
- 每一个map task将不同结果写到不同的buffer中,每个buffer的大小为32K。buffer起到数据缓存的作用。
- 每个buffer文件最后对应一个磁盘小文件。reduce task来拉取对应的磁盘小文件。
总结:
- map task的计算结果会根据分区器(默认是hashPartitioner)来决定写入到哪一个磁盘小文件中去。ReduceTask会去Map端拉取相应的磁盘小文件。
- 产生的磁盘小文件的个数:
- M(map task的个数)*R(reduce task的个数)
- 产生的磁盘小文件过多,会导致以下问题:
- 在Shuffle Write过程中会产生很多写磁盘小文件的对象。在Shuffle Read过程中会产生很多读取磁盘小文件的对象。在JVM堆内存中对象过多会造成频繁的gc,gc还无法解决运行所需要的内存的话,就会OOM。在数据传输过程中会有频繁的网络通信,频繁的网络通信出现通信故障的可能性大大增加,一 旦网络通信出现了故障会导致shuffle file cannot find 由于这个错误导致的task失败, TaskScheduler不负责重试,由DAGScheduler负责重试Stage。
合并机制
执行流程:
- 合并机制就是复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默 认值为false,将其设置为true即可开启优化机制。
- 在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出 现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与 下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少 个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
- 假设第一个stage有50个task,第二个stage有100个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会 产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。
总结:
- 产生磁盘小文件的个数: C(core的个数)*R(reduce的个数)
SortShuffle
普通机制
执行流程:
- maptask 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是 5M 。在 shuffle 的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的 数据超过 5M 时,比如现在内存结构中的数据为 5.01M ,那么他会申请 5.01*2-5=5.02M 内存 给内存数据结构。如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。在溢写之前内存结构中的数据会进行排序分区 然后开始溢写磁盘,写磁盘是以 batch 的形式去写(批量),一个 batch 是1万条数据map task 执行完成后,会将这些 磁盘小文件 合并成一个大的磁盘文件,同时生成一个 索引文件。reduce task 去 map 端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的 数据。
- 产生的磁盘小文件的个数: 2*M(map task的个数)
bypass机制
bypass 运行机制的触发条件如下
shuffle reduce task 的数量小于 spark.shuffle.sort.bypassMergeThreshold 的参数 值。这个值默认是 200 。
不需要进行 map 端的预聚合,比如 groupBykey , join 。产生的磁盘小文件为: 2*M(map task的个数) 。
八、Spark资源调度和任务调度
调度流程
启动集群后, Worker 节点会向 Master 节点汇报资源情况, Master 掌握了集群资源情况。 当 Spark 提交一个 Application 后,根据 RDD 之间的依赖关系将 Application 形成一个 DAG 有向 无环图。 任务提交后, Spark 会在 Driver 端创建两个对象: DAGScheduler 和 TaskScheduler ,DAGScheduler 是任务调度的高层调度器,是一个对象。DAGScheduler 的主要作用就是将 DAG 根据 RDD 之间的宽窄依赖关系划分为一个个的 Stage ,然后将这些 Stage 以 TaskSet 的形式提交给 TaskScheduler ( TaskScheduler 是任务调度的低层 调度器,这里 TaskSet 其实就是一个集合,里面封装的就是一个个的 task 任务,也就是 stage 中 的并行的 task 任务)。TaskSchedule 会遍历 TaskSet 集合,拿到每个 task 后会将 task 发送到 Executor 中去执行(其 实就是发送到 Executor 中的线程池 ThreadPool 去执行)。task 在 Executor 线程池中的运行情况会向 TaskScheduler 反馈,当 task 执行失败时,则由 TaskScheduler 负责重试,将 task 重新发送给 Executor 去执行,默认重试3次。如果重试3次依 然失败,那么这个 task 所在的 stage 就失败了。stage 失败了则由 DAGScheduler 来负责重试,重新发送 TaskSet 到 TaskScheduler , Stage 默认重试4次。如果重试4次以后依然失败,那么这个 job 就失败了。 job 失败了, Application 就 失败了。TaskScheduler 不仅能重试失败的 task ,还会重试 straggling (落后,缓慢) task ( 也就是执 行速度比其他task慢太多的task )。如果有运行缓慢的 task 那么 TaskScheduler 会启动一个新的task 来与这个运行缓慢的 task 执行相同的处理逻辑。两个 task 哪个先执行完,就以哪个 task 的执行结果为准。这就是 Spark 的推测执行机制。在 Spark 中推测执行默认是关闭的。推测执行 可以通过 spark.speculation 属性来配置。
流程图解
粗细粒度资源申请
粗粒度资源申请(Spark)
- 在Application 执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的 task 执行完成后,才会释放这部分资源。
- 优点:在 Application 执行之前,所有的资源都申请完毕,每一个 task 直接使用资源就可以了,不需要 task 在执行前自己去申请资源, task 启动就快了, task 执行快了, stage 执行就快了, job 就快了, application 执行就快了。
- 缺点:直到最后一个 task 执行完成才会释放资源,集群的资源无法充分利用。
细粒度资源申请(MR)
- Application 执行之前不需要先去申请资源,而是直接执行,让 job 中的每一个 task 在执行前自己去申请资源, task 执行完成就释放资源。
- 优点:集群的资源可以充分利用。
- 缺点:task 自己去申请资源, task 启动变慢, Application 的运行就响应的变慢了。
九、广播变量和累加器
广播变量
广播变量使用
val conf = new sparkConf(
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("he1lo xxx")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile(" ./words.txt")
lineRDD.filter { x => broadcast.value.contains(x) }.foreach {println}
sc.stop()
注意事项
- 广播变量只能在 Driver 端定义,不能在 Executor 端定义。
- 在Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值。
累加器
累加器的使用
val conf = new sparkConf()
conf.setMaster("local" ).setAppName ("accumulator")
val sc = new sparkContext(conf)
val accumulator = sc.longAccumulator
sc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}print1n(accumu1ator. value)
sc.stop()
注意事项
- 累加器在 Driver 端定义赋初始值,累加器只能在 Driver 端读取,在 Excutor 端更新。
十、Spark SQL
DataFrame
- 可以简单的理解DataFrame为RDD+schema元信息
- 在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似传统数据库的二维表格
- DataFrame带有schema元信息,DataFrame所表示的数据集每一列都有名称和类型,DataFrame可以从很多数据源构建对象,如已存在的RDD、结构化文件、外部数据库、Hive表。
- RDD可以把内部元素当成java对象,DataFrame内部是一个个Row对象,表示一行行数据
- 左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。
- 右侧的DataFrame却提供了详细的结构信息,DataFrame多了数据的结构信息,即schema。
DataSet
- DataSet是分布式的数据集合,DataSet提供了强类型支持,在RDD的每行数据加了类型约束
- Dataset是在spark1.6中新添加的接口。它集中了RDD的优点(强类型和可以使用强大的lambda函数)以及使用了sparkSQL优化的执行引擎。
- DataFrame(在2.X之后)实际上是DataSet的一个特例,即对Dataset的元素为Row时起了一个别名
DSL操作
action
- show以表格的形式在输出中展示 jdbcDF 中的数据,类似于 select * from spark_sql_test 的功能。
- 功能
- show只显示前20条记录
- show(numRows: Int) 显示 numRows 条
- show(truncate: Boolean) 是否最多只显示20个字符,默认为 true 。
- show(numRows: Int, truncate: Boolean) 综合前面的显示记录条数,以及对过长字符串的显示格式。
- collect方法会将 jdbcDF 中的所有数据都获取到,并返回一个 Array 对象。
- collectAsList:获取所有数据到List
- describe(cols: String*):获取指定字段的统计信息
- first, head, take, takeAsList:获取若干行记录
- first 获取第一行记录
- head 获取第一行记录, head(n: Int) 获取前n行记录
- take(n: Int) 获取前n行数据
- takeAsList(n: Int) 获取前n行数据,并以 List 的形式展现
查询
- where(conditionExpr: String) :SQL语言中where关键字后的条件
- 可以用 and 和 or 。得到DataFrame类型的返回结果
- filter:根据字段进行筛选
- 得到DataFrame类型的返回结果。和 where 使用条件相同
- select:获取指定字段值
- 根据传入的 String 类型字段名,获取指定字段的值,以DataFrame类型返回
- selectExpr :可以对指定字段进行特殊处理
- 可以直接对指定字段调用UDF函数,或者指定别名等。传入 String 类型参数,得到DataFrame对象。
- col:获取指定字段
- 只能获取一个字段,返回对象为Column类型。
- apply:获取指定字段
- 只能获取一个字段,返回对象为Column类型
- drop:去除指定字段,保留其他字段
- 返回一个新的DataFrame对象,其中不包含去除的字段,一次只能去除一个字段。
Limit
- limit方法获取指定DataFrame的前n行记录,得到一个新的DataFrame对象。
排序
- orderBy 和 sort :按指定字段排序,默认为升序
- 按指定字段排序。加个 – 表示降序排序。 sort 和 orderBy 使用方法相同
- jdbcDF.orderBy(- jdbcDF(“c4”)).show(false)
- jdbcDF.orderBy(jdbcDF(“c4”).desc).show(false)
- 按指定字段排序。加个 – 表示降序排序。 sort 和 orderBy 使用方法相同
- sortWithinPartitions
- 和上面的 sort 方法功能类似,区别在于 sortWithinPartitions 方法返回的是按Partition排好序的DataFrame对象。
组函数
- groupBy :根据字段进行 group by 操作
- groupBy 方法有两种调用方式,可以传入 String 类型的字段名,也可传入 Column 类型的对象
- cube 和 rollup :group by的扩展
- 功能类似于 SQL 中的 group by cube/rollup
- groupedData对象
- 该方法得到的是 GroupedData 类型对象,在 GroupedData 的API中提供了 group by 之后的操作
去重
- distinct :返回一个不包含重复记录的DataFrame
- 返回当前DataFrame中不重复的Row记录。该方法和接下来的 dropDuplicates() 方法不传入指定字段时的结果相同。
- dropDuplicates :根据指定字段去重
- 根据指定字段去重。类似于 select distinct a, b 操作
聚合
- 聚合操作调用的是 agg 方法,该方法有多种调用方式。一般与 groupBy 方法配合使用。
- 以下示例其中最简单直观的一种用法,对 id 字段求最大值,对 c4 字段求和。
- jdbcDF.agg(“id” -> “max”, “c4” -> “sum”)
Union
- unionAll 方法:对两个DataFrame进行组合 ,类似于 SQL 中的 UNION ALL 操作。
Join
- 笛卡尔积
- joinDF1.join(joinDF2)
- using一个字段形式
- 下面这种join类似于 a join b using column1 的形式,需要两个DataFrame中有相同的一个列名
- joinDF1.join(joinDF2, “id”)
- using 多个字段形式
- 上面这种 using 一个字段的情况外,还可以 using 多个字段
save
save可以将data数据保存到指定的区域
dataFrame.write.format(“json”).mode(SaveMode.Overwrite).save()
SparkSQL的数据源
SparkSQL 的数据源可以是 JSON 类型的字符串, JDBC , Parquet , Hive , HDFS 等。
SparkSQL底层架构
首先拿到 sql 后解析一批未被解决的逻辑计划,再经过分析得到分析后的逻辑计划,再经过一批优化规则转换成一批最佳优化的逻辑计划,再经过 SparkPlanner 的策略转化成一批物理计划,随后经过消费模型转换成一个个的 Spark 任务执行。
谓词下推
Predicate Pushdown简称谓词下推,简而言之,就是在不影响结果的情况下,尽量将过滤条件提量,节约了集群的资源,也提升了任务的性能。前执行。谓词下推后,过滤条件在map端执行,减少了map端的输出,降低了数据在集群上传输的量,节约了集群的资源,也提升了任务的性能。
十一、Spark Streaming
概述
- SparkStreaming 是流式处理框架 ,是 Spark API ( RDD )的扩展,支持可扩展、高吞吐量、容错的、实时数据流处理
- 实时数据的来源可以是: Kafka, Flume, Twitter, ZeroMQ 或者 TCP sockets ,在接受数据同时可以
- 使用高级功能的 复杂算子来处理流数据 。
- 最终处理后的数据可以存放在文件系统,数据库等,方便实时展现。
示例
下面以一个简单的例子开始spark streaming的学习之旅!我们会从本机的7777端口源源不断地收到以换行符分隔的文本数据流
// 在本地启动名为SimpleDemo的SparkStreaming应用
// 该应用拥有两个线程,其批处理时间间隔为1s
// 创建SparkConf
val conf = new SparkConf().setMaster("local[2]").setAppName("SimpleDemo")
// 从SparkConf创建StreamingContext并指定1秒钟的批处理大小
val ssc = new StreamingContext(conf, Seconds(1))
// 创建ReceiverInputDStream,该InputDStream的Receiver监听本地机器的7777端口
val lines = ssc.socketTextStream("localhost", 7777) // 类型是ReceiverInputDStream
// 从DStream中筛选出包含字符串"error"的行,构造出了
// lines -> errorLines -> .print()这样一个DStreamGraph
val errorLines = lines.filter(_.contains("error"))
// 打印出含有"error"的行
errorLines.print()
让我们从创建StreamingContext 开始,它是流计算功能的主要入口。StreamingContext 会在底层创建出SparkContext,用来处理数据。其构造函数还接收用来指定多长时间处理一次新数据的批次间隔(batch interval)作为输入,这里我们把它设为1 秒。
接着,调用socketTextStream() 来创建出基于本地7777端口上收到的文本数据的DStream。然后把DStream 通过filter() 进行转化,只得到包含“error”的行。最后,使用输出操作print() 把一些筛选出来的行打印出来。
到此时只是设定好了要进行的计算步骤,系统收到数据时计算就会开始。要开始接收数据,必须显式调用StreamingContext 的start() 方法。这样,Spark Streaming 就会开始把Spark 作业不断交给下面的SparkContext 去调度执行。执行会在另一个线程中进行,所以需要调用awaitTermination 来等待流计算完成,来防止应用退出。
// 启动流计算环境StreamingContext并等待它"完成"
ssc.start()
// 等待作业完成
ssc.awaitTermination()
下面结合代码逐一分析SparkStreaming应用执行的过程
在Driver端中,StreamingContext初始化时会创建一些内部的关键组件如DStreamGraph、ReceiverTracker、JobGenerator和JobScheduler等。实际上,调用StreamingContext.start()方法的时候,就会在Spark集群中的某个Worker节点上的Executor,启动输入DStream的Receiver。
Receiver负责从外部数据源接收数据,Receiver接收到数据之后,会启动一个BlockGenerator,其会每隔一段时间(可配置,默认是200ms)将Receiver接收到的数据,打包成一个block,每个block除了会保存到所运行的Executor关联的BlockManager中之外,还会发送一份block信息如blockId到Driver端的ReceiverTracker上,其会将一个一个的block信息存入一个HashMap中,key就是时间。
记下来,JobGenerator会每隔我们定义的batch时间间隔,就会去ReceiverTracker中获取经过这个batch时间间隔内的数据信息blocks,将这些block聚合成一个batch,然后这个batch会被创建为一个RDD。
这样每隔一个batch时间间隔,都会将在这个时间间隔内接收的数据形成一个RDD,这样就会产生一个RDD序列,每个RDD代表数据流中一个时间间隔内的数据。正是这个RDD序列,形成SparkStreaming应用的输入DStream。
从宏观来说,Spark Streaming 使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。Spark Streaming 从各种输入源中读取数据,并把数据分组为小的batch。新的batch按均匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的batch就创建出来,在该区间内收到的数据都会被添加到这个batch中。在时间区间结束时,batch停止增长。时间区间的大小是由batch间隔这个参数决定的。batch间隔一般设在500 毫秒到几秒之间,由应用开发者配置。每个输入batch都形成一个RDD,以Spark 作业的方式处理并生成其他的RDD。处理的结果可以以批处理的方式传给外部系统。
StreamingContext初始化后,包括Receiver启动后,再到生成输入DStream,就完成了SparkStreaming应用程序的准备工作。
DStream的转化操作
在创建好输入DStream后,对其调用了filter()算子,filter()算子是转化操作,会将操作应用到DStream的每一个RDD。
一些常见的转化操作如下图
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream 在内部是由许多RDD(批次)组成,且这些转化操作是分别应用到每个RDD 上的。例如,filter()会对DStream内的每个时间区间的数据(RDD)进行过滤,reduceByKey() 会归约每个RDD中的数据,但不会归约不同区间之间的数据。
对于本文的示例,对输入DStream作filter操作后生成新的DStream的过程如下:
DStream的输出操作
Spark Streaming允许DStream的数据输出到外部系统,如数据库或文件系统,输出的数据可以被外部系统所使用,该操作类似于RDD的输出操作。
在Spark核心中,作业是由一系列具有依赖关系的RDD及作用于这些RDD上的算子函数所组成的操作链。在遇到行动操作时触发运行,向DAGScheduler提交并运行作业。Spark Streaming中作业的生成与Spark核心类似,对DStream进行的各种操作让它们之间构建起依赖关系。
当遇到DStream使用输出操作时,这些依赖关系以及它们之间的操作会被记录到名为DStreamGraph的对象中表示一个job。这些job注册到DStreamGraph并不会立即运行,而是等到Spark Streaming启动后,到达批处理时间时,才根据DSteamGraph生成job处理该批处理时间内接收的数据。在Spark Streaming如果应用程序中存在多个输出操作,那么在批处理中会产生多个job。
与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。
常用的一种调试性输出操作是print(),它会在每个批次中抓取DStream的前十个元素打印出来。
一些常用的输出操作如下
Spark Streamin运行架构
总体来说,Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是Spark Streaming将输入数据按照batch interval(如5秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对DSteam内各个RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加或者存储到外部设备
假设batchInterval = 5s, SparkStreaming启动之后,0-5s内一直接受数据,假设SparkStreaming处理这一个批次数据的时间是3s,那么5-8s内一边接收新数据(开始第二批次),同时会触发DStream的job执行,这时会启动另一个线程处理第一批次的数据;8-10s内只是接收数据(还是第二批次);10-13s内一边接收新数据(开始第三批次),一边处理第二批次的数据,然后13-15s只是接收数据(还是第三批次),如此往复进行数据的接收与处理。
Spark Streaming相对其他流处理系统最大的优势在于流处理引擎和数据处理在同一个软件栈,其中Spark Streaming功能主要包括流处理引擎的数据接收与存储以及批处理作业的生成与管理,而Spark核心负责处理Spark Streaming发送过来的作业。
Spark Streaming分为Driver端和Client端,运行在Driver端为StreamingContext实例。该实例包括DStreamGraph和JobScheduler(包括ReceiverTracker和JobGenerator)等,而Client包括ReceiverSupervisor和Receiver等。
SparkStreaming进行流数据处理大致可以分为:启动流处理引擎、接收及存储流数据、处理流数据和输出处理结果等4个步骤,其运行架构如下
- 初始化StreamingContext对象,在该对象启动过程中实例化DStreamGraph和JobScheduler,其中DStreamGraph用于存放DStream以及DStream之间的依赖关系等信息,而JobScheduler中包括ReceiverTracker和JobGenerator。其中ReceiverTracker为Driver端流数据接收器(Receiver)的管理者,JobGenerator为批处理作业生成器。在ReceiverTracker启动过程中,根据流数据接收器分发策略通知对应的Executor中的流数据接收管理器(ReceiverSupervisor)启动,再由ReceiverSupervisor启动流数据接收器Receiver。
- 当流数据接收器Receiver启动后,持续不断地接收实时流数据,根据传过来数据的大小进行判断,如果数据量很小,则攒多条数据成一块,然后再进行块存储;如果数据量大,则直接进行块存储。对于这些数据Receiver直接交给ReceiverSupervisor,由其进行数据转储操作。块存储根据是否设置预写日志分为两种,一种是使用非预写日志BlockManagerBasedBlockHandler方法直接写到Worker的内存或磁盘中,另一种是进行预写日志WriteAheadLogBasedBlockHandler方法,即在预写日志同时把数据写入到Worker的内存或磁盘中。数据存储完毕后,ReceiverSupervisor会把数据存储的元信息上报给ReceiverTracker,ReceiverTracker再把这些信息转发给ReceiverBlockTracker,由它负责管理收到的数据块的元信息。
- 在StreamingContext的JobGenerator中维护一个定时器,该定时器在批处理时间到来时会进行生成作业的操作。在该操作中进行如下操作:
- 通知ReceiverTracker将接收到的数据进行提交,在提交时采用synchronized关键字进行处理,保证每条数据被划入一个且只被划入一个批次中。
- 要求DStreamGraph根据DSream依赖关系生成作业序列Seq[Job]。
- 从第一步中ReceiverTracker获取本批次数据的元数据。
- 把批处理时间time、作业序列Seq[Job]和本批次数据的元数据包装为JobSet,调用JobScheduler.submitJobSet(JobSet)提交给JobScheduler,JobScheduler将把这些作业发送给Spark核心进行处理,由于该执行为异步,因此本步执行速度将非常快。
- 只要提交结束(不管作业是否被执行),SparkStreaming对整个系统做一个检查点(Checkpoint)。
- 在Spark核心的作业队数据进行处理,处理完毕后输出到外部系统,如数据库或文件系统,输出的数据可以被外部系统所使用。由于实时流数据的数据源源不断地流入,Spark会周而复始地进行数据处理,相应地也会持续不断地输出结果。
注意事项
1、local模式下需要启动至少两个线程,因为只开启了一条线程(这里只有接收数据的线程,却没有处理数据的线程),所以local模式下SparkStreaming必须至少设置两个线程
new SparkConf().setMaster("local[2]").setAppName("SimpleDemo");
2、Durations时间的设置–接收数据划分批次的时间间隔,多久触发一次job
new StreamingContext(conf, Seconds(1))
3、业务逻辑完成后,需要有一个输出操作,将SparkStreaming处理后的数据输出到外部存储系统
4、关于 StreamingContext 的 start()和 stop()
StreamingContext.start() //Spark Streaming应用启动之后是不能再添加业务逻辑
StreamingContext.stop() //无参的stop方法会将SparkContext一同关闭,解决办法:stop(false)
StreamingContext.stop() //停止之后是不能在调用start()
5、DStreams(Discretized Streams–离散的流),应用在每个DStream的算子操作,会应用在DStream内的各个RDD,进而应用在RDD的各个Partition,应用在Partition中的一条条数据,最终应用到每一条记录上
参考资料
Spark简介
Spark
Spark Streaming简单入门(示例+原理)