【AICon】探索RAG 技术在实际应用中遇到的挑战及应对策略!AICon精华内容已上线73%>>> 了解详情
写点什么

苏宁 11.11:基于 Apache Ignite 日均十亿数据对账实践应用

  • 2018-11-12
  • 本文字数:6630 字

    阅读完需:约 22 分钟

苏宁11.11:基于Apache Ignite日均十亿数据对账实践应用

项目背景

对账平台简介

时至今日,分布式微服务架构在大型企业应用建设中已经得到普遍使用。企业内部,随着各服务子系统的拆分,业务数据的处理链路也越来越长,一条数据从最上游业务系统产生到最下游消费系统使用往往需要经历数个甚至十几个大小不一、作用不同的服务子系统;企业对外,和外部系统例如银行、第三方接口的数据吞吐也越来越多,典型的例如转账支付交易记录核对。因此,对于多方流转共用的数据进行比较核对、及时发现差错遗漏或重复这种需求就日益增多。遵循避免重复开发、抽象共通规则、建设专职系统的思路,苏宁内部很早就开发实施了一套数据对账开放平台(以下简称为“对账平台”)并得到良好应用,主体架构及数据处理过程如下简图。


问题提出与解决思路

随着集团大开发战略快速推进,业务数据量呈现爆发式增长态势,原对账平台已逐渐不能满足使用,主要表现为清洗与核对计算速度较慢、整体架构难以扩展,限制了接入业务系统的数量和数据规模。因此,对账平台升级改造项目应运而生。


针对原对账平台的问题进行分析,不难发现,不具备良好水平扩展性是其最关键的痛点,主要体现之一便是核对计算的数据库存储过程性能不能通过扩大数据库集群等方法来实现近线性扩展。其次,痛点之二是计算处理过程中磁盘 IO 操作依赖较重,核对计算存储过程执行过程中需要频繁扫描磁盘数据,导致性能较差。为解决这些问题,我们首先确定升级改进的大方向,即使用分布式计算架构代替数据库存储过程集中计算架构,并经过多次地调研对比,最终采用 Apache Ignite 平台作为本次升级改进的主要技术选型。

基于 Ignite 的升级方案要点

元数据结构:二进制编组器

由于业务系统动态接入,对账平台在设计开发阶段无法预知接入方的数据格式,即数据结构拥有哪些字段、分别为什么类型。这些元数据结构都是在系统上线运行过程中由对账管理员创建对账计划时配置,因此对账平台必须支持元数据管理,并且在数据处理过程中使用元数据作为数据模式(Schema)。


对账平台原先的解决方案为:将用户配置的数据模式字段信息存储为数据库中的结构化数据,类似于关系型数据库中的系统表如 MySQL 中的 information_schema.COLUMNS,对账平台使用赋予 DDL 权限的账号连接数据库,根据配置字段信息生成建表语句,根据配置的数据清洗、核对规则生成计算存储过程脚本,再执行这些语句脚本以实现运行时元数据配置的支持。这种方案优点是实现相对简单容易,但是缺点也很明显:


第一应用程序使用的数据库账户权限较高;


第二关系型数据库的行存方式导致只取用少数列时性能不够理想;


第三使用存储过程完成数据处理计算不利于横向扩展。


对此,新方案采用 Ignite 中的二进制编组器(Binary Marshaller)来满足数据处理过程中的元数据支持需求。Ignite 的二进制编组器是一种新的序列化/反序列化格式,它可以实现数据对象值、元数据的动态处理,它提供了以下几个特性:


  1. 允许只访问数据对象的部分属性而不用反序列化整个对象。

  2. 允许运行时动态添加或者删除对象字段而不需要固定的数据模型类(Model)。

  3. 允许基于类型名称创建新对象而不需要数据模型类定义。


这样一来,通过以上几点,在不直接操作使用 CGlib、javassist 之类动态字节码生成技术的情况下,新方案可以满足动态元数据配置及使用的需求。并且,由于业务系统原始数据可能字段多达上百个而真正需要对账处理使用的可能仅十多个,基于上述第一条特性,在后面的数据处理计算过程中,本方案较原方案相比效率上有很大提升。

数据接入:跨节点 ExecutorService 与分布式内存缓存

业务系统原始数据接入是整个处理过程中的第一步。由于各业务系统建设时期、技术架构大多存在差异,因此最初与各业务系统协商达成一致,采用业务系统生成文本格式数据文件传输到公共 FTP 服务器的方式来实现数据接入,对账平台按照设定好的定时调度任务驱动,查询数据库获取处于“解析数据文件”环节“待处理”状态的对账任务,根据获取到的结果数据访问 FTP 检查并获取所需的数据文件,再根据用户事先配置好的数据模板来解析数据文件并持久化入库等待后续处理。


由于采用了苏宁内部的集中定时调度架构,对账平台集群作为定时调度任务平台的客户端,每次触发器只会将调度运行指令下发到对账平台集群上的某一个节点来启动解析数据文件入库的任务。



原方案调度解析任务


在业务系统数据量一般的情况下,单节点解析处理尚能满足需求,但随着更多财务相关业务系统接入,财务业务系统每个对账周期内的数据量较大、对解析处理时限要求较高,单节点解析处理已不能支撑其需求。并且单节点执行任务这种模式显然没有充分利用整个集群的计算资源,容易造成某节点负荷过重而其它节点却比较空闲的情况。鉴于此,设法改进任务执行方式,调用多节点并行执行同一个任务,既能改善单个任务的执行效率,又能分摊任务执行压力。


Ignite 计算网格中的分布式 ExecutorService 能力正是非常适合的解决方案。它实现了 JDK 标准的 ExecutorService 接口,因此原来单节点创建提交任务逻辑代码几乎不用改动即可在新的方案中继续使用,只是线程池由原来的单节点 JVM 进程内变成了整个集群化跨节点,本质上是由 Ignite 将某节点提交的计算程序闭包序列化后发送到分配的节点上反序列化执行(如图)。这样,解析数据文件入库的任务可以实现分布提交、分布执行,快速实现了集群并行化,并自动获得了任务负载均衡、故障自动转移的特性。



新方案调度解析任务


在调用集群多节点并行加载数据时,将数据加载到何处也是需要考虑的问题。原解决方案将解析后的原始数据以 JDBC batch 操作方式直接存入关系型数据库,后续的核对过程也在数据库中完成。新方案则采用 Ignite 核心也是最强劲的特性“分布式数据网格”作为数据主工作存储。主要依据其有以下重要特点:


  1. 分布式。Ignite 数据网格一定程度上和 HDFS 比较相似,两者均支持数据分片存储、多副本备份以实现数据

  2. 安全。但不同的是,Ignite 支持数据的更新与删除,并且支持事务。理论上,采用分区模式的 Ignite 数据网格可以轻松地通过增加集群节点方式进行水平扩展,从而支持 TB 级的内存数据存储。

  3. SQL 支持。Ignite 数据网格自带 SQL 引擎,并支持 JDBC 驱动,可以通过标准的 JDBC 方式进行 DDL、DML 操作,极大地降低了编程开发难度和使用门槛。

  4. 索引支持。IgniteSQL 网格支持索引。对于 SQL 中声明创建的每一个索引,Ignite 都会采用一颗专用 B+树的数据结构来管理缓存数据。通过索引特点,Ignite 能够进一步提高数据查询检索效率以满足更高响应时间要求。

  5. 支持通读通写。内存毕竟不是持久化,数据最终还是需要落到诸如关系型数据库的持久化存储中去。通过配置通读、后写缓存,新方案同样实现数据到持久化存储中的批量操作。



Ignite 分布式数据网格(官方图)

预处理清洗:规则链封装与 StreamReceiver

处理完原始数据的接入,接下来关键是各种数据处理规则来对数据进行加工计算,可以说这是对账平台最重要最复杂的环节之一。由于接入的业务系统之间差异很大,数据质量参差不齐,并且数据比对逻辑也千差万别,因此对账平台无法按照开发专用程序的方式来针对每一个接入业务系统进行定制,而必须支持用户灵活配置各种规则。根据业务梳理,主要的清洗规则分为两类:


  1. 单行内清洗,包括:


条件筛选:判断是否符合设定条件以决定是否继续进行后续清洗。例如,对于“交易类型”等于 A 的数据继续本组规则后续处理;


文本截取:指定文本字段无条件或满足设定条件时正/反向截取特定起止位置子串。例如,当“订单类型”字段第 1~2 位等于“XS”时正向截取“备注”字段第 8~10 位设置到字段“预留 1”中;


单行多个文本字段拼接:将指定的多个文本字段内容连接成新字符串。例如,将“交易日期”、“流水号”、“支付方式”拼接设置到字段“预留 2”中;


数字字段运算:单个或多个数字类型字段值进行表达式四则运算。例如,将表达式“单价*(销售数量-退单数量)/10000.0”的计算结果存入字段“预留 3”中;


字段设值:根据某字段条件将另一字段设置为指定值。例如,当“门店代码”包含“NJ”时将“地区”设置为“南京”;


筛选过滤:抛弃符合设定条件的数据行。例如,当“备注”字段包含“重复下单”时该条数据不参与后续处理及核对;


等等;


  1. 多行间清洗,包括:


分组汇总:根据指定的分组字段累加设定的汇总字段。例如,将“交易单号”、“商家代码”相同的数据行汇总其“订单金额”、“商品数量”;


分组排重:根据指定的分组字段排重只保留一行。例如,根据“凭证号”、“流水号”分组,只保留一条数据进行后续处理。


由上可见,数据清洗规则组合多变,每个清洗规则组中可以包含多个具体清洗规则,多个清洗规则组之间还存在顺联关系,可以看做一个 ETL 过程,整体还是比较复杂的。


对此,对账平台原方案设定了不可更改的规则优先级,强制单行条件筛选规则最先处理、多行清洗规则最后处理、其它规则按配置的先后顺序处理,具体实现采用数据文件解析循环中完成单行清洗规则处理、动态生成存储过程完成多行间清洗规则处理的办法。此方案对于大多数一般需求已可以满足,但也可以看出,强制优先级不能满足复杂需求,例如先条件筛选后文本截取、根据截取结果再条件筛选的场景;将“数据清洗”过程割裂为单行、多行两部分分散实现,也不利于扩展及维护。


新方案则首先对清洗规则进行抽象,将各具体规则地位平等化,视作对每行数据的不同计算处理,多个具体清洗规则按照配置的先后顺序组成一条处理链,多条处理链形成一个数据处理管道(如图)。



预处理加工计算抽象示意


显而易见,该模型符合流处理特点,结合前述的数据文件解析步骤,非常适合使用 Ignite 的数据注入和流处理特性。通过 IgniteDataStreamerAPI,对账平台可以持续高效地注入数据,并且通过 StreamReceiver 将清洗规则管道包含在其中对数据进行处理。基于各节点并行的数据解析注入任务,StreamReceiver 很容易实现大规模的高效单行数据计算处理,类似 Spark 中 RDD 的 Map 算子操作,但与 Spark Map 算子操作不同的是,Ignite StreamReceiver 数据处理 process 方法的入参是可变的数据实体,开发人员可以在计算过程中直接改变数据实体的值(例如对某字段的值进行截取并更新),由 Ignite 本身对数据实体进行并发锁安全控制;而 Spark Map 算子方法入参是不可变的数据对象,原数据对象的值不可改变,每次计算实际上生成新的 RDD。单行清洗规则解决了,那么多行间清洗规则怎么办呢?此处,我们利用了 Ignite 本身核心的分布式键值缓存能力。两种多行间清洗规则其根本都可以看做是分组(Group),那么当遇到多行间清洗规则时,我们先设置一个 IgniteCache,在处理行数据时,取规则的分组字段拼合作为键,到该键值缓存中查找是否存在对应的值,如果不存在,那么就将本行数据放入其中,如果已存在,那么就视具体规则为分组汇总还是分组排重,将该行数据的汇总字段值累加到缓存值对象中或者抛弃。通过这种方式,单行清洗规则与多行清洗规则都可以应用到循环行处理的清洗规则链管道模型中去,从而符合统一抽象设计。

双方数据核对:内存数据库与 SQL 网格

之前的数据加载及预处理加工都是 A、B 单方各自处理,双方之间数据互不影响,而主业务过程的最后一个环节即“核对”环节,就需要用双方的数据进行比对了。为了提高平台通用性满足复杂多变的对账业务需求,核对环节同样支持多种核对规则单元的灵活组合配置,基本核对规则单元主要包括:


  1. 精确匹配,指 A 方数据记录的某字段值等于 B 方数据记录的某字段值。例如,A 方数据的“订单号码”等于 B 方数据的“交易流水号”。

  2. 相关匹配,指 A 方数据记录的某字段等于指定值,同时 B 方数据记录的某字段等于另一指定值。例如,A 方数据的“交易类型”等于“下单”,B 方数据的“收支类型”等于“收入”。

  3. 容差匹配,又分为正、反向两种,指 A 方数据记录的某数值字段值与 B 方数据记录的某数值字段值的差(正向)或者和(反向)的绝对值小于等于指定值。例如,A 方“订单金额”与 B 方“支付金额”差值绝对值小于等于 0.01(元)。

  4. 加减匹配,指 A 方的一个或多个数值字段进行加减运算后的值与 B 方数据记录的一个或者多个数值字段加减运算值相等。例如,A 方的(“订单金额”-“优惠金额”)等于 B 方的(“资金收入”+“虚拟抵扣”)。

  5. 一个或多个基本规则单元组成一条核对规则,多条核对规则再进一步构成完整的多层次核对规则组,双方数据需依次尝试依据核对规则组中的核对规则,直到符合某条核对规则而核对成功或者不符合任一核对规则而记为失败差异。在规则组中还隐含了一条“潜规则”原则:A、B 方每条数据最多只允许被成功匹配使用一次。假设 A 方某条数据 A1 能且仅能与 B 方某两条数据 B1、B2 符合核对规则 R1,那么 A1 与 B1 核对成功后 A1、B1 即被使用消耗,A1 不能再用作与 B2 核对,故 B2 为差异。


显然,在核对过程中,双方数据记录一旦依据某规则核对成功就需要采用某种方式进行标记筛除,以免被重复使用。原方案基于关系型数据库存储过程实现双方数据核对,概括就是:由应用根据核对规则组生成存储过程程序脚本,动态刷入数据库再调用;在双方数据表中设一字段用来标识该记录是否被核对成功使用,初始值均为否;存储过程中以一方数据查询结果集为游标,遍历每行数据,以基本核对规则单元作为查询条件,到另一方数据表中查找尚未核对成功并匹配的记录,若有,则取第一行作为核对成功,更新双方的成功使用标识。 可以看出原解决方案基于磁盘数据持久化的读写性能也不能支持更高的效率要求,并且结果集游标遍历查询核对算法的时间复杂度较高(O(n2)),故新方案重点解决这两方面的问题。对于第一点,实际上在前述数据接入环节新方案设计已经将双方数据分别放入 Ignite 分布式内存集合中,因此数据 IO 大多都基于内存,速度自然比磁盘 IO 要高出许多。关于第二点,新方案采用 Ignite 的内存数据库特性,将 A、B 双方数据的分布式内存缓存集合利用 SQL 语句能力进行处理,用先排序后比对的算法将时间复杂度降低到 O(n),具体做法为:根据核对规则所使用到的字段(包括精确匹配、容差匹配、加减匹配后的值等)作为排序字段,组成排序 SQL 语句,对 A、B 方数据缓存合集分别进行排序并增加行号;再从双方排序后数据集中从第一条开始,依据排序字段顺序进行逐字段规则单元核对比较,若全部符合则为核对成功,将双方数据行的标记字段更新,且双方均取下一条记录开始新核对,若不符合,则取核对失败字段值较小一方的下一条数据再来从该核对规则的首个单元字段开始尝试核对;重复此过程直到某方数据耗尽。(如下图)



基于排序数据集的核对算法示意


由于双方数据在分布式内存缓存中,且分布式并行排序效率很高,加上核对算法通过基于排序后的数据进行比对避免了对一方全量无序数据的重复查询检索,因此总体性能上有很大提升。进一步地,还可以对一方的某一精确匹配规则单元字段值进行采样分段,作为上下界参数构造出一个部分核对的子任务,其只负责从缓存数据集中取上下界内的数据并结合另一方进行核对,从而实现多线程并行处理以达到更高效核对的目标。

实践效果

以某真实数据对账需求为例,该需求 A、B 双方原始数据各约 1600 万,A 方数据模板共 19 个字段,B 方数据模板共 127 个字段,A 方数据清洗规则 4 组共 653 条,B 方数据清洗规则 19 组共 61 条,双方核对规则 4 组共 24 条。在相同的硬件条件下,各环节用时等指标对比如下。



除可以实现更大数据量处理外,更重要的是新方案可以轻松实现水平扩展以提高系统的处理能力,只需要向集群中增加 Ignite 节点即可。因此,目前线上实际在用的对账平台日均累计处理数据量已达到十亿左右,有力地支撑了前端各业务源系统的运行数据后期处理需求。




作者简介


任道亮,苏宁易购 IT 总部员工平台研发中心高级技术经理,负责小微企业发票服务、对账开放平台等应用系统的架构、开发工作。曾参与电力、电信运营商、公安等行业的多个大小企业应用系统开发项目,具有较广行业范围的从业经历。对新流行技术有较大好奇心和良好学习能力,在服务端、大数据开发方面有较深的理解和丰富的项目实践经验。


蒋旭曦,苏宁易购 IT 总部员工平台研发中心副总监,负责共享产品和智能应用产品的研发管理工作。主导并参与 RPA 机器人,苏宁云财小微企业 SAAS 财税服务,集团合并报表,共享作业平台,极客办公平台等多个重要产品的架构设计和研发管理工作。有多年的企业咨询和互联网从业经验,曾带领团队实施过多个知名外企的 ERP,BPM,DW,BI 项目,也在互联网独角兽公司担任过资深架构师的职位。


2018-11-12 10:0110282

评论 4 条评论

发布
用户头像
学习了
2018-12-10 16:24
回复
用户头像
厉害,大佬
2018-11-12 16:36
回复
用户头像
厉害,涨知识了~
2018-11-12 16:35
回复
用户头像
good job!
2018-11-12 16:31
回复
没有更多了
发现更多内容

npm进阶(一) 更换成淘宝镜像源以及 cnpm

No Silver Bullet

npm 12月日更

南瓜电影 7 天内全面 Serverless 化实践

Serverless Devs

阿里云 ECS 南瓜电影 SAE

恒源云(GPUSHARE)_【功能更新】实例日志上线,操作一目了然

恒源云

深度学习 算力加速

Go语言学习查缺补漏ing Day5

恒生LIGHT云社区

golang 编程语言

Kafka之为什么需要消息队列

编程江湖

大数据 kafka

Kyligence + 亚马逊云科技丨实现云上的精细化运营和数字化指挥

Kyligence

如何搭建批流一体大数据分析架构?

Kyligence

es单机安装及配置其系统服务

elasticsearch

7.《重学JAVA》--运算符

杨鹏Geek

Java 25 周年 28天写作 12月日更

中科柏诚持续推进数字网络技术,蓄力元宇宙布局

联营汇聚

十年期货股票行情数据轻松处理——TDengine在同心源基金的应用

TDengine

数据库 tdengine 时序数据库

SpringBoot中如何优雅的使用多线程

编程江湖

JAVA开发 springboot

腾讯音乐iOS开发四次面试记录

iOSer

ios 腾讯 面试题 iOS面试 腾讯音乐

老电影和图片变清晰的秘密!分辨率提升400%的AI算法

百度大脑

人工智能

Python代码阅读(第68篇):指定值出现次数

Felix

Python 编程 列表 阅读代码 Python初学者

主机入侵检测策略之基线检测

网络安全学海

网络安全 信息安全 渗透测试 安全漏洞 暴力猜解

谈谈对微软Dapr的理解

行云创新

微软 服务网格 dapr

网易云信发布两大元宇宙解决方案,打响进军元宇宙第一枪

网易云信

人工智能 音视频 元宇宙

【IT运维】公司内网服务器可以远程桌面连接吗?怎么连接?

行云管家

云计算 运维 IT运维 远程运维

元宇宙浪潮之下,数字身份至关重要

CECBC

netty系列之:性能为王!创建多路复用http2服务器

程序那些事

Netty 程序那些事 http2 12月日更

飞桨双十二礼包,上海“拆箱”啦!

百度大脑

人工智能

元宇宙与电信运营商

CECBC

6000字,详解数据仓库明星产品背后的技术奥秘

百度开发者中心

数据库 大数据

Web3.0时代的社交网络会有哪些新变化?

CECBC

【等保小知识】信息安全等级保护四级系统有哪些?

行云管家

网络安全 等级保护

List 去重的 6 种方法

编程江湖

List java 编程

Rust 元宇宙 14 —— 创建角色和同步

Miracle

rust 元宇宙

JavaScript 中的 .forEach() 和 for...of

devpoint

JavaScript foreach for...of 12月日更

Aeron 是如何实现的?—— Ipc Subscription

BUG侦探

共享内存 Aeron Ipc Subscription

Linux一学就会之Centos8系统进程管理 ps管理进程

学神来啦

Linux 运维 linux一学就会 uptime centos8

苏宁11.11:基于Apache Ignite日均十亿数据对账实践应用_大数据_任道亮_InfoQ精选文章