Streaming SQL 在贝壳的演进之路

发布于:2020 年 6 月 23 日 10:05

Streaming SQL在贝壳的演进之路

实时计算平台,是贝壳内部统一承接实时需求和管理实时任务的平台。目前支持了公司埋点、商机、交易、商业化等若干部门的业务,目前总共运行了 570 多个实时计算任务,日志流量单日吞吐 1041 亿。

随着实时数仓等业务的推进,越来越多的任务接入到我们平台,使我们开发和运维实时任务的成本急剧升高。于是我们迫切希望能有一种快速开发和容易维护的方法,最终我们把希望的目光投向了 SQL。

我们都知道,SQL 作为一种通用的描述数据处理的语言,有着统一的标准和广泛的使用。它的学习成本低,开发效率高,行业内有着完整的生态,成熟的优化规则。

但是,SQL 其实更多的是在线上系统的、面向关系数据库的 OLTP 场景和离线数仓的 OLAP 场景中使用。那么能否将 SQL 应用到实时计算的场景来提升实时任务的开发效率呢?

带着这个问题,我们开始了 SQL on Streaming 的挑战。

1. SQL on Streaming

1.1 问题

我们都知道,SQL 是作用在有限数据集上的,查询引擎读取完整可用的数据集,生成固定大小的结果。

相比之下,数据流持续地提供新的记录,数据随着时间到达,因此流查询必须持续的处理到达的数据。所以问题变成了:如何在流上定义一个能够使 SQL 作用在其上的视图

1.2 动态表

动态表概念的提出,用来支持对结果更新的查询,保留流和批输入的统一语义。动态表是一个不断更新的表,可以像常规的表一样进行查询。

但是,对动态表的查询是连续运行的,而且根据输入表的不断更新,查询出来的结果表也是不断更新的,因此结果表也是一个动态表。

因此,我们可以认为:流和动态表其实是同一种东西,只是在不同角度看待问题而产生的不同概念罢了,二者其实是可以相互转换的。我们可以将流转换为动态表,也可以将动态表转换为流。

下图显示了可以在流上处理关系查询的概念模型:

Streaming SQL在贝壳的演进之路

那么我们如何在流上定义一个动态表?

有两种模式:追加和更新。

追加模式下,流中的每条新增记录都会插入到动态表中,不会对动态表中的历史数据做修改。如下图所示:

Streaming SQL在贝壳的演进之路

更新模式 下,流中的每条记录都会根据其唯一的健值或新增,或对历史的数据做更新操作。如下图所示:

Streaming SQL在贝壳的演进之路

1.3 持续查询

由于动态表随时间而变,我们必须定义查询动态表的意义。

假设我们有一个动态表 A,那么我们可以定义:

它在时间 t 时刻的快照为 A[t],这时就可以使用任意的 SQL 来对其进行查询,生成一个结果表。

定义查询的操作为 q,将查询作用到 A[t] 上,我们表示为 q(A[t]),生成的结果表为 R[t]。

于是我们有:

复制代码
R[t] = q(A[t])

在动态表上的持续查询,表示为在 t 的每个时间点,都会执行一个 t 时刻在动态表 A 上的批量查询,而所生成的结果表也同样是一个持续更新的动态表。它可能会不断更新先前的行,也可能添加新行。

好了,理论部分说完了,那么我们如何将 SQL on Streaming 有效的落地呢?

在当时,平台上的所有任务都还是在使用 spark 技术栈,而且当时 Spark 新推出了 Structured Streaming 模块。因此,我们自然而然的将该模块作为了我们将 SQL on Streaming 落地的第一阶段的解决方案。

2. SQL 1.0

我们当时调研的版本是 2.3.1,Structured Streaming 刚刚推出不久,其虽然支持一些比较复杂的窗口聚合操作,但是限制也比较多。其仅在 api 层面上提供较为完善的语义,在 SQL 层面并没有提供较好的支持。

当时就我们平台上的任务来说,也没有在流维关联、窗口聚合这方面有比较强烈的需求,我们最初对于产品的定位也仅仅是做一些简单的 ETL 工作。

于是,最初的设计如下:

a. 仅支持简单的 select * from xxx where conditions…和 union 语句
b. 在一个作业里可以以临时视图的形式定义多个逻辑节点,最终视图之间的依赖关系构成了整个作业的 DAG
c. 针对上下游数据提供 formatter,对上下游的数据提供格式化输入输出的能力。
d. 对不同的外部存储,提供多种适配的组件来支持多种第三方存储的写能力

2.1 产品展示

下图是我们使用 SQL 1.0 配置出的一个对业务后端日志做简单 ETL 的任务。可以看到这里面有五个节点:

上游 source_kafka_1 是读取外部 kafka 中的后端日志;

中间有三个节点,是处理过程;

最后绿色的 sink_kafka_1 是将处理后的数据输出到下游 kafka 中的 topic。

Streaming SQL在贝壳的演进之路

下图是对输入端 kafka 的配置:

Streaming SQL在贝壳的演进之路

里面包含了要连接的 kafka 集群名称,订阅的 topic,读取数据的格式、字段和类型。对于 kafka 中的数据格式,我们提供了 4 种定义 schema 和解析的方式:

a. 默认:所有数据按一个单独的列提供,不做解析,列名可自定义
b. json:对 json 做深度解析,以 jsonpath 的方式定义各列的访问路径。
c. 正则:以正则表达式的形式提取各列数据,匹配出正则中第一组括号的匹配项,此种方式一般用作数据格式不规范的场景
d. 分隔符:可指定自定义分隔符,以索引的方式定义各列数据的访问方式。

此外,在定义中间若干处理节点时,除要配置 SQL 语句外,还要指定上游依赖的节点:

Streaming SQL在贝壳的演进之路

最后,在配置 sink 组件时,我们要指定上游的数据依赖,输出到下游的 kafka 集群名称,topic,和输出数据格式,包括想要的字段名称,字段格式,提供了将 string 转换成 json object/array 的能力。

Streaming SQL在贝壳的演进之路

2.2 产品设计

下面是设计的组件类图:

Streaming SQL在贝壳的演进之路

  • Plugin 是所有 Source,Operator,Sink 的父类
  • UpStream 是对所有上游组件的封装,封装了本节点生成的视图名称,所有连接的下游节点,和获取本节点的 DataFrame 的方法,SqlSource 和 SqlOperator 都可以作为上游组件
  • DownStream 是对所有下游组件的封装,维护了所有的上游依赖,SqlOperator 和 SqlSink 都可以作为下游组件。
  • SqlSource 是对输入数据源头的封装,提供了构建 Formatter 的能力
  • SqlOperator 是对中间处理节点的封装,目前仅支持简单 Filter 和 Union
  • SqlSink 封装了将上游数据输出到外部的能力,目前支持的 Sink 类型包括 kafka,console,druid

目前平台上运行着的 SQL 1.0 的实时任务有 60 个,而且数量还在增加。

2.3 问题

2.3.1 checkpoint

我们知道,在定义 Structured Streaming 任务的时候必须指定 checkpoint 路径,而且 Structured Streaming 底层在做 checkpoint 的时候是同步的,也就是说会阻塞整体的处理进度。

当凌晨有大量离线任务运行导致 hdfs 繁忙的时候,会导致 checkpoint 阻塞,从而导致数据的积压和延迟,这对下游一些对实时性要求较高的系统来说是不可接受的。

2.3.2 offset

Structured Streaming 的消费进度是保存在 checkpoint 里的,而且每次任务重启都会直接从 checkpoint 里读取 offset,并且会在 driver 端屏蔽掉自动提交 offset 功能。

因此会有以下几个问题:

a. 消费进度和消费组无关,无法从消费组上共享消费进度。
b. 无法从外部监控到消费进度,积压等情况,不利于任务的监控报警。
c. 重置 offset 流程复杂。

3. SQL 2.0

首先,我们对 SQL 2.0 的产品定位为提供一种以纯 SQL 的方式来定义流式作业。那么围绕这一宗旨,SQL 2.0 应该具备哪些能力呢?

a. 定义维表,且能实时维表关联
b. 支持定义逻辑视图
c. 支持自定义 UDF
d. 对时间窗口的支持,包括各种分组聚合

在调研了多款开源产品之后,我们选择了 Flink 这款目前最火热的开源流处理框架。它对 SQL 的支持是目前所有实时处理框架中最好的。

所以,我们最终决定在 Flink-SQL 上做扩展,来实现我们的功能。

Streaming SQL在贝壳的演进之路

3.1 DDL

考虑到在引擎应该只提供能力,而和管理着元数据系统打通应该属于应用层的能力,因此我们初步设计是:希望能用一种 DDL 的方式来打通应用层和引擎层,同时用来定义外部数据源,而 Flink SQL 中暂还并未提供这种能力。

因此,我们基于 Antlr4 工具,对标准 SQL 中的 DDL 语法做了一些扩展,使其不但能够描述外部数据的格式,而且能够提供访问外部数据的一些连接信息,以及提取特定字段的能力。

示例建表语句如下:

复制代码
-- 定义数据源表
create source table kafka_source_tb (
system_type string extraction '$.system_type',
client_os_type string extraction '$.client_os_type',
ucid string extraction '$.ucid',
ts bigint extraction '$.timestamp',
watermark key ts delay for 5s
) with (
type = 'kafka',
dataType = 'json',
brokers = 'off03-bigdata.mars.ljnode.com:9092',
topics = 'data-pipeline-common-dev',
groupId = 'test-group'
);
-- 定义维表
create dim table redis_dim_tb (
ucid string,
first_login_time string,
device_id string,
primary key ucid
) with (
type = 'redis',
server = '127.0.0.1',
port = '6379',
cache = 'all'
);
-- 定义输出表
create sink table console_sink_tb (
ucid string,
first_login_time string,
device_id string,
client_os_type string,
system_type string,
ts bigint
) with (
type = 'console',
dataType = 'json'
);

其中:

  • with 语句块中是对外部数据源的连接描述信息
  • 数据源表中的 watermark key xxxdelayforperiod 为定义水印的语句
  • 维表中的 primary key 用来定义主键,主要是为了在维表关联时查找外部数据

3.2 维表关联

到目前为止,我们已经在 SQL 2.0 中提供了 1.0 中的全部能力,理论上已经能够提供给用户做简单的 ETL 使用了。

但是作为一个有理想的青年,我们并不满足于此。而且,为了后续提供构建实时数仓和实时指标的能力,简单的 ETL 是不够的,我们还要和很多的外部数据做关联。

为了满足这方面的需求,我们基于 calcite 的 SQL 解析功能和阿里为社区贡献的 Async I/O 功能,实现了维表关联的能力。

下面对这部分功能的设计做详细介绍。

3.2.1 设计

首先,思路就是先解析 SQL,如果在 join 语句后面出现了用户定义的维表,就触发维表关联的 SQL 改写逻辑,改写逻辑如下:

a. 抽离嵌套子查询,独立注册成临时表
b. 合并流表关联维表节点,提取关联条件
c. 对流表的 DataStream 做转换,使用 Async I/O,访问外部存储做数据关联
d. 替换原 SQL 语句中所有被流表和维表原表名限定的字段的所属表名,生成新的有效的 SQL

这样理解起来可能有些困难,我们在上面三个 DDL 的基础上举个例子,使用如下 DML 来定义一个作业:

复制代码
insert into console_sink_tb ({1}
ucid, first_login_time, device_id,
client_os_type, system_type)
select
ucid, first_login_time, device_id,
client_os_type, system_type
from kafka_source_tb a
join redis_dim_tb b
on a.ucid = b.ucid

第一步:由于本例中不存在嵌套查询,第一步可以跳过。

第二步:合并流维关联节点,提取关联条件。

上述 SQL 经过 SQL 解析后,会生成如下语法树:

Streaming SQL在贝壳的演进之路

该语法树经过转换后,变为:

Streaming SQL在贝壳的演进之路

翻译成 SQL 后就是:

复制代码
select
a_J_b.ucid,
a_J_b.first_login_time,
a_J_b.device_id,
a_J_b.client_os_type,
a_J_b.system_type
from a_J_b

第三步:对流表的 DataStream 做转换,使用 Async I/O,访问外部存储做数据关联。

根据我们前面讲的概念,流和动态表是可以相互转换的,因此,我们可以先将流表转换成 DataStream,然后根据维表定义中的连接信息访问外部数据源,再根据提取出的关联条件,获取和过滤出我们需要的数据。实际上是通过 RichAsyncFunction 提供的异步并行执行的能力,能够同时请求多条数据,提高效率。

将关联后的数据流注册成中间表,表名即流维表节点合并的名字(a_J_b),这样就可以将转换后的 SQL 语句作用到该表上了。

第四步:需要对原 SQL 语句的 where 子句或 group by 子句以 a.client_os_type 的方式中引用到字段的所属表名做替换,将 a 替换成 a_J_b。

因为当我们将流表关联维表合并为一个节点后,原来的 a 已经变成了一个不可识别的标识符了。

3.2.2 缓存优化

在生产环境中,绝大多数的流都是很快的,如果每条数据都要访问一次外部存储,那么,除了整体的性能会差以外,对外部存储也会造成很大压力,甚至会把外部系统压垮。

另一方面,考虑到其实很多维表变更并不频繁,而且数量也不会很大,那么,我们完全可以将维表的数据缓存在内存中,设定好过期策略做到同步更新。

对于开启了缓存的维表,内存中的缓存在任务刚启动时是空的,这会有一个预热的阶段。另外可对缓存设定两种过期策略,一种是缓存大小,一种是过期时间。

缓存大小策略是根据 LRU 算法进行数据淘汰,过期时间是根据最后读取数据的时间,当数据被缓存淘汰,程序会重新查询外存,并更新到缓存中,以此来实现缓存中数据的更新。

另一方面当数据量比较大时,单节点不足以将绝大多数维度数据缓存,可以预先根据与参与关联的维表主键对应的流表字段做预分区。

即根据某一个字段,保证该字段下同值的记录总被分配到同一个下游节点上,这样每个下游节点只缓存本节点能用到的数据,且能保证该部分的值域仅占总量的很小一部分。

3.2.3 关联条件支持表达式

可以看到上述方案虽然解决了流表和维表关联的问题,但是是有很多限制的。

比如说我们拿 hbase 作为维表来举例,就要求关联条件中必须包含 hbase 的 rowkey,而且 rowkey 必须作为关联条件的一部分,其值是必须能够直接从流表中取到的,也就是要求关联条件中只能是 EQUAL 类型的表达式,而且等号两边必须只能是对列的简单引用。

但是很多时候,hbase 的 rowkey 可能并不是一个单一含义的值,它也可能是一个业务逻辑上的联合主键,需要将多列拼接起来才能构成一个完整的 rowkey,这个时候关联条件的限制就成为了一个使用上的痛点。

用户当然可以通过定义临时视图的形式绕过在这个限制,但是这样又增加了用户的使用成本,所以我们也集中精力解决了这个问题。

我们看下面这个例子:

复制代码
select a.col1, a.col2, b.coll
from a
join b
on a.col1 + a.col3 = b.col

其实思路很简单,我们在关联维表前,需要将流表 a 转换成 DataStream,其实就是在转换的过程中添加一个环节。

a 中仅包含 col1,col2,col3 字段,但是关联条件中需要 col1 + col3,所以我们将表 a 转换成临时视图 __function_tmp_view__0,它表达的逻辑如下 SQL:

复制代码
select
a.col1,
a.col2,
a.col3,
a.col1 + a.col3 AS __function_tmp_field__0
from a

这样关联条件就变成了:

__function_tmp_field__0 = b.col

相当于我们将表达式的值的计算过程移动到了我们自动添加的临时视图中,将计算过程提前了。这样我们在关联的时候就能够直接获取关联条件中所需要的值了。

3.3 维表的接入

实际上,业务方在有维表关联的需求时,很多都是希望能够直接关联业务库。

但是受限于集群访问业务库的安全和稳定性问题,我提供了一种额外的方式能够将用户的业务库数据通过 binlog 的方式来实时同步到 hbase 和 redis 中,而 SQL 引擎只需要去和 HBase 或 Redis 中的数据做关联就可以了。

目前我们已经支持了 HBase,Redis,Http,Phoenix 等方式的维表关联。

3.4 新的问题

以上就是我们在构建 Streaming SQL 2.0 的过程中遇到的一些问题和解决办法。

在 SQL 1.0 中存在的 checkpoint 和 offset 问题在 2.0 中被框架自身所消化,但也随之给我们带来了新的挑战:

  • Flink 内部的状态管理如何优雅的配置和管理起来
  • Flink 的 checkpoint/savepoint 功能如何利用和管理

这些问题都是需要我们去解决的。

4. 平台化建设

SQL 2.0 在 2019 年 8 月份在我们平台上上线,目前已经有 200 多个任务正在运行,覆盖了实时数仓,实时交易,商业化,租赁等业务部门;涉及了 ETL,维表关联,数据落地 clickhouse 等场景,而且任务数量增长很快。

4.1 产品展示

Streaming SQL在贝壳的演进之路

实时计算平台目前已经集成了数据源管理功能,用于管理实时领域的结构化数据元信息,用户可以预先配置和共享数据源,数据源可以自动拉取样例数据,生成 schema 信息。

在 SQL 2.0 任务配置过程中,用户可以选择已经存在的数据源,后端会自动生成自定义的 DDL。这样用户只需编写 DML 定义处理逻辑就可以了,无需编写复杂的 DDL,提升了用户的开发效率。最右侧是对任务中涉及的数据源的管理功能。

另外,我们也提供了 SQL 的语法校验功能,使用 antlr4 自定义语法文件解析和校验 DDL 语句,使用 calcite 解析和校验 DML 语句,能够做到在线验证 SQL 语法。

此外,我们也提供了查看执行计划,和在线 Debug 等功能,能够大大提升用户的开发效率和 debug 效率。

Streaming SQL在贝壳的演进之路

Streaming SQL在贝壳的演进之路

4.2 任务运维

目前贝壳的所有实时任务是统一托管在 Hermes 实时计算平台上的,而 SQL 2.0 在平台上只是一个特殊的场景任务。

整体的任务运维和监控报警由平台统一管控,能提供任务指标上报和监控,任务心跳监控,任务失败自动拉起等功能。

Streaming SQL在贝壳的演进之路

5. 挑战 && 规划

Streaming SQL 在实时计算领域有着广泛的应用场景,在数据分析,数据清洗,实时数仓和实时指标等方面都提供了非常重要的技术支撑。Streaming SQL 为下游提供了高效的开发手段和可靠、低延迟的实时处理能力。

在实践的过程中,我们经历了在 spark-structure-streaming 上构建的 SQL 1.0 版本,和之后基于 Flink-SQL 之上构建的 SQL 2.0 版本。这其中遇到过一些问题,包括 DDL 的设计和实现,维表关联,平台化和任务监控等。

最后,是我们对未来工作的一些思考和规划:

  • 数据血缘管理,可以直观清晰的帮助我们了解实时数据和任务之间的依赖关系。
  • 数据流量监控,采集和监控数据流量信息,做好数据量增长的预警工作,同时数据流量的采集数据也能为我们对资源的使用情况,以及任务的资源评估做一个依据。
  • 资源自适应,可以根据上游流量信息动态估算一个任务大概需要多少资源
  • 大状态优化和管理,对于 flink 任务中可能出现的大任务状态的管理和查询功能
  • 实时数仓,基于 Flink-SQL 构建的实时数仓及其相关生态的工具链

后续我们将为大家汇报贝壳 Streaming SQL 的新进展和新成果。敬请期待~

本文转载自公众号贝壳产品技术(ID:beikeTC)。

原文链接

https://mp.weixin.qq.com/s?__biz=MzIyMTg0OTExOQ==&mid=2247485612&idx=2&sn=f779495ca9ffc4b72743eecccb006089&chksm=e8373bdcdf40b2ca6c5545f837994fec8cb640e2802e5cd3292df6f8df03cd70af3ca22076c7&scene=27#wechat_redirect

阅读数:3 发布于:2020 年 6 月 23 日 10:05

评论

发布
暂无评论