【ArchSummit架构师峰会】探讨数据与人工智能相互驱动的关系>>> 了解详情
写点什么

RocketMQ Streams:将轻量级实时计算引擎融合进消息系统

  • 2021-12-06
  • 本文字数:5645 字

    阅读完需:约 19 分钟

RocketMQ Streams:将轻量级实时计算引擎融合进消息系统

作者 | 袁小栋、程君杰

审核校对 | 杜恒、岁月、白玙、不周

 

随着各行各业移动互联和云计算技术的普及发展,大数据计算已深入人心,最常见的比如 flink、spark 等。这些大数据框架,采用中心化的 Master-Slave 架构,依赖和部署比较重,每个任务也有较大开销,有较大的使用成本。RocketMQ Streams 着重打造轻量计算引擎,除了消息队列,无额外依赖,对过滤场景做了大量优化,性能提升 3-5 倍,资源节省 50%-80%。


RocketMQ Streams 适合大数据量->高过滤->轻窗口计算的场景,核心打造轻资源,高性能优势,在资源敏感场景中有很大优势,最低 1core,1g 可部署,建议的应用场景(安全,风控,边缘计算,消息队列流计算)。


RocketMQ Streams 兼容 Blink(Flink 的阿里内部版本) 的 SQL,UDF/UDTF/UDAF,多数 Blink 任务可以直接迁移成 RocketMQ Streams 任务。将来还会发布和 Flink 的融合版本,RocketMQ Streams 可以直接发布成 Flink 任务,既可以享有 RocketMQ Streams 带来的高性能,轻资源,还可以和现有的 Flink 任务统一运维和管理。


本篇文章主要从五个方面来介绍 RocketMQ Streams 实时计算平台:

首先简单先介绍一下什么是 RocketMQ Streams;

第二部分,基于 RocketMQ Streams 的 SDK,来了解下它是怎么去使用的;

第三部分,RocketMQ Streams 整体的架构以及它的原理实现;

第四部分,在云安全的场景下该怎么使用 RocketMQ Streams;

第五部分,RocketMQ Streams 的未来规划。

一、什么是 RocketMQ Streams?


本章节从基础简介、设计思路和特点三方面对 RocketMQ streams 进行整体介绍。

1.1 RocketMQ Streams 简介

1)首先,它是一个 Lib 包,启动即运行,和业务直接集成;

2)然后,它具备 SQL 引擎能力,兼容 Blink SQL 语法,兼容 Blink UDF/UDTF/UDAF;

3)其次,它包含 ETL 引擎,可以无编码实现数据的 ETL,过滤和转存;

4)最后,它基于数据开发 SDK,大量实用组件可直接使用,如:Source、sink、script、filter、lease、scheduler、configurable 不局限流的场景。



1.2 RocketMQ Streams 的特点


RocketMQ streams 基于上述的实现思路,可以看到它有以下几个特点:


1.2.1 轻量


1 核 1g 就可以部署,依赖较轻,在测试场景下用 Jar 包直接写个 main 方法就可以运行,在正式环境下最多依赖消息队列和存储(其中存储是可选的,主要是为了分片切换时的容错)。

1.2.2 高性能


实现高过滤优化器,包括前置指纹过滤,同源规则自动归并,hyperscan 加速,表达式指纹等,比优化前性能提升 3-5 倍,资源节省 50%以上。

1.2.3 维表 JOIN(千万数据量维表支持)


设计高压缩内存存储数据,无 java 头部和对齐的开销,存储接近原始数据大小,纯内存操作,性能最大化,同时对于 Mysql 提供了多线程并发加载,提高加载维表的速度。

1.2.4 高扩展的能力

  • Source 可按需扩展,已实现:RocketMQ,File,Kafka;

  • Sink 可按需扩展,已实现:RocketMQ,File,Kafka,Mysql,ES;

  • 可按 Blink 规范扩展 UDF/UDTF/UDAF;

  • 提供了更轻的 UDF/UDTF 扩展能力,不需要任何依赖就可以完成函数的扩展。

1.2.5 提供了丰富的大数据的能力

包括精确计算一次灵活的窗口,双流 join,统计,开窗,各种转换过滤,满足大数据开发的各种场景,支持弹性容错的能力。

 

二、RocketMQ Streams 的使用


RocketMQ Streams 对外提供两种 SDK,一种是 DSL SDK,一种是 SQL SDK,用户可以按需选择; DSL SDK 支持实时场景 DSL 语义; SQL SDK 兼容 Blink(Flink 的阿里内部版本) SQL 的语法,多数 Blink SQL 可以通过 RocketMQ Streams 运行;


接下来,我们详细的介绍一下这两种 SDK。

2.1 环境要求

  • JDK1.8 版本以上;

  • Maven 3.2 版本以上。

2.2 DSL SDK

利用 DSL SDK 开发实时任务时,需要做如下的一些准备工作:

2.2.1 依赖准备

<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-streams-clients</artifactId>    <version>1.0.0-SNAPSHOT</version></dependency>
复制代码


准备工作完成后,就可以直接开发自己的实时程序。

2.2.2 代码开发

DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
source.fromFile("~/admin/data/text.txt",false) .map(message->message + "--") .toPrint(1) .start();
复制代码


其中:

1)Namespace 是业务隔离的,相同的业务可以写成相同的 Namespace。相同的 Namespace 在任务调度里可以跑在进程里,也可以共享一些配置;

2)pipelineName 可以理解成就是 job name ,唯一区分 job;

3)DataStreamSource 主要是创建 Source,然后这个程序运行起来,最终的结果就是在原始的消息里面会加"--",然后把它打印出来。

2.2.3 丰富的算子

RocketMQ streams 提供了丰富的算子, 包括:

  • source 算子:包括 fromFile, fromRocketMQ, fromKafka 以及可以自定义 source 来源的 from 算子;

  • sink 算子: 包括 toFile, toRocketMQ, toKafka,toDB,toPrint, toES 以及可以自定义 sink 的 to 算子;

  • action 算子:包括 Filter,Expression,Script,selectFields,Union,forEach,Split,Select,Join,Window 等多个算子。

2.2.4 部署执行

基于 DSL SDK 完成开发,通过下面命令打成 jar 包,执行 jar,或直接执行任务的 main 方法。


mvn -Prelease-all -DskipTests clean install -Ujava -jar jarName mainClass &
复制代码


2.3 SQL SDK

2.3.1 依赖准备

 

  <dependency>      <groupId>com.alibaba</groupId>      <artifactId>rsqldb-clients</artifactId>      <version>1.0.0-SNAPSHOT</version></dependency>
复制代码


2.3.2 代码开发

 

首先开发业务逻辑代码, 可以保存为文件也可以直接使用文本;


CREATE FUNCTION json_concat as 'xxx.xxx.JsonConcat';
CREATE TABLE `table_name` ( `scan_time` VARCHAR, `file_name` VARCHAR, `cmdline` VARCHAR,) WITH ( type='file', filePath='/tmp/file.txt', isJsonData='true', msgIsJsonArray='false');

-- 数据标准化
create view data_filter asselect *from ( select scan_time as logtime , lower(cmdline) as lower_cmdline , file_name as proc_name from table_name)xwhere ( lower(proc_name) like '%.xxxxxx' or lower_cmdline like 'xxxxx%' or lower_cmdline like 'xxxxxxx%' or lower_cmdline like 'xxxx' or lower_cmdline like 'xxxxxx' );
CREATE TABLE `output` ( `logtime` VARCHAR , `lower_cmdline` VARCHAR , `proc_name` VARCHAR) WITH ( type = 'print');
insert into outputselect *from aegis_log_proc_format_raw;
复制代码


其中

  • CREATE FUNCTION:引入外部的函数来支持业务逻辑, 包括 flink 以及系统函数;

  • CREATE Table:创建 source/sink;

  • CREATE VIEW:执行字段转化,拆分,过滤;

  • INSERT INTO:数据写入 sink;

  • 函数:内置函数,udf 函数。

2.3.3 SQL 扩展

RocketMQ streams 支持三种 SQL 扩展能力,具体实现细节请看:https://github.com/alibaba/rsqldb

1)通过 Blink UDF/UDTF/UDAF 扩展 SQL 能力;

2)通过 RocketMQ streams 扩展 SQL 能力,只要实现函数名是 eval 的 java bean 即可;

3)通过现有 java 代码扩展 SQL 能力,create function 函数名就是 java 类的方法名。

2.3.4 SQL 执行

你可以从这里下载最新的 Rocketmq Streams 代码并构建。


cd rsqldb/mvn -Prelease-all -DskipTests clean install -Ucp rsqldb-runner/target/rocketmq-streams-sql-{版本号}-distribution.tar.gz 部署的目录
复制代码


解压 tar.gz 包, 进入目录结构


tar -xvf rocketmq-streams-{版本号}-distribution.tar.gzcd rocketmq-streams-{版本号}
复制代码


其目录结构如下 

  • bin 指令目录,包括启动和停止指令

  • conf 配置目录,包括日志配置以及应用的相关配置文件

  • jobs 存放 sql,可以两级目录存储

  • ext 存放扩展的 UDF/UDTF/UDAF/Source/Sink

  • lib 依赖包目录

  • log 日志目录


2.3.4.1 执行 SQL


#指定sql的路径,启动实时任务bin/start-sql.sh sql_file_path
复制代码


2.3.4.2 执行多个 SQL


如果想批量执行一批 SQL,可以把 SQL 放到 jobs 目录,最多可以有两层,把 sql 放到对应目录中,通过 start 指定子目录或 sql 执行任务。


2.3.4.3 任务停止


# 停止过程不加任何参数,则会将目前所有运行的任务同时停止bin/stop.sh
# 停止过程添加了任务名称, 则会将目前运行的所有同名的任务都全部停止bin/stop.sh sqlname
复制代码


2.3.4.4 日志查看


目前所有的运行日志都会存储在 log/catalina.out 文件中。


三、架构设计及原理分析

3.1 RocketMQ Streams 设计思路

在了解完 RocketMQ streams 的基本简介,接下来,我们看下 RocketMQ streams 的设计思路,设计思路主要从设计目标和策略两个方面来介绍:

3.1.1 设计目标

  • 依赖少,部署简单,1 核 1g 单实例可部署,可随意扩展规模;

  • 打造场景优势,重点打造大数据量->高过滤->轻窗口计算的场景,功能覆盖度要全,实现需要的大数据特性:Exactly-ONCE、灵活的窗口(滚动、滑动、会话窗口);

  • 要在保持低资源的前提下,对高过滤有性能突破,打造性能优势;

  • 兼容 Blink SQL,UDF/UDTF/UDAF,让非技术人员更容易上手。


3.1.2 策略(适配场景:大数据量>高过滤/ETL>低窗口计算)

  • 采用 shared-nothing 的分布式架构设计,依赖消息队列做负载均衡和容错机制,单实例可启动,增加实例实现能力扩展,并发能力取决于分片数;

  • 利用消息队列的分片做 shuffle,利用消息队列负载均衡实现容错;

  • 利用存储实现状态备份,实现 Exactly-ONCE 的语义。用结构化远程存储实现快速启动,不等本地存储恢复。

  • 重力打造过滤优化器,通过前置指纹过滤,同源规则自动归并,hyperscan 加速,表达式指纹提高过滤性能



3.2 RocketMQ Streams Source 的实现

1)Source 要求实现最少消费一次的语义,系统通过 checkpoint 系统消息实现,在提交 offset 前发送 checkpoint 消息,通知所有算子刷新内存。

2)Source 支持分片的自动负载和容错

  • 数据源在分片移除时,发送移除系统消息,让算子完成分片清理工作;

  • 当有新分片时,发送新增分片消息,让算子完成分片初始化。

3)数据源通过 start 方法,启动 consuemr 获取消息;

4)原始消息经过编码,附加头部信息包装成 Message 投递给后续算子。



3.3 RocketMQ Streams Sink 的实现

1)Sink 是实时性和吞吐的一个结合;

2)实现一个 sink 只要继承 AbstractSink 类实现 batchInsert 方法即可。batchInsert 的含义是一批数据写入存储,需要子类调用存储接口实现,尽量应用存储的批处理接口,提高吞吐;

3)常规的使用方式是写 message->cache->flush->存储的方式,系统会严格保证每次批次写入存储的量不超过 batchsize 的量,如果超过了,会拆分成多批写入;



4)Sink 有一个 cache,数据默认写 cache,批次写入存储,提高吞吐(一个分片一个 cache);

5)可以开启自动刷新,每个分片会有一个线程,定时刷新 cache 数据到存储,提高实时性。实现类:DataSourceAutoFlushTask;

6)通过调用 flush 方法刷新 cache 到存储;

7)Sink 的 cache 会有内存保护,当 cache 的消息条数>batchSize,会强制刷新,释放内存。


3.4 RocketMQ Streams Exactly-ONCE 实现

1)Source 确保在 commit offset 时,会发送 checkpoint 系统消息,收到消息的组件会完成存盘操作,消息至少消费一次;

2)每条消息会有消息头部,里面封装了 queueld 和 offset;

2)组件在存储数据时,会把 queueld 和处理的最大 offset 存储下来,当有消息重复时,根据 maxoffset 去重;

3)内存保护,一个 checkpoint 周期可能有多次 flush(条数触发),保障内存占用可控。



3.5 RocketMQ Streams Window

实现方式:

1)支持滚动、滑动和会话窗口,支持事件时间和自然时间(消息进入算子的时间);

2)支持 Emit 语法,可以在触发前或触发后,每隔 n 段时间,更新一次数据;比如 1 小时窗口,窗口触发前希望每分钟看到最新结果,窗口触发后希望不丢失迟到一天内的数据,且每 10 分钟更新数据。

3)支持高性能模式和高可靠模式,高性能模式不依赖远程存储,但在分片切换时,有丢失窗数据的风险;

4)快速启动,无需等待本地存储恢复,在发生错误或分片切换时,异步从远程存储恢复数据,同时直接访问远程存储计算;

5)利用消息队列负载均衡,实现扩容缩容容,每个 queue 是一份组,一个分组同一刻只被一台机器消费;

6)正常计算依赖本地存储,具备 flink 相似的计算性能。



四、RocketMQ Streams 在安全场景的最佳实践

4.1 背景

从公共云转战专有云,遇到了新的问题。因为专有云像大数据这种 saas 服务是非必须输出的,且最小输出规模也比较大,用户成本会增加很多,难落地,导致安全能力无法快速同步到专有云。



4.2 解决办法

RocketMQ Streams 在云安全的应用-流计算

  • 基于安全场景打造轻量级计算引擎,基于安全高过滤的场景特点,可以针对高过滤场景优化,然后再做较重的统计、窗口、join 操作,因为过滤率比较高,可以用更轻的方案实现统计和 join 操作;

  • SQL 和引擎都可热升级



业务结果

1)规则覆盖:自建引擎,覆盖 100%规则(正则,join,统计);

2)轻资源,内存是公共云引擎的 1/24,cpu 是 1/6,依赖过滤优化器,资源不随规则线性增加,新增规则无资源压力,通过高压缩表,支持千万情报;

3)SQL 发布,通过 c/s 部署模式,SQL 引擎热发布,尤其护网场景,可快速上线规则;

4)性能优化,对核心组件进行专题性能优化,保持高性能,每实例(2g,4 核,41 规则)5000qps 以上。

 

五、RocketMQ Streams 的未来规划

5.1 打造 RocketMQ 一体化计算能力

1)和 RocketMQ 整合,去除 DB 依赖,融合 RocketMQ KV;

2)和 RocketMQ 混部,支持本地计算,利用本地特点,打造高性能;

3)打造边缘计算最佳实践

5.2 Connector 增强

1)支持 pull 消费方式,checkpoint 异步刷新;

2)兼容 blink/flink connector。

5.3 ETL 能力建设

1)增加文件,syslog 的数据接入能力

2)兼容 Grok 解析,增加常用日志的解析能力;

3)打造日志 ETL 的最佳实践

5.4 稳定性和易用性打造

1)Window 多场景测试,提升稳定性,性能优化;

2)补充测试用例,文档,应用场景。


六、开源地址

RocketMQ-Streams: https://github.com/apache/rocketmq-streams

RocketMQ-Streams-SQL:https://github.com/alibaba/rsqldb


以上是本次对 RocketMQ stream 的整体介绍,希望对大家有所帮助和启发。

 

2021-12-06 12:155565

评论 1 条评论

发布
用户头像
双流join怎么用的,源码中的例子跑不起来啊?
2021-12-21 15:51
回复
没有更多了
发现更多内容

Forsage智能合约系统APP开发|Forsage智能合约软件开发(现成)

系统开发 现成系统

Gradle doesn't run because it can't find tools.jar in JRE

mengxn

kotlin Gradle

高速公路二维码定位报警系统搭建解决方案

t13823115967

高速公路二维码定位报警 智慧公安

360OS张焰:AI视觉在教育中的应用

ZEGO即构

记一次GC频繁且间隔较长解决实战总结

AI乔治

Java 架构 JVM GC

甲方日常 61

句子

工作 随笔杂谈 日常

EXCEL、图片处理常用技巧

jiangling500

Excel 图片处理

有奖讨论|作为程序员,女朋友是怎么吐槽你的?

Simon郎

女朋友 话题讨论

双非本硕四面百度竟意外成功?看完我的面试经历 网友都称:过于优秀

比伯

Java 编程 架构 面试 计算机

深入浅出 Go - sync.Pool 源码分析

helbing

Go 语言

修炼码德系列:简化条件表达式

Silently9527

Java 经验分享 代码重构 代码规范

Spring 源码学习 04:初始化容器与 DefaultListableBeanFactory

程序员小航

spring 源码 源码阅读

数据结构与算法系列之跳表(GO)

书旅

数据结构 算法 Go 语言

创业项目快速分析框架

boshi

创业 商业

《穿越数据的迷宫》笔记:中文版序二

方志

数据治理

为什么边缘计算将终止云计算?

VoltDB

数据库 云计算 数据分析 边缘计算

AnyRTC --- Flutter 实现视频通话

anyRTC开发者

flutter 音视频 WebRTC 跨平台 sdk

整天都在讨论使用SpringBoot,可你居然连缓存都不清楚

小Q

Java 缓存 学习 面试 springboot

冰河教你一次性成功安装K8S集群(基于一主两从模式)

冰河

Docker 云原生 k8s

架构第十一周作业

Nick~毓

《华为数据之道》读书笔记:第 10 章 未来已来:数据成为企业核心竞争力

方志

数字化转型 数据治理

Linux常用命令速查

jiangling500

linux命令

算力 | 手写红黑树

九叔(高翔龙)

数据结构 算法 二叉树 红黑树

Appium之测试微信小程序

清菡软件测试

App

漫画:什么是 “智能供应链” ?

京东科技开发者

云计算 供应链 智能供应链

深度剖析github star数15.1k的开源项目redux-thunk

徐小夕

Java GitHub 大前端 React

淦!终于有人把Java 8和Spring 5完美合体了,业界堪称“神迹”

Java架构追梦

Java spring 架构 面试 springboot

线程池 ForkJoinPool 简介

Java老k

Java 线程池 forkjoinpool 工作窃取

《穿越数据的迷宫》笔记:第1章 数据管理的重要性

方志

数据治理

智能新时代 安全新未来 首届国网北京电力人工智能数据竞赛正式启动

极客播报

区块链落地开发,区块链版权应用搭建

t13823115967

区块链+ 区块链落地开发 区块链版权应用搭建

RocketMQ Streams:将轻量级实时计算引擎融合进消息系统_大数据_袁小栋_InfoQ精选文章