【AICon】探索RAG 技术在实际应用中遇到的挑战及应对策略!AICon精华内容已上线73%>>> 了解详情
写点什么

通过 CQRS/ 事件溯源框架 Reveno 实现高负载交易事务处理

  • 2016-06-14
  • 本文字数:8467 字

    阅读完需:约 28 分钟

在当今世界,事务的处理每时每刻都在发生,其范围包括使用关系型数据库进行订购处理的个体零售网站,乃至每秒进行 10 多万次处理的实时交易系统。

Reveno 是一个全新的无锁事务处理框架,它支持 JVM 平台,并基于 CQRS 及事件溯源模式实现。虽然它只是一个简单而强大的工具,但在性能方面也毫不逊色。所有事务都被持久化为只读的日志,并且可以通过按顺序重演事件的方式恢复领域模型的最新状态。所有的运行时操作都是在内存中执行的,从而使其吞吐量可达到每秒种几百万次事务的数量级,平均延迟时间则限制在微秒级。虽然 Reveno 的功能如此强大,但它仍然是一个通用目的的框架,涵盖了大量不同种类的用例,并提供了丰富的引擎配置选项。举例来说,你可以调整它的持久性配置,在极其随意(为了提高系统吞吐量)与高度受限(对于数据丢失的情况具有较低的容忍度)之间进行选择。

对于不同的用例来说,其需求也是千差万别的。作为一个通用框架,它需要考虑所有不同的可能性。在本文中,我将通过示例为读者展现如何使用 Reveno 框架开发一个简单的交易系统。

首先让我们来了解一下 Reveno 如何处理你的领域模型。作为一个基于 CQRS 的框架,Reveno 会将你的领域切分为 _ 事务模型与查询模型 _。对这两个模型的定义没有任何限制,你无需使用任何强制性的注解、基类,甚至不需要实现 Serializable 接口,只需使用最简单的 POJO 就可以完成任务。

没有任何一种单一的途径能够应对所有不同的用例,因此应用程序的设计者需要决定如何处理事务模型的回滚操作。Reveno 为实现模型对象提供了两种主要方式:可变与不可变模型。这两种方式各有不同的底层处理机制,并且各有其优缺点。在 Java 中,不可变对象的开销很低,并且无需进行同步操作,这种方式目前非常流行。Reveno 能够非常高效地处理不可变对象,但每种使用不可变类型的领域都会产生额外的垃圾对象,Reveno 也不例外。因此,如果你能够接受一些额外的GC 开销,那么就应当将这种方式作为默认的选择。反之,如果使用可变模型,那么在进行事务运行时,就要为已使用的对象生成额外的序列化快照,而这将影响到你的性能。幸运的是,如果你坚持使用可变模型,并且仍需要保证性能的最大化及GC 影响的最小化,那么你可以选择一种额外的可变模型特性,“补偿行为”(Compensating Actions)。简单来说,补偿行为是指在实现常规的事务处理函数时,一并实现的一种手动回滚行为。如果读者想了解这方面的更多细节,请参考 Reveno 的官方文档页面

现在,我们已经设计好了一些基本规则,那么让我们进行一些实际的编码工作吧。在我们所设计的交易系统中存在大量的帐户,每个帐户可以有零至多个订单。我们必须提供各种维护性操作,例如帐户的创建和订单的处理等等。在实际应用中,这种类型的系统可能需要应对大量的负载,例如每秒种处理 10 个直至 50 万个事务,甚至更多。更复杂的是,此类系统对于延迟非常敏感,而频繁出现的负载峰值可能会直接造成财务损失。

安装

如果你正在使用一些流行的构建工具,例如 Maven、Gradle、Sbt 等等,那么你可以在 Maven Central 中加入一个 Reveno 的依赖。目前为止,共有 3 个可用的库能够选择:

  • reveno-core —— 包括所有 Reveno 核心功能包,负责引擎的初始化和事务处理等等。
  • reveno-metrics —— 这个库中的包负责从运行中的引擎中收集各种指标,并将这些指标传递给 Graphite、Slf4j 等工具。
  • reveno-cluster —— 支持在主 - 从架构的集群中运行 Reveno,以提供故障转移的能力。

你可以在 Reveno 安装页面中找到完整的安装指南与示例。

定义事务模型

让我们首先从领域模型的定义开始我们的开发过程。正如我们之前所说,领域模型将通过简单的 POJO 进行创建。我个人倾向于使用不可变对象,因为这将大大简化整个工作。它们不仅能够绕开各种并发问题,并且最重要的一点在于,由于不需要保留已访问对象的快照,因此它在 Reveno 中有非常出色的性能表现。Reveno 允许我们直接使用不可变对象(可以说,Reveno 也成为了一个帮助我们学习如何在常规的 Java 应用中处理不可变性的优秀教程)。

让我们首先定义一个表现系统中典型的交易帐户的实体(为了简单起见,我们将实例变量都定义为 public,但在实际应用中并不存在这种限制):

复制代码
public class TradeAccount {
public final long id;
public final long balance;
public final String currency;
private final LongSet orders;
public TradeAccount(long id, String currency) {
this(id, 0, currency, new LongOpenHashSet());
}
private TradeAccount(long id, long balance,
String currency, LongSet orders) {
this.id = id;
this.balance = balance;
this.currency = currency;
this.orders = orders;
}
public LongSet orders() {
return new LongOpenHashSet(orders);
}
}

正如我们所见,这个类是不可变的。但这种值对象并不具备任何功能,往往因此被人称为“贫血”对象。因此,更好的方式是让TradeAccount 类能够实现一些实用的功能,例如处理订单以及进行货币计算:

复制代码
public class TradeAccount {
public final long id;
public final long balance;
public final String currency;
private final LongSet orders;
public TradeAccount(long id, String currency) {
this(id, 0, currency, new LongOpenHashSet());
}
private TradeAccount(long id, long balance,
String currency, LongSet orders) {
this.id = id;
this.balance = balance;
this.currency = currency;
this.orders = orders;
}
public TradeAccount addBalance(long amount) {
return new TradeAccount(id, balance + amount, currency, orders);
}
public TradeAccount addOrder(long orderId) {
LongSet orders = new LongOpenHashSet(this.orders);
orders.add(orderId);
return new TradeAccount(id, balance, currency, orders);
}
public TradeAccount removeOrder(long orderId) {
LongSet orders = new LongOpenHashSet(this.orders);
orders.remove(orderId);
return new TradeAccount(id, balance, currency, orders);
}
public LongCollection orders() {
return new LongOpenHashSet(orders);
}
}

现在,这个类就变得非常实用了。在开始讲述实际的订单处理细节之前,首先要说明一下 Reveno 如何使用它的事务模型。所有的实体都会保存在某个 repository 中,任何类型的处理函数都可以访问该 repository(我们稍后将对此进行详细地讲解)。这些实体相互之前通过 ID 进行引用,并通过 ID 在 repository 中进行访问。由于内部的性能优化机制,所有的 ID 都限制为 long 类型。

Order 类的定义也与之类似,为了简便起见,我们将忽略这部分源代码。不过,你可以在 GitHub 上下载完整的示例代码,并在本文的末尾找到更多的链接。

定义查询模型

我们已经简单地探索了如何创建 Reveno 中的事务模型。从逻辑上说,查询功能的定义也同样关键。在 Reveno 中,查询是通过“视图”的定义而创建的,每个视图都表现了事务模型中的某些实体。除了定义视图类之外,你还应当为每种视图类型提供映射器。我们稍后将对细节进行深入讲解。

当一个事务成功地完成之后,Reveno 将对所改变的实体进行映射操作,以保证视图的更新发生在命令完成之前。在默认情况下,Reveno 中的查询模型是保存在内存中的。让我们为TradingAccount 类定义一个视图:

复制代码
public class TradeAccountView {
public final double balance;
public final Set<OrderView> orders;
public TradeAccountView(double balance, Set<OrderView> orders) {
this.balance = balance;
this.orders = orders;
}
}

TradingAccountView 类中还包括其他种类视图(在这个示例中对应着 OrderView)的一个集合,在进行查询、序列化、JSON 格式转换等操作时,这种方式能够带来很大的便利。Reveno 映射器支持多种实用的方法,以简化将 ID 的集合映射到视图的集合等操作。我们稍后将进行一些实际操作。

定义命令与事务行为

为了在 Reveno 中执行事务,我们必须首先执行一个“命令”对象。命令对象本身可以是一个简单的 POJO,它需要在系统中注册一个特定的处理函数。通常来说,命令将用于执行某些聚合与校验逻辑,以只读方式访问 repository。但最重要的是,命令需要履行它的职责,以发送各种“事务行为”(因此命令也被称为“状态转变器”)。

事务行为是用于在领域模型中进行状态改变的组件,它通过对 repository 的读 - 写访问以执行。事务行为对象本身可以表现为一个 POJO,并在系统中注册对应的处理函数。所有行为组合在一起成为一个单一的原子性事务,它包含在当前所执行命令的范围内。在成功执行完成之后,事务行为将被持久化至底层的存储引擎中,并且在重启或发生任何故障之后重演其状态。

在这个交易系统中,我们需要创建新的交易帐户,并设定初始的余额。与之前的做法一样,我们首先要定义一个事务命令:

复制代码
public class CreateAccount {
public final String currency;
public final double initialBalance;
public CreateAccount(String currency, double initialBalance) {
this.currency = currency;
this.initialBalance = initialBalance;
}
public static class CreateAccountAction {
public final CreateAccount info;
public final long id;
public CreateAccountAction(CreateAccount info, long id) {
this.info = info;
this.id = id;
}
}
}

我们实际上共创建了两个类。CreateAccount 作为命令,CreateAccountAction 则作为事务行为。通常来说,这种切分方式并非强制性的。如果命令与事务行为数据完全匹配,那么你可以放心地重用同一个类。但在这个示例中,我们所获取的货币数值类型为 double(例如来自于某些遗留的终端系统),而在内部引擎中,货币的数值将保存为 long,以确保其精确度能够完全匹配。

现在,我们就可以初始化一个 Reveno 引擎,并定义命令与事务行为的处理函数了:

复制代码
Reveno reveno = new Engine(pathToEngineFolder);
reveno.domain().command(CreateAccount.class, long.class, (c, ctx) -> {
long accountId = ctx.id(TradeAccount.class);
ctx.executeTxAction(new CreateAccount.CreateAccountAction(c, accountId));
if (c.initialBalance > 0) {
ctx.executeTxAction(new ChangeBalance(
accountId, toLong(c.initialBalance)));
}
return accountId;
});
reveno.domain().transactionAction(CreateAccount.CreateAccountAction.class,
(a, ctx) -> ctx.repo().store(a.id,
new TradeAccount(a.id, a.info.currency)));
reveno.domain().transactionAction(ChangeBalance.class,
(a, ctx) -> ctx.repo().
remap(a.accountId, TradeAccount.class,
(id, e) -> e.addBalance(a.amount))
);

这段代码包括的内容很多,让我们仔细分析一下。首先,我们定义了一个 CreateAccount 命令处理函数,它负责生成下一个帐户 ID,并执行了一个事务命令以进行帐户的创建。如果在定义时传入了初始的余额,则还需执行 ChangeBalance 这个事务行为。需要指出的是,ctx.executeTxAction 这个方法调用不会阻塞。当命令处理函数成功完成之后,所有事务行为都会在一个单一的进程中执行。因此,如果需要在任何一个 TxAction 处理函数中进行回滚操作,那么这些事务行为所生成的改动都将被顺利回滚(实际的回滚机制是基于事务模型等实现的)。

将实体映射至查询模型

由于事务模型与查询模型是分离的,因此我们需要定义一些映射器,将实体转换为对应的视图表现。不过,我们并不需要在代码中明确地调用这些映射方法,因为 Reveno 会自动发现 repository 中的“脏”实体,并调用相应的映射方法。让我们看看 TraceAccount 是如何映射到 TradeAccountView 的:

复制代码
reveno.domain().viewMapper(TradeAccount.class,
TradeAccountView.class, (id,e,r) ->
new TradeAccountView(fromLong(e.balance),
r.linkSet(e.orders(), OrderView.class)));

这段代码中的 id 指代实体的标识符,e 指代实体本身,而 r 指代一个特殊的映射上下文,其中包含各种实用的方法。实际完成映射工作的是 r.linkSet(…) 方法,它将以延迟的方式将 ID 指针的集合映射至实际视图的集合。

我们可以通过相同的方式定义 Order 至 OrderView 之间的映射:

复制代码
reveno.domain().viewMapper(Order.class, OrderView.class, (id,e,r) ->
new OrderView(fromLong(e.price), e.size, e.symbol,
r.get(TradeAccountView.class, e.accountId)));

正如你所见,我们的查询模型也是通过不可变对象组成的,这一点与事务模型中的实体一样,它将极大地简化映射逻辑。再次强调,虽然这一点并非强制约束,但如若不然,则我们必须自行负责映射逻辑的正确性。

执行命令

Reveno 中的事务处理操作默认就是异步的。当你在一个运行中的引擎中执行某个命令时,该方法调用将立即返回一个 CompletableFuture 对象,最终将在将来某一时刻返回结果。Reveno 在内部定义了一个具有多个阶段的“管道”,每个阶段将处理各自的线程。在这些管道中如果选择逐个传递对象会产生很大的消耗,因此在这里就可以使用批处理方式。在高负载情况下,Reveno 会在每个阶段处理进行批处理。正因为如此,Reveno 在一开始就为系统提供了高吞吐能力。

在完成了所有定义与业务逻辑实现之后,我们可以开始使用这个引擎了。首先我们需要启动它:

复制代码
reveno.startup();

随后,我们就可以在系统中创建一个新的交易帐户了。你应该留意一点,executeCommand() 方法也存在一个同步的版本,它对于测试以及编写示例非常有用:

复制代码
long accountId = reveno.executeSync(new CreateAccount("USD", 5.15));

在这个示例中,Reveno 内部将调用相应的命令以及事务行为处理函数,后者将为你创建一个新的美元帐户,并将初始余额设为 5.15 美元。我们可以通过以下方式检查它的正确性:

复制代码
System.out.println(reveno.query().find(TradeAccountView.class,
accountId).balance);

这段代码将打印出“5.15”。为了让这个示例看起来更有趣,让我们为这个帐户添加一个新的订单:

复制代码
long orderId = reveno.executeSync(
new MakeOrder(accountId, "EUR/USD", 1, 1.213));

我们在这里创建了一个以 1.213 美元购买 1 欧元的新订单。随后,我们可以再次检查帐户信息,以了解其中的变化:

复制代码
System.out.println(reveno.query().find(TradeAccountView.class,
accountId).orders.size());

这一次所打印的结果是“1”,这表示这个帐户有一个未完成的订单。最后,让我们关闭这个订单,完成这个以美元购买欧元的操作,它将会使帐户中的余额减少 1.213,最终余额为 3.937 美元。

复制代码
reveno.executeSync(new ExecuteOrder(orderId));
// the balance is expected to be 3.937, after order successfully executed
System.out.println(reveno.query().find(TradeAccountView.class,
accountId).balance);

持久化

正如我们在介绍部分所说的一样,Reveno 首先是一个事务处理框架。你可以在同一个目录下重启你的引擎,而仍然能看到模型的最新状态。我们尽可能使该框架的每个部分都具有可配置性,而一点与持久性同样相关。你可以通过调用 reveno.config() 方法查看所有的可选项。

发布事件

Reveno 本身自带一个事件处理子系统。你的任务就是为它定义自己的事件类(通常来说定义为 POJO)以及处理函数,并发布这些事件以处理事务行为。需要指出的重点在于,事件只有在命令执行成功之后才会被发布,并且完全是异步的。所有的视图映射过程都严格地发生于事件处理函数执行之前。

事件的执行结果同样会被持久化到存储引擎中,如果事件成功地完成了,那么在引擎重启之后通常也不会被再次处理。但这种行为并不能得到严格的保障,因此,如果你需要确保处理函数是 100% 幂等的,则应当检查 EventMetadata.isReplay 这个标记,在每个事件处理函数中都可以访问它。

让我们再次扩展一下这个示例,让它对于某个交易帐户中的余额变化事件进行发布与处理。首先,我们将定义这个事件,并添加适当的字段:

复制代码
public class BalanceChangedEvent {
public final long accountId;
public BalanceChangedEvent(long accountId) {
this.accountId = accountId;
}
}

当某个帐户的余额产生变化时,我们只需要了解帐户的 ID,因为我们可以在处理函数中查询相应的视图。我们将通过以下方法定义事件处理函数:

复制代码
reveno.events().eventHandler(BalanceChangedEvent.class, (e, m) -> {
TradeAccountView account = reveno.query().find(TradeAccountView.class,
e.accountId);
System.out.println(String.format(
"New balance of account %s from event is: %s",
e.accountId, account.balance));
});

相应地,我们也需要在 ChangeBalance 事务行为处理函数的定义中添加一行代码:

复制代码
reveno.domain().transactionAction(ChangeBalance.class, (a, ctx) -> {
ctx.repo().remap(a.accountId, TradeAccount.class,
(id, e) -> e.addBalance(a.amount));
// publish an event to all listeners
ctx.eventBus().publishEvent(new BalanceChangedEvent(a.accountId));
});

由于 ChangeBalance 这个事务行为出现在多个命令中,当添加了这段事件发布代码后,我们就会不断收到它的事件。还有一点需要注意,publishEvent 调用会立即返回,而事件的发布则是 _ 最终 _ 某一时刻才会发生的。最终,我们将看到以下输出:

New balance of account 1 from event is: 5.15

New balance of account 1 from event is: 3.937

性能检测

现在,整个示例已经可以运行了,那么让我们来看看这个应用能够处理怎样的负载。Reveno 提供了一个非常实用的reveno-metrics库,能够帮助你追踪某个运行中引擎的性能指标。与其他部分一样,这个指标库也经过了优化,包括堆外内存(off-heap memory)的使用以及无锁的代码,因此它对于整体性能所造成的影响非常小。它还支持与某些流行的监控系统进行集成,例如 Graphite。

(需要指出的是,reveno-metrics 总的来说是一个性能监控工具,而不是一个微基准测试框架。如果要获取准确的基准测试结果,可以考虑使用 JMH 或类似的工具。)

我们选择的环境 MacBook Pro 2.7 GHz i5 CPU,首先要在代码中对指标集合进行初始化,以使用 Reveno Slf4j 这个 sink ,随后重复运行 ChangeBalance 这个命令 4 千 5 百万次(包括用于预热的迭代):

  • reveno.instances.MAC-15_local.default.latency.mean: 68804
  • reveno.instances.MAC-15_local.default.latency.min: 775
  • reveno.instances.MAC-15_local.default.latency.max: 522265
  • reveno.instances.MAC-15_local.default.throughput.hits: 1183396

这些数字表示平均延迟时间约 69 微秒,最小值为 775 纳秒,而最高值则为 522 微秒,总吞量是每秒运行 1183396 次事务。考虑到后台所需完成的各种工作以及持久性级别,这一结果令人印象十分深刻。

结论

Reveno 框架目前才刚刚崭露头角,但它的发展十分迅速。你可以访问我们的官方网站以学习更多的相关知识。我们对于任何建议以及反馈都保持开发的态度。你也可以加入我们的 Google 讨论小组,在 Issues 页面中提交 bug,或是向 mailto:support@reveno.org 提交非公开问题或其他任何问题。

本文中所描述的 Demo 的完整代码可以在 GitHub 上找到,你还能够找到使用 Reveno 的各种示例(或者自行提交一个pull request)。

关于作者

Artem Dmitriev目前在 GetIntent 这家广告科技创业公司担任软件工程师,他的工作是交付大规模实时投标这方面需求的平台。近几年来,Artem 致力于在 JVM 平台上创建高负载的系统。他的工作背景包括为多市场交易平台开发核心引擎,这些平台通常对于延迟与吞吐量有很高的要求。Artem 对于开源软件及其开发充满热情,他热诚欢迎用户的反馈,可以通过 art.dm.ser@gmail.com 向他发送邮件。

查看英文原文 High Load Trading Transaction Processing with Reveno CQRS/Event Sourcing Framework

公众号推荐:

2024 年 1 月,InfoQ 研究中心重磅发布《大语言模型综合能力测评报告 2024》,揭示了 10 个大模型在语义理解、文学创作、知识问答等领域的卓越表现。ChatGPT-4、文心一言等领先模型在编程、逻辑推理等方面展现出惊人的进步,预示着大模型将在 2024 年迎来更广泛的应用和创新。关注公众号「AI 前线」,回复「大模型报告」免费获取电子版研究报告。

AI 前线公众号
2016-06-14 19:217468
用户头像

发布了 428 篇内容, 共 171.4 次阅读, 收获喜欢 36 次。

关注

评论

发布
暂无评论
发现更多内容

API渗透测试的基本流程及关键点

阿泽🧸

11月月更 API渗透测试

正则表达式学习笔记(二)

lxmoe

正则表达式 学习笔记 11月月更

极客时间运维进阶训练营第四周作业

9527

这一次,带你深入浅出Go语言切片和数组

海风极客

Go 11月月更

全网讲的最好的微服务,SpringCloud架构进阶

程序知音

Java 微服务 SpringCloud java架构 后端技术

码农必备?清华大学开源了一款写代码神器!

Jackpop

正则表达式学习笔记(一)

lxmoe

正则表达式 学习笔记 11月月更

云原生系列 【轻松入门容器基础操作】

叶秋学长

云原生 沙箱实验 11月月更 操作手册

【LeetCode】找到最高海拔Java题解

Albert

算法 LeetCode 11月月更

2022年最新版68道Redis面试题,20000字干货,赶紧收藏起来备用!

钟奕礼

Java 程序员 java程序员 java面试 java编程

链路状态路由协议 OSPF (三)

我叫于豆豆吖.

11月月更

pytorch实现卷积神经网络实验

Studying_swz

人工智能 11月月更

YRCloudFile V6.9.0 加速企业在大数据应用技术创新

焱融科技

云计算 分布式系统 高性能 文件存储

链路状态路由协议 OSPF (二)

我叫于豆豆吖.

11月月更

面了个阿里拿38k出来的,让我见识到了基础顶端

程序知音

Java java面试 java架构 后端技术 Java面试八股文

昇腾AI创新大赛燃情上演,大咖齐聚共话人工智能发展新篇章

Geek_2d6073

一款超好用的开源密码管理器!

Jackpop

Spring 5(六)新功能

浅辄

Spring5 JUnit 11月月更

算法题学习---判断一个链表是否为回文结构

桑榆

算法题 11月月更

2022-11-18:给定一个数组arr,表示连续n天的股价,数组下标表示第几天 指标X:任意两天的股价之和 - 此两天间隔的天数 比如 第3天,价格是10 第9天,价格是30 那么第3天和第9天的指

福大大架构师每日一题

算法 rust 福大大

用户特征分析的方法

穿过生命散发芬芳

11月月更 用户特征分析

复杂时序逻辑电路

攻城狮Wayne

Verilog 11月月更 时序逻辑

芯启源加入龙蜥社区,推动集成电路和DPU芯片创新落地

OpenAnolis小助手

开源 龙蜥社区 CLA 芯启源

[力扣] 剑指 Offer 第四天 - 数组中重复的数字

陈明勇

Go 数据结构与算法 力扣 11月月更

计算机网络:VLAN基本概念与原理

timerring

计算机网络 VLAN 11月月更

综合实验——高级网络应用检测

我叫于豆豆吖.

11月月更

【C语言】if 关键字

謓泽

11月月更

Meta开源新工具啊,Git地位危险了?

Jackpop

【愚公系列】2022年11月 微信小程序-页面配置

愚公搬代码

11月月更

K8S环境的Jenkin性能问题处理

程序员欣宸

Kubernetes jenkins 11月月更

三面头条 + 四面阿里 + 五面腾讯拿 offer 分享面经总结

程序知音

java面试 大厂面试 java架构 后端技术 Java面试八股文

通过CQRS/事件溯源框架Reveno实现高负载交易事务处理_Java_Artem Dmitriev_InfoQ精选文章