【ArchSummit】如何通过AIOps推动可量化的业务价值增长和效率提升?>>> 了解详情
写点什么

Spark Streaming 源码分析——DStream 的构建和运行

  • 2019-09-27
  • 本文字数:3984 字

    阅读完需:约 13 分钟

Spark Streaming源码分析——DStream的构建和运行

贝壳实时计算平台已承接公司各种实时数据任务 100 多个,日处理消费总量接近 250 亿,采用的底层引擎包括 Spark-Streaming 和 Flink。为了提升数据处理速度和稳定性,我们对各种引擎的处理特性进行了深入研究。在 Spark-Streaming 中,对流的抽象是使用 DStream 来定义的,所以想要理解 Spark-Streaming 的流处理模型,理解 DStream 的内部实现以及其如何构建和运行是很有必要的。

DStream 的构建

我们在定义一个流的处理逻辑时,首先从一个数据的流入源开始,这个数据源使用 InputDStream 定义,它是 DStream 的一个子类,之后我们会在其上调用一些 tranform 类型算子,像 map,reduce,filter 等等,每调用一个算子,都会创建一个新的 DStream,每一个新创建的 DStream 都保留着当前节点所依赖的上一个节点和当前节点的执行逻辑这两个部分,这样,多个 DStream 节点就构成了一个逻辑执行链。


比如如下代码会生成图 1 的执行链:


1stream.map(...).filter(...).foreachRDD(...)
复制代码



**虚线箭头描述的是依赖关系**


最后当调用 action 类型的算子时,所有 action 类型的算子,底层都是通过 ForEachDStream 实现的。


我们来看 ForEachDStream 的源码:


 1private[streaming] class ForEachDStream[T: ClassTag] ( 2    parent: DStream[T], 3    foreachFunc: (RDD[T], Time) => Unit, 4    displayInnerRDDOps: Boolean 5  ) extends DStream[](parent.ssc) { 6 7  override def dependencies: List[DStream[_]] = List(parent) 8 9  override def slideDuration: Duration = parent.slideDuration1011  override def compute(validTime: Time): Option[RDD[Unit]] = None1213  override def generateJob(time: Time): Option[Job] = {14    parent.getOrCompute(time) match {15      case Some(rdd) =>16        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {17          foreachFunc(rdd, time)18        }19        Some(new Job(time, jobFunc))20      case None => None21    }22  }23}
复制代码


这里我们关注 generateJob 方法,这里调用了它依赖的父 DStream 的 getOrCompute 来生成一个需要它来处理的 RDD,然后对该 RDD 做该节点本身需要做的一些操作,即 foreachFunc 闭包,其实所有 DStream 的 getOrCompute 方法底层都会调用 compute 方法,该方法会返回 RDD,即所有的 DStream 的 compute 方法中,要么其本身能够从外部拉取数据,即 InputDStream 作为 DStream 链的第一个节点,要么其本身调用依赖的上游 DStream 的 compute 方法,再对生成的 RDD 做其本节点所定义的一些操作作为其返回值。


如此,当 DStream 链的最后一个节点被调用了 compute 方法时,它能够依次递归的调用逐节点的 compute 方法,最后调用第一个 InputDStream 节点的 compute 方法生成一个能够拉取外部数据的 RDD。


其调用的时序图如下:



以上只是为了直观的理解 DStream 链是如何工作的,具体体现在分布式环境上时,是由 RDD 来定义操作,切分成 task 后由 Executor 来执行。


另外需要说的是如果我们在单个流上定义一系列除 window 外的操作,其和我们直接调用 InputDStream 的 foreachRDD 后,在 rdd 上定义操作是等效的。

DStream 的运行

除了上面介绍的 DStream 之外,在 Spark-Streaming 内部还有一些保存作业处理逻辑的模块和用于根据时间生成和管理每个批次数据的模块。下面是在 SparkStreaming 中一些比较核心的类,他们是构成一个流式作业,和使其运行起来的框架。


1.InputDStream 管理流的数据源的通用抽象类


2.JobGenerator 作业生成器


3.JobScheduler 作业调度器,用于提交作业到集群运行


4.DStreamGraph 管理创建的所有 InputDStream 的初始化和启动,但不负责 InputDStream 间依赖关系的管理,InputDStream 间依赖关系由其子类实现管理


首先看一下这四个类交互的时序图:



图中只画了一些比较重要和核心的类和逻辑。JobGenerator 每隔我们设定的时间间隔会生成一个 JobGeneratorEvent 事件用于触发生成一个作业。其内部是通过 RecurringTimer 类和 EventLoop 实现的。


代码如下:初始化 timer 和 eventLoop。


 1  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, 2    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator") 3 4  def start(): Unit = synchronized { 5    if (eventLoop != null) return // generator has already been started 6 7    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock. 8    // See SPARK-10125 9    checkpointWriter1011    eventLoop = new EventLoop[]("JobGenerator") {12      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)1314      override protected def onError(e: Throwable): Unit = {15        jobScheduler.reportError("Error in job generator", e)16      }17    }18    eventLoop.start()1920    if (ssc.isCheckpointPresent) {21      restart()22    } else {23      startFirstTime()24    }25  }
复制代码


这里 processEvent 方法用来做消息分发,根据消息的不同类型调用不同的函数进行处理。


 1  private def processEvent(event: JobGeneratorEvent) { 2    logDebug("Got event " + event) 3    event match { 4      case GenerateJobs(time) => generateJobs(time) 5      case ClearMetadata(time) => clearMetadata(time) 6      case DoCheckpoint(time, clearCheckpointDataLater) => 7        doCheckpoint(time, clearCheckpointDataLater) 8      case ClearCheckpointData(time) => clearCheckpointData(time) 9    }10  }
复制代码


startFirstTime 中调用了 timer 和 DStreamGraph 的 start 方法,将二者内部的事件循环线程启动起来。


到这里我们知道当 timer 根据时间生成 GenerateJobs 事件时,会触发 generateJobs 函数的调用。


我们来看 generateJobs 的代码:


 1  private def generateJobs(time: Time) { 2    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are 3    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847). 4    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true") 5    Try { 6      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch 7      graph.generateJobs(time) // generate jobs using allocated block 8    } match { 9      case Success(jobs) =>10        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)11        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))12      case Failure(e) =>13        jobScheduler.reportError("Error generating jobs for time " + time, e)14        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)15    }16    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))17  }
复制代码


其内部又调用了 DStreamGraph 的 generateJobs 方法用来生成多个 Job,之后通过 JobScheduler 对这些 Job 进行提交。


DStreamGraph 底层生成作业的过程是 DStreamGraph 实现的,它会遍历所有注册过的 ForEachDStream,并分别调用他们的 generateJob 方法,返回一个 Job 对象,这就跟我们上面讲过的 ForEachDStream 部分关联上了。


Job 里面包含了一个需要在其上执行计算的 RDD,包括所有计算逻辑的闭包,而这个闭包真正执行,是在 JobScheduler 将这个 Job 对象提交到一个线程池之后,其会在线程池内执行这个 Job 对象内的闭包逻辑,将其转换成分布式计算的 task 分发到不同的节点上去执行。


JobScheduler.submitJobSet 的如下代码:


 1  def submitJobSet(jobSet: JobSet) { 2    if (jobSet.jobs.isEmpty) { 3      logInfo("No jobs added for time " + jobSet.time) 4    } else { 5      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) 6      jobSets.put(jobSet.time, jobSet) 7      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) 8      logInfo("Added jobs for time " + jobSet.time) 9    }10  }
复制代码


jobExecutor 就是那个线程池,其队列长度可通过 spark.streaming.concurrentJobs 进行配置,默认等于 1。该参数决定了能够并发执行的 Job 数量。


JobHandler 是 Job 的封装,会在执行 Job 的逻辑前后分别发布 JobStarted 和 JobCompleted 事件。而 Job 对象真正执行的逻辑就是在 ForEachDStream 类中的创建 Job 时 foreachFunc 闭包。

Spark-Streaming 在贝壳的使用

目前贝壳的实时计算平台上运行着 143 个实时任务,其中 121 个为 Spark 任务,62 个为 Spark-Streaming 任务,59 个为 Structured Streaming 任务。为了实现组内任务的快速开发,我们在 Spark 实时技术栈的基础上抽象出了 Chronus 框架,将流计算抽象出 Source,Sink 和 Pipeline,其中 Source 和 Sink 可通过配置直接定义,开发时能够将精力集中在 Pipeline 中的业务逻辑上。


框架对监控,offset 管理都做了封装。同时,我们基于 Chronus 框架也开发除了很多任务模板,能够让用户通过一些简单的可视化配置直接进行流式任务的开发,其中最典型的要属 SQL 模板:



上述的那 59 个 Structured Streaming 就是基于该 SQL 模板开发的实时任务。


作者介绍:


顾渊离(企业代号名),目前负责贝壳大数据部实时计算平台底层引擎和实时场景模板相关开发工作。


本文转载自公众号贝壳产品技术(ID:gh_9afeb423f390)。


原文链接:


https://mp.weixin.qq.com/s/rkzIo8sweI2UOOnmOj6SQQ


2019-09-27 14:37661

评论

发布
暂无评论
发现更多内容

云原生新边界——阿里云边缘计算云原生落地实践

阿里巴巴云原生

云计算 容器 运维 云原生 边缘计算

软件 IT 专业的高校学生有关在线课程的问卷调查

程序员历小冰

【XXX高校】软件IT专业学生(恋爱观)调查问卷

洛神灬殇

调查报告 大学生 恋爱

面向软件 IT 专业的高校大学生课余时间自学情况调查

xiezhr

大学生日常 IT 高校学院 问卷调查

如何从 0 到 1 开发 PyFlink API 作业

Apache Flink

flink pyflink python 3.5+

Spring Cloud Stream 体系及原理介绍

阿里巴巴云原生

Java 负载均衡 微服务 云原生 中间件

IT 专业高校大学生就业方向状况调查问卷

架构精进之路

调查报告 4月日更 InfoQ 写作平台 1 周年

C盘内存杀手,原来是这款出人意料的被闲置的软件|iTunes

彭宏豪95

效率 工具 4月日更 iTunes

最新、最全、最详细的 Git 学习笔记总结(2021最新版)

民工哥

后端 Git Submodule linux运维 代码管理

Linux 上 定时备份postgresql 数据库

Yang

数据库 postgresql

2021高校IT专业大学生就业意向调查问卷

黑马腾云

边开飞机边换引擎?我们造了个新功能保障业务流量无损迁移

阿里巴巴云原生

容器 运维 k8s 中间件 弹性计算

What CANN Can?一辆小车背后的智能故事

脑极体

微信小程序登录流程详解

frank-say

UT之最后一测

好好学习,天天向上

云信技术系列课 | RTC 系统音频弱网对抗技术发展与实践

网易云信

WebRTC 音频

如何通过openLooKeng更高效访问HBase?

openLooKeng

Java 大数据 Bigdata MySQL 高可用

容器 & 服务: 扩容(二)

程序员架构进阶

Kubernetes 28天写作 弹性扩容 4月日更

获取chrome80谷歌浏览器存储的指定网站Cookie数据方法详解

老猿Python

Python chrome 爬虫 Cookie

音视频编解码--编码参数CRF

Fenngton

ffmpeg 视频编解码 视频压缩 码率控制 CRF

百度智能云成中国跳水队独家AI合作伙伴圆梦东京!

百度大脑

百度智能云

生命中的无奈

小天同学

读书 读后感 生命 4月日更

特斯拉行车数据被篡改?专家称车企很难自证清白,保留“数据指纹”的区块链技术在路上

CECBC

指纹

LeetCode题解:151. 翻转字符串里的单词,栈,JavaScript,详细注释

Lee Chen

算法 大前端 LeetCode

智能小车系列-动力系统(ezPWM)

波叽波叽啵😮一口盐汽水喷死你

pwm ezPWM PWM信号

Jcenter 停止服务,说一说我们的迁移方案

Antway

android maven Gradle

浪潮云再次入围央采2021年云计算服务采购名单

浪潮云

云计算

资讯|WebRTC M90 更新

网易云信

WebRTC

排查dubbo接口重复注销问题,我发现了一个巧妙的设计

捉虫大师

dubbo

赋能制造产业智能化转型 百度大脑开放日福州解密

百度大脑

百度大脑 开放日 智能化

智能小车系列-串口设置

波叽波叽啵😮一口盐汽水喷死你

串口 树莓派串口 ttyAMA0

Spark Streaming源码分析——DStream的构建和运行_文化 & 方法_顾渊离_InfoQ精选文章