【ArchSummit】如何通过AIOps推动可量化的业务价值增长和效率提升?>>> 了解详情
写点什么

RocketMQ Streams:将轻量级实时计算引擎融合进消息系统

  • 2021-12-06
  • 本文字数:5645 字

    阅读完需:约 19 分钟

RocketMQ Streams:将轻量级实时计算引擎融合进消息系统

作者 | 袁小栋、程君杰

审核校对 | 杜恒、岁月、白玙、不周

 

随着各行各业移动互联和云计算技术的普及发展,大数据计算已深入人心,最常见的比如 flink、spark 等。这些大数据框架,采用中心化的 Master-Slave 架构,依赖和部署比较重,每个任务也有较大开销,有较大的使用成本。RocketMQ Streams 着重打造轻量计算引擎,除了消息队列,无额外依赖,对过滤场景做了大量优化,性能提升 3-5 倍,资源节省 50%-80%。


RocketMQ Streams 适合大数据量->高过滤->轻窗口计算的场景,核心打造轻资源,高性能优势,在资源敏感场景中有很大优势,最低 1core,1g 可部署,建议的应用场景(安全,风控,边缘计算,消息队列流计算)。


RocketMQ Streams 兼容 Blink(Flink 的阿里内部版本) 的 SQL,UDF/UDTF/UDAF,多数 Blink 任务可以直接迁移成 RocketMQ Streams 任务。将来还会发布和 Flink 的融合版本,RocketMQ Streams 可以直接发布成 Flink 任务,既可以享有 RocketMQ Streams 带来的高性能,轻资源,还可以和现有的 Flink 任务统一运维和管理。


本篇文章主要从五个方面来介绍 RocketMQ Streams 实时计算平台:

首先简单先介绍一下什么是 RocketMQ Streams;

第二部分,基于 RocketMQ Streams 的 SDK,来了解下它是怎么去使用的;

第三部分,RocketMQ Streams 整体的架构以及它的原理实现;

第四部分,在云安全的场景下该怎么使用 RocketMQ Streams;

第五部分,RocketMQ Streams 的未来规划。

一、什么是 RocketMQ Streams?


本章节从基础简介、设计思路和特点三方面对 RocketMQ streams 进行整体介绍。

1.1 RocketMQ Streams 简介

1)首先,它是一个 Lib 包,启动即运行,和业务直接集成;

2)然后,它具备 SQL 引擎能力,兼容 Blink SQL 语法,兼容 Blink UDF/UDTF/UDAF;

3)其次,它包含 ETL 引擎,可以无编码实现数据的 ETL,过滤和转存;

4)最后,它基于数据开发 SDK,大量实用组件可直接使用,如:Source、sink、script、filter、lease、scheduler、configurable 不局限流的场景。



1.2 RocketMQ Streams 的特点


RocketMQ streams 基于上述的实现思路,可以看到它有以下几个特点:


1.2.1 轻量


1 核 1g 就可以部署,依赖较轻,在测试场景下用 Jar 包直接写个 main 方法就可以运行,在正式环境下最多依赖消息队列和存储(其中存储是可选的,主要是为了分片切换时的容错)。

1.2.2 高性能


实现高过滤优化器,包括前置指纹过滤,同源规则自动归并,hyperscan 加速,表达式指纹等,比优化前性能提升 3-5 倍,资源节省 50%以上。

1.2.3 维表 JOIN(千万数据量维表支持)


设计高压缩内存存储数据,无 java 头部和对齐的开销,存储接近原始数据大小,纯内存操作,性能最大化,同时对于 Mysql 提供了多线程并发加载,提高加载维表的速度。

1.2.4 高扩展的能力

  • Source 可按需扩展,已实现:RocketMQ,File,Kafka;

  • Sink 可按需扩展,已实现:RocketMQ,File,Kafka,Mysql,ES;

  • 可按 Blink 规范扩展 UDF/UDTF/UDAF;

  • 提供了更轻的 UDF/UDTF 扩展能力,不需要任何依赖就可以完成函数的扩展。

1.2.5 提供了丰富的大数据的能力

包括精确计算一次灵活的窗口,双流 join,统计,开窗,各种转换过滤,满足大数据开发的各种场景,支持弹性容错的能力。

 

二、RocketMQ Streams 的使用


RocketMQ Streams 对外提供两种 SDK,一种是 DSL SDK,一种是 SQL SDK,用户可以按需选择; DSL SDK 支持实时场景 DSL 语义; SQL SDK 兼容 Blink(Flink 的阿里内部版本) SQL 的语法,多数 Blink SQL 可以通过 RocketMQ Streams 运行;


接下来,我们详细的介绍一下这两种 SDK。

2.1 环境要求

  • JDK1.8 版本以上;

  • Maven 3.2 版本以上。

2.2 DSL SDK

利用 DSL SDK 开发实时任务时,需要做如下的一些准备工作:

2.2.1 依赖准备

<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-streams-clients</artifactId>    <version>1.0.0-SNAPSHOT</version></dependency>
复制代码


准备工作完成后,就可以直接开发自己的实时程序。

2.2.2 代码开发

DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
source.fromFile("~/admin/data/text.txt",false) .map(message->message + "--") .toPrint(1) .start();
复制代码


其中:

1)Namespace 是业务隔离的,相同的业务可以写成相同的 Namespace。相同的 Namespace 在任务调度里可以跑在进程里,也可以共享一些配置;

2)pipelineName 可以理解成就是 job name ,唯一区分 job;

3)DataStreamSource 主要是创建 Source,然后这个程序运行起来,最终的结果就是在原始的消息里面会加"--",然后把它打印出来。

2.2.3 丰富的算子

RocketMQ streams 提供了丰富的算子, 包括:

  • source 算子:包括 fromFile, fromRocketMQ, fromKafka 以及可以自定义 source 来源的 from 算子;

  • sink 算子: 包括 toFile, toRocketMQ, toKafka,toDB,toPrint, toES 以及可以自定义 sink 的 to 算子;

  • action 算子:包括 Filter,Expression,Script,selectFields,Union,forEach,Split,Select,Join,Window 等多个算子。

2.2.4 部署执行

基于 DSL SDK 完成开发,通过下面命令打成 jar 包,执行 jar,或直接执行任务的 main 方法。


mvn -Prelease-all -DskipTests clean install -Ujava -jar jarName mainClass &
复制代码


2.3 SQL SDK

2.3.1 依赖准备

 

  <dependency>      <groupId>com.alibaba</groupId>      <artifactId>rsqldb-clients</artifactId>      <version>1.0.0-SNAPSHOT</version></dependency>
复制代码


2.3.2 代码开发

 

首先开发业务逻辑代码, 可以保存为文件也可以直接使用文本;


CREATE FUNCTION json_concat as 'xxx.xxx.JsonConcat';
CREATE TABLE `table_name` ( `scan_time` VARCHAR, `file_name` VARCHAR, `cmdline` VARCHAR,) WITH ( type='file', filePath='/tmp/file.txt', isJsonData='true', msgIsJsonArray='false');

-- 数据标准化
create view data_filter asselect *from ( select scan_time as logtime , lower(cmdline) as lower_cmdline , file_name as proc_name from table_name)xwhere ( lower(proc_name) like '%.xxxxxx' or lower_cmdline like 'xxxxx%' or lower_cmdline like 'xxxxxxx%' or lower_cmdline like 'xxxx' or lower_cmdline like 'xxxxxx' );
CREATE TABLE `output` ( `logtime` VARCHAR , `lower_cmdline` VARCHAR , `proc_name` VARCHAR) WITH ( type = 'print');
insert into outputselect *from aegis_log_proc_format_raw;
复制代码


其中

  • CREATE FUNCTION:引入外部的函数来支持业务逻辑, 包括 flink 以及系统函数;

  • CREATE Table:创建 source/sink;

  • CREATE VIEW:执行字段转化,拆分,过滤;

  • INSERT INTO:数据写入 sink;

  • 函数:内置函数,udf 函数。

2.3.3 SQL 扩展

RocketMQ streams 支持三种 SQL 扩展能力,具体实现细节请看:https://github.com/alibaba/rsqldb

1)通过 Blink UDF/UDTF/UDAF 扩展 SQL 能力;

2)通过 RocketMQ streams 扩展 SQL 能力,只要实现函数名是 eval 的 java bean 即可;

3)通过现有 java 代码扩展 SQL 能力,create function 函数名就是 java 类的方法名。

2.3.4 SQL 执行

你可以从这里下载最新的 Rocketmq Streams 代码并构建。


cd rsqldb/mvn -Prelease-all -DskipTests clean install -Ucp rsqldb-runner/target/rocketmq-streams-sql-{版本号}-distribution.tar.gz 部署的目录
复制代码


解压 tar.gz 包, 进入目录结构


tar -xvf rocketmq-streams-{版本号}-distribution.tar.gzcd rocketmq-streams-{版本号}
复制代码


其目录结构如下 

  • bin 指令目录,包括启动和停止指令

  • conf 配置目录,包括日志配置以及应用的相关配置文件

  • jobs 存放 sql,可以两级目录存储

  • ext 存放扩展的 UDF/UDTF/UDAF/Source/Sink

  • lib 依赖包目录

  • log 日志目录


2.3.4.1 执行 SQL


#指定sql的路径,启动实时任务bin/start-sql.sh sql_file_path
复制代码


2.3.4.2 执行多个 SQL


如果想批量执行一批 SQL,可以把 SQL 放到 jobs 目录,最多可以有两层,把 sql 放到对应目录中,通过 start 指定子目录或 sql 执行任务。


2.3.4.3 任务停止


# 停止过程不加任何参数,则会将目前所有运行的任务同时停止bin/stop.sh
# 停止过程添加了任务名称, 则会将目前运行的所有同名的任务都全部停止bin/stop.sh sqlname
复制代码


2.3.4.4 日志查看


目前所有的运行日志都会存储在 log/catalina.out 文件中。


三、架构设计及原理分析

3.1 RocketMQ Streams 设计思路

在了解完 RocketMQ streams 的基本简介,接下来,我们看下 RocketMQ streams 的设计思路,设计思路主要从设计目标和策略两个方面来介绍:

3.1.1 设计目标

  • 依赖少,部署简单,1 核 1g 单实例可部署,可随意扩展规模;

  • 打造场景优势,重点打造大数据量->高过滤->轻窗口计算的场景,功能覆盖度要全,实现需要的大数据特性:Exactly-ONCE、灵活的窗口(滚动、滑动、会话窗口);

  • 要在保持低资源的前提下,对高过滤有性能突破,打造性能优势;

  • 兼容 Blink SQL,UDF/UDTF/UDAF,让非技术人员更容易上手。


3.1.2 策略(适配场景:大数据量>高过滤/ETL>低窗口计算)

  • 采用 shared-nothing 的分布式架构设计,依赖消息队列做负载均衡和容错机制,单实例可启动,增加实例实现能力扩展,并发能力取决于分片数;

  • 利用消息队列的分片做 shuffle,利用消息队列负载均衡实现容错;

  • 利用存储实现状态备份,实现 Exactly-ONCE 的语义。用结构化远程存储实现快速启动,不等本地存储恢复。

  • 重力打造过滤优化器,通过前置指纹过滤,同源规则自动归并,hyperscan 加速,表达式指纹提高过滤性能



3.2 RocketMQ Streams Source 的实现

1)Source 要求实现最少消费一次的语义,系统通过 checkpoint 系统消息实现,在提交 offset 前发送 checkpoint 消息,通知所有算子刷新内存。

2)Source 支持分片的自动负载和容错

  • 数据源在分片移除时,发送移除系统消息,让算子完成分片清理工作;

  • 当有新分片时,发送新增分片消息,让算子完成分片初始化。

3)数据源通过 start 方法,启动 consuemr 获取消息;

4)原始消息经过编码,附加头部信息包装成 Message 投递给后续算子。



3.3 RocketMQ Streams Sink 的实现

1)Sink 是实时性和吞吐的一个结合;

2)实现一个 sink 只要继承 AbstractSink 类实现 batchInsert 方法即可。batchInsert 的含义是一批数据写入存储,需要子类调用存储接口实现,尽量应用存储的批处理接口,提高吞吐;

3)常规的使用方式是写 message->cache->flush->存储的方式,系统会严格保证每次批次写入存储的量不超过 batchsize 的量,如果超过了,会拆分成多批写入;



4)Sink 有一个 cache,数据默认写 cache,批次写入存储,提高吞吐(一个分片一个 cache);

5)可以开启自动刷新,每个分片会有一个线程,定时刷新 cache 数据到存储,提高实时性。实现类:DataSourceAutoFlushTask;

6)通过调用 flush 方法刷新 cache 到存储;

7)Sink 的 cache 会有内存保护,当 cache 的消息条数>batchSize,会强制刷新,释放内存。


3.4 RocketMQ Streams Exactly-ONCE 实现

1)Source 确保在 commit offset 时,会发送 checkpoint 系统消息,收到消息的组件会完成存盘操作,消息至少消费一次;

2)每条消息会有消息头部,里面封装了 queueld 和 offset;

2)组件在存储数据时,会把 queueld 和处理的最大 offset 存储下来,当有消息重复时,根据 maxoffset 去重;

3)内存保护,一个 checkpoint 周期可能有多次 flush(条数触发),保障内存占用可控。



3.5 RocketMQ Streams Window

实现方式:

1)支持滚动、滑动和会话窗口,支持事件时间和自然时间(消息进入算子的时间);

2)支持 Emit 语法,可以在触发前或触发后,每隔 n 段时间,更新一次数据;比如 1 小时窗口,窗口触发前希望每分钟看到最新结果,窗口触发后希望不丢失迟到一天内的数据,且每 10 分钟更新数据。

3)支持高性能模式和高可靠模式,高性能模式不依赖远程存储,但在分片切换时,有丢失窗数据的风险;

4)快速启动,无需等待本地存储恢复,在发生错误或分片切换时,异步从远程存储恢复数据,同时直接访问远程存储计算;

5)利用消息队列负载均衡,实现扩容缩容容,每个 queue 是一份组,一个分组同一刻只被一台机器消费;

6)正常计算依赖本地存储,具备 flink 相似的计算性能。



四、RocketMQ Streams 在安全场景的最佳实践

4.1 背景

从公共云转战专有云,遇到了新的问题。因为专有云像大数据这种 saas 服务是非必须输出的,且最小输出规模也比较大,用户成本会增加很多,难落地,导致安全能力无法快速同步到专有云。



4.2 解决办法

RocketMQ Streams 在云安全的应用-流计算

  • 基于安全场景打造轻量级计算引擎,基于安全高过滤的场景特点,可以针对高过滤场景优化,然后再做较重的统计、窗口、join 操作,因为过滤率比较高,可以用更轻的方案实现统计和 join 操作;

  • SQL 和引擎都可热升级



业务结果

1)规则覆盖:自建引擎,覆盖 100%规则(正则,join,统计);

2)轻资源,内存是公共云引擎的 1/24,cpu 是 1/6,依赖过滤优化器,资源不随规则线性增加,新增规则无资源压力,通过高压缩表,支持千万情报;

3)SQL 发布,通过 c/s 部署模式,SQL 引擎热发布,尤其护网场景,可快速上线规则;

4)性能优化,对核心组件进行专题性能优化,保持高性能,每实例(2g,4 核,41 规则)5000qps 以上。

 

五、RocketMQ Streams 的未来规划

5.1 打造 RocketMQ 一体化计算能力

1)和 RocketMQ 整合,去除 DB 依赖,融合 RocketMQ KV;

2)和 RocketMQ 混部,支持本地计算,利用本地特点,打造高性能;

3)打造边缘计算最佳实践

5.2 Connector 增强

1)支持 pull 消费方式,checkpoint 异步刷新;

2)兼容 blink/flink connector。

5.3 ETL 能力建设

1)增加文件,syslog 的数据接入能力

2)兼容 Grok 解析,增加常用日志的解析能力;

3)打造日志 ETL 的最佳实践

5.4 稳定性和易用性打造

1)Window 多场景测试,提升稳定性,性能优化;

2)补充测试用例,文档,应用场景。


六、开源地址

RocketMQ-Streams: https://github.com/apache/rocketmq-streams

RocketMQ-Streams-SQL:https://github.com/alibaba/rsqldb


以上是本次对 RocketMQ stream 的整体介绍,希望对大家有所帮助和启发。

 

2021-12-06 12:155559

评论 1 条评论

发布
用户头像
双流join怎么用的,源码中的例子跑不起来啊?
2021-12-21 15:51
回复
没有更多了
发现更多内容

如何实现多存储文件传输,镭速提供多存储文件传输解决方案

镭速

软件测试/测试开发丨接口测试APIObject 模式、原则与应用

测试人

软件测试 自动化测试 接口测试 测试开发

为什么要使用CDN?CDN有什么优势?

海拥(haiyong.site)

三周年连更

数据可视化、数据分析常用的图表都有哪些?(一)

百度开发者中心

数据可视化 #百度智能云# 数据分析可视化

flutter系列之:如何自定义动画路由

程序那些事

flutter 架构 大前端 程序那些事

GPU 加速药物研发与基因组学分析

百度开发者中心

GPU服务器

软件测试/测试开发丨接口测试通用 API 封装实战

测试人

软件测试 自动化测试 接口测试 测试开发

数据可视化、数据分析常用的图表都有哪些?(二)

百度开发者中心

数据可视化 #百度智能云# 数据分析可视化

测试用例该如何编写?

测吧(北京)科技有限公司

测试

devops如何使用chatgpt提高工作效率

wisonzhu

DevOps

建设司库管理体系,数智化转型打破数据壁垒

智达方通

全球司库 司库体系建设 司库管理体系 智达方通

数据可视化、数据分析常用的表格组件都有哪些?(三)

百度开发者中心

数据可视化 百度智能云 数据分析工具

众说纷纭,低代码发展到底动了谁的“奶酪”?

这我可不懂

低代码 JNPF

3DCAT实时云渲染助力广府庙会元宇宙焕新亮相,开启线上奇趣之旅!

3DCAT实时渲染

元宇宙 实时渲染云 3D实时云渲染

OMG!这个Ins快拍保存到相册的办法绝了!还在犹豫什么,都给我冲!

frank

Instagram

selenium源码通读·7 |webdriver/common/by.py-By类分析

测试 自动化测试 测试框架 源码剖析 selenium

数据生产压力突增23倍,平台“可观测性”如何帮这家制造集团排忧解难? | 奇点云技术分享

奇点云

数据中台 可观测性 制造业 奇点云

CloudQuery 社区版回归直播即将开启

BinTools图尔兹

直播 社区版

软件测试/测试开发丨接口测试配置的数据驱动

测试人

软件测试 自动化测试 测试开发

浪潮海岳低代码平台inBuilder开源社区版正式发布

inBuilder低代码平台

开源 低代码平台

国家工信安全中心权威认证!

百度开发者中心

工业互联网 百度飞桨 文心一言

玩转AIGC,5分钟 Serverless 部署 Stable Diffustion 服务

Serverless Devs

Serverless AIGC Stable Diffustion

深入了解 WebAssembly —— 一种新的 Web 可执行文件格式

NGINX开源社区

nginx webassembly

火爆的低代码开发具有哪些技术特点?

力软低代码开发平台

ChatGPT:改变未来沟通方式的人工智能语言模型

wisonzhu

如何使用 Linux find 命令查找文件?

wljslmz

三周年连更

TitanIDE 新版本来袭,全新“效能看板”上线

行云创新

ide

软件测试/测试开发丨接口测试数据的数据驱动

测试人

软件测试 自动化测试 接口测试 数据驱动 测试开发

selenium源码通读·8 |webdriver/common/keys.py-Keys类分析

Python 自动化测试 测试框架 源码剖析 selenium

CSA GCR大会正式发布全球首个云渗透测试认证专家课程,腾讯安全获评“特别贡献单位”

腾讯安全云鼎实验室

云安全

TikTok视频怎么无水印保存到相册?这有啥难的,跟我学轻松变大神~

frank

TikTok

RocketMQ Streams:将轻量级实时计算引擎融合进消息系统_大数据_袁小栋_InfoQ精选文章