来 DTDS 全球数字人才发展峰会,与刘润、叶军、快刀青衣畅聊成长>> 了解详情
写点什么

Event Sourcing 和 CQRS 落地(六):实现可靠消息

2019 年 7 月 08 日

Event Sourcing 和 CQRS落地(六):实现可靠消息

在本系列的上一篇文章中,作者介绍了 Spring Cloud 提供的消息中间件的抽象 Spring Cloud Stream 的优化方法,本文将主要介绍如何实现可靠消息。


实现可靠消息

什么是可靠消息

微服务盛行的时代下,消息成为了不可缺少的组件,首先我们看一个例子,contract 系统创建了一个合同,然后发送创建合同的消息。看似简单,实际上分析一下它的出错可能性,会有以下几种:


  1. 创建合同成功,发送消息失败;

  2. 创建合同失败,发送消息成功;

  3. 创建合同成功,发送消息成功;

  4. 创建合同失败,发送消息失败;


同时成功或者同时失败,这个情况是一致的,是正确的,我们需要关心的就是不一致的情况。那么最简单的办法,就是让创建合同和发送消息成为一个事务,要么一起成功,要么一起失败,但是这么做的话耦合性太强,本身合同创建成功了,却因为消息发送的失败强制回滚。这个时候,可能就想到了存储消息数据,将合同创建和消息数据的存储作为一个事务,消息发送成功之后再去删除消息数据,定期去扫描未发送的消息数据,来保证消息的发送。但是这么做还是有一定的代价的,需要实现消息的存储,消息存储和合同创建还是耦合在一起的,不过这样的模式到 Event Sourcing 下面那就比较理想了,因为本身消息数据和 event 是一样的,存储了 event 相当于完成了存储消息数据,只需要在 event 下做一个标记即可。


做完了上面这些,就能保证消息一定从 producer 到 broker 这一过程,当然要做到消息不丢,必然产生的结果就是消息可能会重复,情况就是 broker 收到了消息,但是没有通知到 producer,这种情况下 producer 是认为消息没有投递成功的,会出现重复投递的情况。保证了消息一定送达 broker 之后,就是 consumer 和 broker 的关系了,consumer 在消费之后需要告诉 broker 消费成功,否则 broker 需要一直保存这些消息。当然消费端可能需要做更多的事情,比如保证同一 aggregate 事件的顺序消费。下面文章会以在 Axon 框架上做一些拓展,以分别实现 consumer 和 producer。


拓展 DomainEventEntry

上面也说到了,在 Event Sourcing 模式下,我们只需要给事件加上一个是否投递的标志,这里我们就看看如何实现(这里只针对 JPA 做了实现)。


  1. 建立对应的 entity 以取代DomainEventEntry


@Entity(name = "DomainEventEntry")@Getter@Setter@Table(indexes = @Index(columnList = "aggregateIdentifier,sequenceNumber", unique = true))public class CustomDomainEventEntry extends AbstractSequencedDomainEventEntry<byte[]> {
@NotNull @Column(columnDefinition = "tinyint(1) default 0") private boolean sent = false;
public CustomDomainEventEntry(DomainEventMessage<?> eventMessage, Serializer serializer) { super(eventMessage, serializer, byte[].class); this.setSent(false); }
/** * Default constructor required by JPA */ protected CustomDomainEventEntry() { }}
复制代码


  1. 建立对应的 storage 以取代 JpaEventStorageEngine:


public class CustomJpaEventStorageEngine extends JpaEventStorageEngine {
public CustomJpaEventStorageEngine(Builder builder) { super(builder); }
@Override protected Object createEventEntity(EventMessage<?> eventMessage, Serializer serializer) { return new CustomDomainEventEntry(asDomainEventMessage(eventMessage), serializer); }
public static CustomJpaEventStorageEngine.Builder builder() { return new CustomJpaEventStorageEngine.Builder(); }
// 此处略去了 builder 部分代码 public static class Builder extends JpaEventStorageEngine.Builder { ... }}
复制代码


  1. 更新对应的 config:


    @Bean    public EventStorageEngine eventStorageEngine(Serializer defaultSerializer,                                                 PersistenceExceptionResolver persistenceExceptionResolver,                                                 @Qualifier("eventSerializer") Serializer eventSerializer,                                                 EntityManagerProvider entityManagerProvider,                                                 EventUpcaster contractUpCaster,                                                 TransactionManager transactionManager) {        return CustomJpaEventStorageEngine.builder()            .snapshotSerializer(defaultSerializer)            .upcasterChain(contractUpCaster)            .persistenceExceptionResolver(persistenceExceptionResolver)            .eventSerializer(eventSerializer)            .entityManagerProvider(entityManagerProvider)            .transactionManager(transactionManager)            .build();    }
复制代码


handler 实现可靠消息的生产端

做好了准备工作再发送消息就比较清晰了,我们需要做的就是在事件存储后去尝试发送消息,然后标记为已发送即可,在之前的 实现 CQRS 中我们留了一个坑,就是 view 端的更新不是在事件存储之后,这里我们就去实现发消息在事件存储之后,然后 view 层去监听消息更新。具体的实现就是利用 entity postPersist 去监听存储,在 transaction 成功后去尝试发送消息,代码如下:



@EntityListeners(CustomDomainEventEntryListener.class)public class CustomDomainEventEntry extends AbstractSequencedDomainEventEntry<byte[]> { ...}
@Component@Slf4jpublic class CustomDomainEventEntryListener { private static CustomDomainEventEntryRepository customDomainEventEntryRepository;
private static ContractPublisher contractPublisher;
@Autowired public void init(CustomDomainEventEntryRepository customDomainEventEntryRepository, ContractPublisher contractPublisher) { this.customDomainEventEntryRepository = customDomainEventEntryRepository; this.contractPublisher = contractPublisher; }
@PostPersist void onPersist(CustomDomainEventEntry entry) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override public void afterCompletion(int status) { if (status == TransactionSynchronization.STATUS_COMMITTED) { CompletableFuture.runAsync(() -> sendEvent(entry.getEventIdentifier())); } } }); }
@Transactional public void sendEvent(String identifier) { CustomDomainEventEntry eventEntry = customDomainEventEntryRepository.findByEventIdentifier(identifier);
if (!eventEntry.isSent()) { contractPublisher.sendEvent(eventEntry); eventEntry.setSent(true); customDomainEventEntryRepository.save(eventEntry); } }}
@Repositorypublic interface CustomDomainEventEntryRepository extends JpaRepository<CustomDomainEventEntry, String> {
/** * 查找事件 * * @param identifier * * @return */ CustomDomainEventEntry findByEventIdentifier(String identifier);}
@Component@AllArgsConstructor@Slf4jpublic class ContractEventPublisher {
public void sendEvent(DomainEvent event) { // use stream to send message here log.info(MessageFormat.format("prepare to sending message : {0}]", new Gson().toJson(event))); }
public void sendEvent(CustomDomainEventEntry event) { // use com.craftsman.eventsourcing.stream to send message here ObjectMapper mapper = new ObjectMapper();
HashMap payload = null; HashMap metaData = null; try { payload = mapper.readValue(event.getPayload().getData(), HashMap.class); metaData = mapper.readValue(event.getMetaData().getData(), HashMap.class); } catch (Exception exception) { log.error(MessageFormat.format("byte[] to string failed; exception: {0}", exception)); }
if (payload == null || metaData == null) { log.warn(MessageFormat.format("nothing to send; exception: {0}", event.getEventIdentifier())); return; }
DomainEvent domainEvent = new DomainEvent( event.getType(), event.getAggregateIdentifier(), event.getPayload().getType().getName(), event.getPayload().getType().getRevision(), event.getSequenceNumber(), event.getEventIdentifier(), event.getTimestamp(), payload, metaData);
this.sendEvent(domainEvent); }}
复制代码


  • DomainEvent 是为了统一消息的格式包装的类,具体可以看代码这里就不贴了

  • ContractEventPublisher 作为消息统一发送出口,为了不涉及 rabbitmq 暂时以 log 的形式代替消息发送,后续在 Spring Cloud Stream 优化中实现完整的流程


实现消费端

DomainEvent的属性中,我们可以看到有一个sequenceNumber字段,这个字段可以用来保证同一 aggregate 的事件顺序,那么在消费端可以以 type aggregate sequenceNumber 形成一张表,用来记录每个 aggregate 的最新状态,如果 aggregate 数据量比较大,也可以分表存储,一般 aggregate_id 索引之后,单表性能在百万级别,应该都没什么问题。这样在消费的时候先比较 sequenceNumber 差异,只消费差异值为 1 的事件,就可以保证同一 aggregate 的事件被顺序消费。之后会写篇关于 Spring Cloud Stream 的文章,用来作为服务之间的桥梁,并解决框架用 header 作为路由之后引起的问题,这里暂时不做深入。完整的例子 - branch session7


作者介绍:


周国勇,目前就职于杭州匠人网络创业,致力于楼宇资产管理的 SaaS 化,负责后端业务架构设计、项目管理,喜欢对业务模型的分析,热衷新技术的探索和实践,经常在踩坑的路上越走越远。


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


《Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现》


《Event Sourcing 和 CQRS 落地(三):CQRS 实现》


《Event Sourcing 和 CQRS 落地(四):深入使用 -Axon》


《Event Sourcing 和 CQRS 落地(五):Spring-Cloud-Stream 优化》


2019 年 7 月 08 日 10:203311

评论

发布
暂无评论
发现更多内容

超级全面的测试用例设计,你确定不来看一看?

程序员一凡

软件测试 测试用例 测试工程师

数字资产的发行是全球都无法阻挡的大趋势!

CECBC区块链专委会

数字资产

福利拉满!腾讯云大神亲码Redis深度手册【基础+应用+原理+集群+扩展+源码】六大核心知识,全部一次让你搞懂

神奇小汤圆

Java 数据库 程序员 架构

入门到精通!阿里码农熬了2晚整理的Java工具,真香

飞飞JAva

Java java工具类

展开说说,Spring Bean IOC、AOP 循环依赖

小傅哥

Java spring 小傅哥 ioc 循环依赖

建筑行业全周期区块链产融平台

CECBC区块链专委会

建筑行业

NumPy之:结构化数组详解

程序那些事

Python 数据分析 Numpy 程序那些事

就这一次,阿里大牛终于把困扰我多年的Spring Boot【入门+基础+进阶+项目实战】全部讲清楚了

神奇小汤圆

Java 编程 程序员 架构

区块链与物联网融合发展都会有哪些机遇与挑战

CECBC区块链专委会

区块链

基于golang分布式爬虫系统的架构体系v1.0

雨夜的博客

go 架构

太简单了!这套Java异常处理的总结,80%的人都没看过

牛哄哄的java大师

Java

进击的速溶咖啡:当中国AI开始玩工业化

脑极体

模块三作业:消息队列详细设计文档

薛定谔的指南针

架构实战营

架构实战营 模块二 作业

Pitt

架构实战营

Golang实现文件复制的技巧

liuzhen007

golang 5月日更

算法训练营 - 学习笔记 - 第五周

心在飞

喊话特斯拉、保时捷、戴森三巨头,户外造势热点!

󠀛Ferry

5月日更

新手学习微服务,得先看看这篇文章

Java架构师迁哥

比比看!Java时间和空间的复杂度算法,3分钟你能学会哪个?

java专业爱好者

Java

采取有效云网络安全策略的5个基本步骤

浪潮云

云计算

打破思维定式(二)

Changing Lin

5月日更

4.2 Go语言从入门到精通:延迟函数 defer

xcbeyond

defer go语言 Go语言从入门到精通 5月日更

Spring-技术专题-设计模式和研究分析

李浩宇/Alex

spring 设计模式 原理分析 5月日更

Vue Router 10 条高级技巧

Thrash

技巧

我不懂区块链,但不影响我改变世界!

北熊说链

区块链 数字货币 太空猫社区联盟

ipfs挖矿多少钱一台?ipfs挖矿到底靠谱吗?ipfs投资有风险吗?

v:IPFS456

Filecoin IPFS挖矿值得投资吗 IPFS矿机多少钱一台 IPFS矿机投资靠谱吗 IPFS挖矿可靠吗

假期过得是真快啊

IT蜗壳-Tango

5月日更

自研消息队列架构设计文档

菠萝吹雪—Code

架构实战营

postgresql数据库 timescaledb 时序库 超级表 块的压缩(compress_chunk()的应用)

Yang

数据库 postgresql

网络攻防学习笔记 Day5

穿过生命散发芬芳

5月日更 网络攻防

OAuth 2.0 与 OIDC

Zhang

OAuth 2.0 OIDC

「中国技术开放日·长沙站」现场直播

「中国技术开放日·长沙站」现场直播

Event Sourcing 和 CQRS落地(六):实现可靠消息-InfoQ