GTLC全球技术领导力峰会·上海站,首批讲师正式上线! 了解详情
写点什么

Event Sourcing 和 CQRS 落地(四):深入使用 -Axon

2019 年 7 月 04 日

Event Sourcing和CQRS落地(四):深入使用-Axon

在本系列的前一篇文章中,我们介绍了 CQRS 实现,以数据库为例使用 JPA 去做 View 的 ORM,本文将重点介绍 Axon 实现。


深入使用 Axon

实现 snapshot

在前面文章中,我们略微涉及了一些 snapshot 的概念,其实这个概念还是比较好理解的。当事件堆积到一定程度,每次 load 都会花费一定时间,这个时候自然会想到 snapshot,先将部分事件进行计算,然后生成 snapshot,后续 load 的时候先读取 snapshot,这样就省去了很多计算过程。如果你读过之前改写 event store 部分的代码,就会发现每个 aggregate 实际上只会存储一个 snapshot,每当新生成时会替换老的。对一个 aggregate 来说,snapshot 同样和事件差不多,它都可以当做事件来处理,但是对于 aggregate 来说,我并不想去处理 snapshot,我只是需要一个计算好的结果而已。基于这个,Axon 给我们提供了 AggregateSnapshotter,这类 snapshot 可以直接还原 aggregate 状态。


了解大概原理之后,我们要做的事情其实比较明确:


  1. 告诉 Axon 我们要在什么时候触发 snapshot 的生成。

  2. 告诉 Axon 我们要生成什么样的 snapshot。


    @Bean    public CustomEventSourcingRepository<ContractAggregate> contractAggregateRepository(CustomEmbeddedEventStore eventStore,                                                                                        SnapshotTriggerDefinition snapshotTriggerDefinition,                                                                                        ParameterResolverFactory parameterResolverFactory) {        return CustomEventSourcingRepository.builder(ContractAggregate.class)            .eventStore(eventStore)            .snapshotTriggerDefinition(snapshotTriggerDefinition)            .parameterResolverFactory(parameterResolverFactory)            .build();    }
@Bean public SnapshotTriggerDefinition snapshotTriggerDefinition(Snapshotter snapshotter) { return new EventCountSnapshotTriggerDefinition(snapshotter, 5); }
@Bean public AggregateSnapshotter snapShotter(CustomEmbeddedEventStore eventStore, ParameterResolverFactory parameterResolverFactory) { return AggregateSnapshotter.builder() .eventStore(eventStore) .parameterResolverFactory(parameterResolverFactory) .aggregateFactories(Collections.singletonList(new GenericAggregateFactory<>(ContractAggregate.class))) .build(); }
复制代码


  • SnapshotTriggerDefinition是用来定义 snapshot 策略的,这里使用了 Axon 提供的策略,是基于 event count 的。

  • AggregateSnapshotter告诉 Axon 我们需要生成这类 snapshot,其实还有一类 snapshot 叫SpringAggregateSnapshotter,如果我们使用的是系统自带的EventSourcingRepository,那么可以直接使用这类 snapshot,实质上看代码,它就是取找了EventSourcingRepository这个 bean 的 AggregateFactory。

  • Repository 初始化的时候指定snapshotTriggerDefinition,那么在执行 load 方法的时候,就会去触发了。


启动项目,PUT 某个 aggregate 5 次之后,你就会发现snapshot_event_entry这张表里多了记录,snapshot 就生成了。


实现 upcaster

upcaster 的概念也是比较好理解的,比如 create 事件,后期加入了一个新的字段,而老的事件没这个字段,这个时候就需要一个升级过程,把老的事件升级成新的事件,大部分的升级都是单个事件内的增减变动字段,也有比较复杂的情况,一个事件变多个,或者多个变一个等等。在 Axon 中把这个过程称为 upcaster,对应的 interface 是 Upcaster,里面只有一个方法Stream<T> upcast(Stream<T> intermediateRepresentations);看这个方法,理论上来说可以实现 N 对 N 的转换,Axon 提供了 one to one 和 one to many 的实现,这两种转换应该可以满足 90% 以上的需求了,我们在开发过程中本身也应该对删除事件谨慎一点,一般不会去删除一个事件,可以在消费的时候忽略就行了,而不是直接删除这个事件。


那么要进行升级,事件本身必须要有个版本的概念,Axon 提供了@Revision注解用以标明事件的版本,如果没有标注则为 null,由于之前的例子都是没有 version 的,那么我们往 contract 里面添加个字段 industryName。更新 event、command、aggregate、model、view,然后给 AbstractEvent加上@Reivision("1.0.0")注解,启动工程并发送一个 GET contracts/{id} 请求原来已经建立的数据,这时候会发现 industryNamenull,下面我们实现默认行业为 “互联网”。


  1. 由于大部分的升级都是针对同一个事件的增减字段,这里建立了SameEventUpCaster


public abstract class SameEventUpCaster extends SingleEventUpcaster {
protected boolean canUpcast(IntermediateEventRepresentation intermediateRepresentation) {
return outputType(intermediateRepresentation.getType()) != null; }
@Override protected IntermediateEventRepresentation doUpcast(IntermediateEventRepresentation intermediateRepresentation) { return intermediateRepresentation.upcast( outputType(intermediateRepresentation.getType()), JsonNode.class, d -> this.doUpCastPayload(d, intermediateRepresentation), metaData -> this.doUpCastMetaData(metaData, intermediateRepresentation) ); }
public SimpleSerializedType outputType(SerializedType originType) { return new SimpleSerializedType(eventTypeName(), outputRevision(originType.getRevision())); }
public abstract String eventTypeName();
public abstract String outputRevision(String originRevision);
public abstract JsonNode doUpCastPayload(JsonNode document, IntermediateEventRepresentation intermediateEventRepresentation);
public abstract MetaData doUpCastMetaData(MetaData document, IntermediateEventRepresentation intermediateEventRepresentation);
}
复制代码


  1. 建立一个处理事件转换的中心,目的是将转换操作分发下去:


public class ContractEventUpCaster extends SingleEventUpcaster {    private static List<SameEventUpCaster> upCasters = Arrays.asList(        new ContractCreatedEventUpCaster(),        new ContractUpdatedEventUpCaster()    );
@Override protected boolean canUpcast(IntermediateEventRepresentation intermediateRepresentation) { return upCasters.stream().anyMatch(o -> o.canUpcast(intermediateRepresentation)); }
@Override protected IntermediateEventRepresentation doUpcast(IntermediateEventRepresentation intermediateRepresentation) { SameEventUpCaster upCaster = upCasters.stream() .filter(o -> o.canUpcast(intermediateRepresentation)) .findAny().orElseThrow(RuntimeException::new); return upCaster.doUpcast(intermediateRepresentation); }}
复制代码


  1. 实现各个事件的具体升级方式:


public class ContractCreatedEventUpCaster extends SameEventUpCaster {

@Override public String eventTypeName() { return ContractCreatedEvent.class.getTypeName(); }
@Override public String outputRevision(String originRevision) { final HashMap<String, String> revisionConvertMpp = new HashMap<>(); revisionConvertMpp.put(null, "1.0.0");
return revisionConvertMpp.get(originRevision); }
@Override public JsonNode doUpCastPayload(JsonNode document, IntermediateEventRepresentation intermediateEventRepresentation) {
if (intermediateEventRepresentation.getType().getRevision() == null) { ((ObjectNode) document).put("industryName", "互联网"); }
return document; }
@Override public MetaData doUpCastMetaData(MetaData document, IntermediateEventRepresentation intermediateEventRepresentation) { return document; }}
public class ContractUpdatedEventUpCaster extends SameEventUpCaster {

@Override public String eventTypeName() { return ContractUpdatedEvent.class.getTypeName(); }
@Override public String outputRevision(String originRevision) { final HashMap<String, String> revisionConvertMpp = new HashMap<>(); revisionConvertMpp.put(null, "1.0.0");
return revisionConvertMpp.get(originRevision); }
@Override public JsonNode doUpCastPayload(JsonNode document, IntermediateEventRepresentation intermediateEventRepresentation) {
// 每个版本只有一种升级方案,然后链式一步一步升级 if (intermediateEventRepresentation.getType().getRevision() == null) { ((ObjectNode) document).put("industryName", "互联网"); }
return document; }
@Override public MetaData doUpCastMetaData(MetaData document, IntermediateEventRepresentation intermediateEventRepresentation) { return document; }}
复制代码


  1. 添加配置:


    @Bean    public EventStorageEngine eventStorageEngine(Serializer defaultSerializer,                                                 PersistenceExceptionResolver persistenceExceptionResolver,                                                 @Qualifier("eventSerializer") Serializer eventSerializer,                                                 EntityManagerProvider entityManagerProvider,                                                 EventUpcaster contractUpCaster,                                                 TransactionManager transactionManager) {        return JpaEventStorageEngine.builder()            .snapshotSerializer(defaultSerializer)            .upcasterChain(contractUpCaster)            .persistenceExceptionResolver(persistenceExceptionResolver)            .eventSerializer(eventSerializer)            .entityManagerProvider(entityManagerProvider)            .transactionManager(transactionManager)            .build();    }    @Bean    public EventUpcaster contractUpCaster() {        return new ContractEventUpCaster();    }
复制代码


启动项目,再次请求 /contracts/{id} ,数据已经更新了。但是有个问题,那就是 view 视图的数据还没有更新,这部分的数据还是需要编写脚本去做升级的,暂时没有什么更好的办法。


Command 优化

  1. 自定义CommandGateway

  2. Axon 提供的CommandGateway接口,我们看到任意的 object 都能发送,并且不能附带 MetaData,所以这里我们进行一个自定义:


public interface ContractCommandGateway {
// fire and forget void sendCommand(AbstractCommand command);
// method that will wait for a result for 10 seconds @Timeout(value = 6, unit = TimeUnit.SECONDS) Long sendCommandAndWaitForAResult(AbstractCommand command);

// method that will wait for a result for 10 seconds @Timeout(value = 6, unit = TimeUnit.SECONDS) void sendCommandAndWait(AbstractCommand command);
// method that attaches meta data and will wait for a result for 10 seconds @Timeout(value = 6, unit = TimeUnit.SECONDS) ContractAggregate sendCommandAndWaitForAResult(AbstractCommand command, @MetaDataValue("userId") String userId);
// this method will also wait, caller decides how long void sendCommandAndWait(AbstractCommand command, long timeout, TimeUnit unit) throws TimeoutException, InterruptedException;}
复制代码


  1. 增加重试机制,由于分布式的问题,event 在存储的时候还是会发生资源争夺,在 InnoDB 下的表现就是 A 节点存储了 seq 为 1 的 event,B 节点在 A 存储前内存中先读取到了 0 ,然后进行存储,这个时候就会有重复的风险,比较好的做法是将同一个 aggregate 的操作尽量分配到一个节点下面去处理,但是事实上完全避免是不太可能的,所以了这里我们需要一个重试机制去做补偿,Axon 也提供了这个机制,代码如下:


@Slf4jpublic class CommandRetryScheduler implements RetryScheduler {
@Override public boolean scheduleRetry(CommandMessage commandMessage, RuntimeException lastFailure, List<Class<? extends Throwable>[]> failures, Runnable commandDispatch) { log.info(MessageFormat.format("aggregate [{0}] execute [{1}] retry [{2}] time", commandMessage.getIdentifier(), commandMessage.getCommandName(), failures.size()));
if (failures.size() > 2) { return false; }
commandDispatch.run();
return true; }}
复制代码


  1. 截获 command 做一些事情,比如从 Security Context 中获取 user 信息并作为 MetaData 发送,或者为 CreateCommand 类型的 command 自动生成一个 ID,而不用我们手动去构建:


public interface MetaDataUserInterface {
String getName();
Long getUserId();
Long getCustomerId();}
@Getter@Setter@Builderpublic class MetaDataUser implements MetaDataUserInterface {
private String name;
private Long userId;
private Long customerId;}
@AllArgsConstructor@Configurationpublic class CommandInterceptor implements MessageDispatchInterceptor {
private final UIDGenerator uidGenerator;
@Override public BiFunction<Integer, GenericCommandMessage<AbstractCommand>, GenericCommandMessage<AbstractCommand>> handle(List messages) { return (index, message) -> {
// create command 自动生成 ID if (message.getPayload() instanceof CreateContractCommand) { CreateContractCommand payload = (CreateContractCommand) message.getPayload(); payload.setIdentifier(uidGenerator.getId()); }
// 添加 user info 作为 MetaData,由于项目不设计 security 这里就简单的附加一个假的用户 Map<String, MetaDataUserInterface> map = new HashMap<>();
map.put("user", MetaDataUser.builder().customerId(1L).name("Test").userId(2L).build()); return map.isEmpty() ? message : message.andMetaData(map); }; }}
复制代码


这里为了不涉及 Spring Security,我直接 new 了一个测试对象进去,大家可以取自己的 user。


  1. 最后添加配置代码:


    @Bean    public ContractCommandGateway getCommandGateway(SimpleCommandBus simpleCommandBus, CommandInterceptor commandInterceptor) {        return CommandGatewayFactory.builder()            .commandBus(simpleCommandBus)            .retryScheduler(new CommandRetryScheduler())            .dispatchInterceptors(commandInterceptor)            .build()            .createGateway(ContractCommandGateway.class);    }
复制代码


  1. 最后替换掉 command gateway 的调用:


    private final ContractCommandGateway contractCommandGateway;
@PostMapping public Long createContract(@RequestBody @Valid CreateContractCommand command) { return contractCommandGateway.sendCommandAndWaitForAResult(command); }
@PutMapping("/{id}") public void updateContract(@PathVariable("id") Long id, @RequestBody @Valid UpdateContractCommand command) { command.setIdentifier(id); contractCommandGateway.sendCommandAndWait(command); }
@DeleteMapping("/{id}") public void deleteContract(@PathVariable("id") Long id) { contractCommandGateway.sendCommandAndWait(new DeleteContractCommand(id)); }
复制代码


这里大家可能注意到,我将原来 async 的发送都改成了 sync 的,因为 async 的调用,如果过程异常,其实这个请求并不会丢出异常,sendCommandAndWaitForAResult command 可以有返回值,这个在官方文档中也有介绍,就不做过多的介绍。完整的例子 - branch session5


作者介绍:


周国勇,目前就职于杭州匠人网络创业,致力于楼宇资产管理的 SaaS 化,负责后端业务架构设计、项目管理,喜欢对业务模型的分析,热衷新技术的探索和实践,经常在踩坑的路上越走越远。


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


《Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现》


《Event Sourcing 和 CQRS 落地(三):CQRS 实现》


2019 年 7 月 04 日 08:464764

评论 2 条评论

发布
用户头像
作者写的不错,为什么InfoQ上更新的这么慢呢?github上好像全部更新完了
2019 年 07 月 04 日 18:56
回复
给大家一个学习和理解的时间哈,如果有更多读者反馈希望一次性看完,我们将加快更新脚步哈~
2019 年 07 月 05 日 08:57
回复
没有更多了
发现更多内容

MySQL 数据查询语言(DQL)& 事务控制语言(TCL)详解

若尘

MySQL

【LeetCode】分割回文串Java题解

HQ数字卡

算法 LeetCode 28天写作

马云最新演讲提出构建新金融体系,看区块链如何担当?

茜茜公主

七日更 3月日更

要拥有必先懂失去怎接受——浅谈前景理论

Justin

心理学 28天写作 游戏设计

面试官就是这么欺负人:new Object()到底占用几个字节?

xcbeyond

Java java对象分析 3月日更

大数据热是华而不实吗?大数据和小数据有什么本质区别

读字节

大数据 物联网 数据隐私 大数据 Google 小数据

架构学习(2021年03月06日)

张小胖

写作平台的一些乱象

ES_her0

28天写作 3月日更

kvm

梅花鹿鹿

kvm

在有限的时间里,拿到通才的帐号,登入无限的游戏。

叶小鍵

工作中迷迷糊糊,不知道自己想要什么?

一笑

28天写作

虚拟化存储

lenka

产品经理 3月日更

新业务团队应用数字化的4个能力

boshi

数字化转型 七日更

关于 Python 中的字符串,我在补充两点,滚雪球学 Python

梦想橡皮擦

Python 28天写作 3月日更

常用工具幕布高级会员获取

白程序员的自习室

电商管理系统之发票子系统设计(二)

长沙造纸农

架构设计 高并发系统设计 电商 电子发票 发票

如何设计三极管控制继电器电路

不脱发的程序猿

28天写作 电路设计 继电器电路设计 三极管 3月日更

面向业务的高可用架构设计

架构精进之路

架构设计 七日更 3月日更

文字君和ta的朋友们

InfoQ写作平台官方

Docker部署ClickHouse监控平台

wjchenge

(28DW-S8-Day15) 在线教育的MOT

mtfelix

在线教育 28天写作 峰值体验 关键时刻 MOT

Elasticsearch Mapping Root Object

escray

elastic 七日更 28天写作 死磕Elasticsearch 60天通过Elastic认证考试 3月日更

数据库概述

在即

数据库 28天写作 28天挑战 3月日更

常见的设计模式原则

一个大红包

设计模式 设计原则 28天挑战 3月日更

翻译:《实用的Python编程》04_00_Overview

codists

Python

专访 | 我与毕玄大师的对话

九叔

Java 阿里巴巴 中间件 架构师 访谈录

新消费品品牌的崛起给户外广告带来了哪些新机遇?

󠀛Ferry

七日更 3月日更

超干货 (实战经验)结合公司业务分析离线数仓建设实践

五分钟学大数据

大数据 数据仓库 28天写作 3月日更

18 个 Java8 日期处理的实践,太有用了!

xcbeyond

Java java8 日期处理 3月日更

最全Hive SQL语法、Hive函数及使用注意事项(一)

五分钟学大数据

大数据 Hive SQL 28天写作 3月日更

打造移动版的开发环境

雨夜的博客

php vagrant 移动版开发环境

DNSPod与开源应用专场

DNSPod与开源应用专场

Event Sourcing和CQRS落地(四):深入使用-Axon-InfoQ