写点什么

如何基于 Apache Pulsar 和 Spark 进行批流一体的弹性数据处理?

  • 2019-06-20
  • 本文字数:8245 字

    阅读完需:约 27 分钟

如何基于Apache Pulsar和Spark进行批流一体的弹性数据处理?

批流现状

在大规模并行数据分析领域,AMPLab 的『One stack to rule them all』提出用 Apache Spark 作为统一的引擎支持批处理、流处理、交互查询和机器学习等常见的数据处理场景。 2017 年 7 月,Spark 2.2.0 版本正式推出的 Spark structured streaming 将 Spark SQL 作为流处理、批处理底层统一的执行引擎,提供对无界表(无边界的源源不断到达的流数据)和有界表(静态历史数据)的优化查询,而向用户提供 Dataset/DataFrame API 对批流数据联合处理,进一步模糊了批流数据处理的边界。


另一方面,Apache Flink 在 2016 年左右进入大众视野,凭借其当时更优的流处理引擎,原生的 Watermark 支持『Exaclty Once』的数据一致性保证,和批流一体计算等各种场景的支持,成为 Spark 的有力挑战者。无论是使用 Spark 还是 Flink,用户真正关心的是如何更好地使用数据,更快地挖掘数据中的价值,流数据和静态数据不再是分离的个体,而是一份数据的两种不同表征方式。


然而在实践中,构建一个批流一体的数据平台并不只是计算引擎层的任务。因为在传统解决方案中,近实时的流、事件数据通常采用消息队列(例如 RabbitMQ)、实时数据管道(例如 Apache Kafka)存储,而批处理所需要的静态数据通常使用文件系统、对象存储进行保存。这就意味着,一方面,在数据分析过程中,为了保证结果的正确性和实时性,需要对分别存储在两类系统中数据进行联合查询;另一方面,在运维过程中,需要定期将流数据转存到文件/对象存储中,通过维持流形式的数据总量在阈值之下来保证消息队列、数据管道的性能(因为这类系统的以分区为主的架构设计紧耦合了消息服务和消息存储,而且多数都太过依赖文件系统,随着数据量的增加,系统性能会急剧下降),但人为的数据搬迁不但会提升系统的运维成本,而且搬迁过程中的数据清洗、读取、加载也是对集群资源的巨大消耗。


与此同时,从 Mesos 和 YARN 的流行、Docker 的兴起到现在的 Kubernetes 被广泛采用,整个基础架构正在全面地向容器化方向发展,传统紧耦合消息服务和消息计算的架构并不能很好地适应容器化的架构。以 Kafka 为例,其以分区为中心的架构紧耦合了消息服务和消息存储。Kafka 的分区与一台或者一组物理机强绑定,这带来的问题是在机器失效或集群扩容中,需要进行昂贵且漫长的分区数据重新均衡的过程;其以分区为粒度的存储设计也不能很好利用已有的云存储资源;此外,过于简单的设计导致其为了进行容器化需要解决多租户管理、IO 隔离等方面很多架构上的缺陷。

Pulsar 简介

Apache Pulsar 是一个多租户、高性能的企业级消息发布订阅系统,最初由 Yahoo 研发, 2018 年 9 月从 Apache 孵化器毕业,成为 Apache 基金会的顶级开源项目。Pulsar 基于发布订阅模式(pub-sub)构建,生产者(producer)发布消息(message)到主题(topic),消费者可以订阅主题,处理收到的消息,并在消息处理完成后发送确认(Ack)。Pulsar 提供了四种订阅类型,它们可以共存在同一个主题上,以订阅名进行区分:


  • 独享(exclusive)订阅——一个订阅名下同时只能有一个消费者。

  • 共享(shared)订阅——可以由多个消费者订阅,每个消费者接收其中一部分消息。

  • 失效备援(failover)订阅——允许多个消费者连接到同一个主题,但只有一个消费者能够接收消息。只有在当前消费者发生失效时,其他消费者才开始接收消息。

  • 键划分(key-shared)订阅(测试版功能)——多个消费者连接到同一主题,相同 Key 总会发送给同一个消费者。


Pulsar 从设计之初就支持多租户(multi-tenancy)的概念,租户(tenant)可以横跨多个集群(clusters),每个租户都有其认证和鉴权方式,租户也是存储配额、消息生存时间(TTL)和隔离策略的管理单元。Pulsar 多租户的特性可以在 topic URL 上得到充分体现,其结构是persistent://tenant/namespace/topic。命名空间(namespace)是 Pulsar 中最基本的管理单元,我们可以设置权限、调整复制选项、管理跨集群的数据复制、控制消息的过期时间或执行其他关键任务。

Pulsar 独特架构

Pulsar 和其他消息系统的最根本区别在于其采用计算和存储分离的分层架构。Pulsar 集群由两层组成:无状态服务层,它由一组接受和传递消息的 broker 组成;分布式存储层,它由一组名为 bookies 的 Apache BookKeeper 存储节点组成,具备高可用、强一致、低延时的特点。



和 Kafka 一样,Pulsar 也是基于主题分区(Topic partition)的逻辑概念进行主题数据的存储。不同的是,Kafka 的物理存储也是以分区为单位,每个 partition 必须作为一个整体(一个目录)被存储在一个 broker 上,而 Pulsar 的每个主题分区本质上都是存储在 BookKeeper 上的分布式日志,每个日志又被分成分段(Segment)。每个 Segment 作为 BookKeeper 上的一个 Ledger,均匀分布并存储在多个 bookie 中。存储分层的架构和以 Segment 为中心的分片存储是 Pulsar 的两个关键设计理念。以此为基础为 Pulsar 提供了很多重要的优势:无限制的主题分区、存储即时扩展,无需数据迁移 、无缝 broker 故障恢复、无缝集群扩展、无缝的存储(Bookie)故障恢复和独立的可扩展性。


消息系统解耦了生产者与消费者,但实际的消息本质上仍是有结构的,因此生产者和消费者之间需要一种协调机制,达到生产、消费过程中对消息结构的共识,以达到类型安全的目的。Pulsar 有内置的 Schema 注册方式在消息系统端提供传输消息类型约定的方式,客户端可以通过上传 Schema 来约定主题级别的消息类型信息,而由 Pulsar 负责消息的类型检查和有类型消息的自动序列化、反序列化,从而降低多应用间的消息解析代码反复开发、维护的成本。当然,Schema 定义与类型安全是一种可选的机制,并不会给非类型化消息的发布、消费产生任何性能开销。

在 Spark 中实现对 Pulsar 数据的读写——Spark Pulsar Connector

自 Spark 2.2 版本 Structured Streaming 正式发布,Spark 只保留了 SparkSession 作为主程序入口,你只需编写 DataSet/DataFrame API 程序,以声明形式对数据的操作,而将具体的查询优化与批流处理执行的细节交由 Spark SQL 引擎进行处理。对于一个数据处理作业,需要定义 DataFrame 的产生、变换和写出三个部分,而将 Pulsar 作为流数据平台与 Spark 进行集成正是要解决如何从 Pulsar 中读取数据(Source)和如何向 Pulsar 写出运算结果(Sink)两个问题。


为了实现以 Pulsar 为源读取批流数据与支持批流数据向 Pulsar 的写入,我们构建了 Spark Pulsar Connector。

对 Structured Streaming 的支持


上图展示了 Structured Streaming(以下简称 SS )的主要组件:


  • 输入和输出——为了提供细粒度的容错,SS 要求输入数据源(Source)是可重放(replayable)的;为了提供端到端的 Exactly-Once 的语义,需要输出(Sink)支持幂等写出(一条消息被多次写入与一次写入效果一致,可由 DBMS、KV 系统通过键约束的方式支持)。

  • API——用户通过编写 Spark SQL 的 batch API(SQL 或 DataFrame)指定对一个或多个流、表的查询,并定义一个输出表保存所有的输出结果,而引擎内部决定如何将结果增量地写到 Sink 中。为了支持流处理,SS 在原有的 Spark SQL API 上添加了一些接口:

  • 触发器(Trigger)——控制引擎触发流处理执行、在 Sink 中更新结果的频率。

  • 水印机制(Watermark policy)——用户通过指定字段做 event time,来决定对晚到数据的处理。

  • 有状态算子(Stateful operator)——用户可以根据 Key 跟踪和更新算子内部的可变状态,完成复杂的业务需求(例如,基于会话的窗口)。

  • 执行层——当收到一个查询时,SS 决定它的增量执行方式,进行优化、并开始执行。SS 有两种可选的执行模型:

  • Microbatch model(微批处理模式)——默认的执行方式,与 Spark Streaming 的 DStream 类似,将流切成 micro batch,对每个 batch 分别处理。这种模式支持动态负载均衡、故障恢复等机制,适合将吞吐率作为主要性能指标的应用。

  • Continuous mode(持续模式)——在集群上启动长时间运行的算子,适合处理较为简单、延迟敏感类应用。

  • Log 和 State Store —— SS 利用两种持久化存储来提供容错保障:一个 Write-ahead-Log(WAL),记录被成功消费且持久化写出的每个数据源中的位置;一个大规模的 state store, 存储长期运行的聚集算子内部的状态快照。当故障发生时,SS 会根据快照的位置,通过重放之后的消息完成流处理状态的恢复。


具体到源码层面,Source 接口定义了可重放数据源需要提供的功能。


trait Source {  def schema: StructType  def getOffset: Option[Offset]  def getBatch(start: Option[Offset], end: Offset): DataFrame  def commit(end: Offset): Unit  def stop(): Unit}
trait Sink { def addBatch(batchId: Long, data: DataFrame): Unit}
复制代码


以 microbatch 执行模式为例:


  1. 在每个 microbatch 的最开始,SS 会向 source 询问当前的最新进度(getOffset),并将其持久化到 WAL 中。

  2. 随后,source 根据 SS 提供的 start end 偏移量,提供区间范围的数据(getBatch)。

  3. SS 触发计算逻辑的优化和编译,把计算结果写出给 sink(addBatch),这时才触发实际的取数据操作以及计算过程。

  4. 在数据完整写出到 sink 后,SS 通知 source 可以废弃数据(commit),并将成功执行的 batchId 写入内部维护的 commitLog 中。


具体到 Pulsar 的 connector 实现中:


  1. 在所有批次开始执行前,SS 会调用 schema 方法返回消息的结构信息,在 schema 方法内部,我们从 Pulsar 的 Schema Registry 提取出所有主题的 Schema,并进行一致性检查。

  2. 随后,我们为每个主题分区创建一个消费者,按照 (start, end] 返回主题分区中的数据。

  3. 当收到 SS 的 commit 通知时,通过 topics 中的 resetCursor 向 Pulsar 标志消息消费的完成。Sink 中构建的生产者则将 addBatch 中获取的实际数据以消息形式追加写入相应的主题中。

对批处理作业的支持

在某个时间点执行的批作业,可以看作是对 Pulsar 平台中的流数据在一个时间点的快照进行的数据分析。Spark 对历史数据的查询是以 Relation 为单位,Spark Pulsar Connector 提供 createRelation 方法的实现根据用户指定的多个主题分区构建表,并返回包含 Schema 信息的 DataSet。在查询计划阶段,Connector 的功能分成两步:首先,根据用户提供的一个或多个主题,在 Pulsar Schema Registry 中查找主题 Schema,并检查多个主题 Schema 的一致性;其次,将用户指定的所有主题分区进行任务划分(Partition),得到的分片即是 Spark source task 的执行粒度。


Pulsar 提供了两层的接口对其中的数据进行访问,基于主题分区的 Consumer/Reader 接口,以传统消息接收为语义的顺序数据读取;Segment 级的读接口,提供对 Segment 数据的直接读取。因此,相应地从 Pulsar 读数据执行批作业可以分成两种粒度(即读取数据的并行度)进行:以主题分区为粒度(每个主题分区作为一个分片);以 Segment 为粒度(将一个主题分区的多个 Segment 组织成一个分片,因此一个主题分区会有多个对应的分片)。你可以按照批作业的并行度需求和可分配计算资源选择合适的消息读取的并行粒度。另一方面,将批作业的执行存储到 Pulsar 也很直观,你只需指定写入的主题和消息路由规则(RoundRobin 或者按 Key 划分),在 Sink task 中创建的每个生产者会将待写出的消息送至对应的主题分区。

如何使用 Spark Pulsar Connector

  • 根据一个或多个主题创建流处理 Source。


val df = spark  .readStream  .format("pulsar")  .option("service.url", "pulsar://localhost:6650")  .option("admin.url", "http://localhost:8080")  .option("topicsPattern", "topic.*") // Subscribe to a pattern  // .option("topics", "topic1,topic2")    // Subscribe to multiple topics  // .option("topic", "topic1"). //subscribe to a single topic  .option("startingOffsets", startingOffsets)  .load()df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")  .as[(String, String)]
复制代码


  • 构建批处理 Source。


val df = spark  .read  .format("pulsar")  .option("service.url", "pulsar://localhost:6650")  .option("admin.url", "http://localhost:8080")  .option("topicsPattern", "topic.*")  .option("startingOffsets", "earliest")  .option("endingOffsets", "latest")  .load()df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")  .as[(String, String)]
复制代码


  • 使用数据中本身的 topic 字段向多个主题进行持续 Sink。


val ds = df  .selectExpr("topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")  .writeStream  .format("pulsar")  .option("service.url", "pulsar://localhost:6650")  .start()
复制代码


  • 将批处理结果写回 Pulsar。


df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")  .write  .format("pulsar")  .option("service.url", "pulsar://localhost:6650")  .option("topic", "topic1")  .save()
复制代码


注意


由于 Spark Pulsar Connector 支持结构化消息的消费和写入,为了避免消息负载中字段和消息元数据(event time、publish time、key 和 messageId)的潜在命名冲突,消息元数据字段在 Spark schema 中以双下划线做为前缀(例如,__eventTime)。


参考资料


福利推荐

Spark Pulsar Connector 即将开源,对 Spark 集成感兴趣的同学可以关注 Pulsar Shenzhen Meetup 和 StreamNative 公众号。


Apache Pulsar 是下一代云原生分布式流数据平台,它源于 Yahoo,2016 年 12 月开源,2018 年 9 月正式成为 Apache 顶级项目,逐渐从单一的消息系统演化成集消息、存储和函数式轻量化计算的流数据平台。在 Apache Pulsar 快速发展的过程中,社区的伙伴们也致力于硅谷以外的布道之旅,在中国社区开始了不平凡的历程。6 月 29 日,来自腾讯和 Pulsar 社区的开源爱好者们将齐聚一堂,共同探讨 Pulsar 最新动态,包括:


  1. Apache Pulsar 发展历史和用户案例;

  2. Apache Pulsar 2.4.0 新功能。

  3. Flink 在腾讯的实践,Flink 和 Pulsar 的集成应用。

  4. Pulsar 和 Spark 批流融合的数据存储和分析平台。

  5. Pulsar IO 运行原理与周边生态。

  6. Go Functions 的设计与实现。


访问链接,报名参加 Pulsar Shenzhen Meetup。



公众号推荐:

跳进 AI 的奇妙世界,一起探索未来工作的新风貌!想要深入了解 AI 如何成为产业创新的新引擎?好奇哪些城市正成为 AI 人才的新磁场?《中国生成式 AI 开发者洞察 2024》由 InfoQ 研究中心精心打造,为你深度解锁生成式 AI 领域的最新开发者动态。无论你是资深研发者,还是对生成式 AI 充满好奇的新手,这份报告都是你不可错过的知识宝典。欢迎大家扫码关注「AI前线」公众号,回复「开发者洞察」领取。

2019-06-20 10:005935

评论

发布
暂无评论
发现更多内容

架构实战营第四模块作业

Geek_53787a

千万级学生管理系统的考试试卷存储方案设计

Geek_7a789a

高效的代码版本控制,让你居家办公游刃有余 | 社区征文

代码托管 6月月更 初夏征文 协同开发

课程背景

IT蜗壳-Tango

6月月更

echo命令实用技巧

Nick

Docker 镜像源 echo 6月月更 tldr

618战报销冠谜底:“收割机”联想屠榜背后的三大利器是什么?

脑极体

一个完整挖洞/src漏洞实战流程【渗透测试】

网络安全学海

黑客 网络安全 信息安全 渗透测试 漏洞挖掘

leetcode 542. 01 Matrix 01 矩阵(中等)

okokabcd

LeetCode 动态规划 数据结构与算法

C语言中奇妙又有趣的符号——C语言运算(操作)符

未见花闻

6月月更

在线多行文本行转列工具

入门小站

工具

wapper解析

卢卡多多

6月月更

NodeJS Stream入门 🦺

德育处主任

node.js 6月月更

服务治理的目标与愿景

阿泽🧸

服务治理 6月月更

千万级学生管理系统考试试卷存储方案

地下地上

架构实战营

linux几个不常用但是很有用的命令

入门小站

Linux

GetX 响应式状态管理简介

岛上码农

flutter ios 安卓 跨平台应用 6月月更

JVM调优简要思想及简单案例-JVM分代模型

zarmnosaj

6月月更

在线JSON转TSV工具

入门小站

工具

网页设计的发展趋势如何

源字节1号

千万级学生管理系统试卷存储方案(架构实战营 模块四作业)

Gor

攻防演练中红队的外网纵向突破口

穿过生命散发芬芳

6月月更 攻防演练

数据库每日一题---第17天:丢失信息的雇员

知心宝贝

数据库 前端 后端 6月月更

基于Redis sentinel的千万级学生管理系统的考试试卷存储方案

Geek_e8bfe4

又回到最初的起点,记忆中你青涩的脸,我们终于来到了这一天

百思不得小赵

阅读 毕业生 6月月更

LabVIEW Arduino RS-485智能农业监测系统(项目篇—4)

不脱发的程序猿

传感器 智慧农业 LabVIEW Arduino RS-485智能农业监测系统

架构实战营模块 4 作业

Roy

架构实战营

架构实战营模块4作业

挖了蘑菇哩斯

架构实战营 存储方案

【PIMF】盘点开源鸿蒙第三方组件(第三方库)【2】

离北况归

OpenHarmony 三方库

flutter系列之:flutter中常用的box

程序那些事

flutter 程序那些事 6月月更

Java Core 「12」ReentrantLock 再探析

Samson

学习笔记 Java core 6月月更

我理解的微服务 -- 读《微服务设计模式》总结

潜水员

golang 微服务

如何基于Apache Pulsar和Spark进行批流一体的弹性数据处理?_AI&大模型_yjshen_InfoQ精选文章