写点什么

Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现

  • 2019-07-02
  • 本文字数:4045 字

    阅读完需:约 13 分钟

Event Sourcing 和 CQRS落地(二):Event-Sourcing 实现

在本系列的第一篇文章,简要介绍了Event Sourcing的基本原理以及如何实现 UID Generator,本文将展开介绍 Event-Sourcing 实现。

实现 Event Soucing

在了解相关基础之后,这里会以最简单的方式实现一个 EventSourcing 的例子,然后逐渐通过之后的过程丰富,本篇内容会实现一个将增删改操作使用 EventSoucing 取代的例子,读取部分暂时不做涉及。

Spring Boot工程搭建

打开 http://start.spring.io/ 选择对应版本(这里是 2.1.5 )以及相应依赖,这里多选了一些之后会用到的服务:



添加配置文件:


yamlspring:  application:    name: event-sourcing-service  datasource:    url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE    username: root    password: root  jpa:    hibernate:      ddl-auto: update      use-new-id-generator-mappings: false    show-sql: false    properties:      hibernate.dialect: org.hibernate.dialect.MySQL55Dialect
复制代码


为了便于测试,这里开启了 JPA 自动更新,开发中你可能会使用 flyway 或者其他工具来管理数据库 schema 以及数据迁移。至此,一个简单的服务就搭建完毕。

Axon 依赖和配置

1、依赖添加


搭建好工程之后,我们正式开始做 Axon 相关事情,这个工程用一个简单的 Contract 业务来作为 demo,首先添加依赖:


添加 Axon 依赖:


  <!-- https://mvnrepository.com/artifact/org.axonframework/axon-spring-boot-starter -->  <dependency>    <groupId>org.axonframework</groupId>    <artifactId>axon-spring-boot-starter</artifactId>    <version>4.1.1</version>    <exclusions>      <exclusion>        <groupId>org.axonframework</groupId>        <artifactId>axon-server-connector</artifactId>      </exclusion>    </exclusions>  </dependency>  <!-- https://mvnrepository.com/artifact/com.google.guava/guava axon 依赖了 guava-->  <dependency>    <groupId>com.google.guava</groupId>    <artifactId>guava</artifactId>    <version>27.1-jre</version>  </dependency>
复制代码


解释一下,axon 从 4.0 开始加入了 axon-server ,目的是将 event 存储和分发剥离,以更好地发挥微服务的优点,但是在项目中引用这么一个不透明的东西感觉上不太好,所以这里就不采用 axon-server 了。axon-spring-boot-starter 会采用 EmbeddedEventStore ,默认使用的是 JPA ,启动之后,你也会发现 JPA 在数据库中创建了 5 张表,分别是 association_value_entry domain_event_entry saga_entry snapshot_event_entry token_entrydomain_event_entry 用来存储事件,snapshot_event_entry 用来存储快照,token_entry 用来记录 tracking event 的争夺,其他两张表用来存储 saga。


2、Axon 的配置


axon:  serializer:    general: jackson
复制代码


这里指定使用Jackson来进行序列化,模式 Axon 是使用 XML 进行序列化的,不方便查看,并且之后的事件升级都会很麻烦,所以进行了替换。

Axon Domain Model 定义

1、定义 aggregate


public interface ContractInterface {
Long getId();
@NotBlank String getName();
@NotBlank String getPartyA();
@NotBlank String getPartyB();}
@Getter@Setter@AllArgsConstructor@NoArgsConstructor@Aggregatepublic class ContractAggregate implements ContractInterface {
@AggregateIdentifier private Long id;
private String name;
private String partyA;
private String partyB;
private boolean deleted = false;}
复制代码


2、定义 commands


@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class AbstractCommand {    @TargetAggregateIdentifier    private Long identifier;}
@Getter@Setter@NoArgsConstructorpublic class UpdateContractCommand extends AbstractCommand implements ContractInterface {
private String name;
private String partyA;
private String partyB;
public UpdateContractCommand(Long identifier, String name, String partyA, String partyB) { super(identifier); this.name = name; this.partyA = partyA; this.partyB = partyB; }}
@Getter@Setter@NoArgsConstructorpublic class CreateContractCommand extends UpdateContractCommand {
public CreateContractCommand(Long identifier, String name, String partyA, String partyB) { super(identifier, name, partyA, partyB); }}
@NoArgsConstructor@Getter@Setterpublic class DeleteContractCommand extends AbstractCommand { public DeleteContractCommand(Long identifier) { super(identifier); }}

复制代码


3、定义 events


@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class AbstractEvent {
@TargetAggregateIdentifier private Long identifier;}

@Getter@Setter@NoArgsConstructorpublic class ContractUpdatedEvent extends AbstractEvent implements ContractInterface {
private String name;
private String partyA;
private String partyB;
public ContractUpdatedEvent(Long identifier, String name, String partyA, String partyB) { super(identifier); this.name = name; this.partyA = partyA; this.partyB = partyB; }}
@Getter@Setter@NoArgsConstructorpublic class ContractCreatedEvent extends ContractUpdatedEvent {
public ContractCreatedEvent(Long identifier, String name, String partyA, String partyB) { super(identifier, name, partyA, partyB); }}
@Getter@Setter@NoArgsConstructorpublic class ContractDeletedEvent extends AbstractEvent {
public ContractDeletedEvent(Long identifier) { super(identifier); }}

复制代码


这里只抽象了统一 command 和 event,在实际业务开发过程中可以有更多抽象。

实现各个 Handler

在这里,我们实现了 Event Soucing 的模式,把各个 Handler 都放在 aggregate 里面。


    @CommandHandler    public ContractAggregate(CreateContractCommand command, MetaData metaData, UIDGenerator generator) {        if (null == command.getIdentifier()) {            command.setIdentifier(generator.getId());        }        AggregateLifecycle.apply(new ContractCreatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData);    }
@CommandHandler private void on(UpdateContractCommand command, MetaData metaData) { AggregateLifecycle.apply(new ContractUpdatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData); }
@CommandHandler private void on(DeleteContractCommand command, MetaData metaData) { AggregateLifecycle.apply(new ContractDeletedEvent(command.getIdentifier()), metaData); }
@EventSourcingHandler private void on(ContractCreatedEvent event) { this.setIdentifier(event.getIdentifier()); this.onUpdate(event); }
@EventSourcingHandler private void onUpdate(ContractUpdatedEvent event) { this.setName(event.getName()); this.setPartyA(event.getPartyA()); this.setPartyB(event.getPartyB()); }
@EventSourcingHandler(payloadType = ContractDeletedEvent.class) private void on() { this.setDeleted(true); }
复制代码


  • 这里看到有一个 CommandHandler 注解写在了构造方法上,那么在处理这个 Command 时,将会自动创建一个对象。另外,这里的 MetaData 是在 command 发送时顺带的附加信息,可以是用户信息,机器信息等,后续也会涉及这部分,这里就不深入探讨了。

  • 启动项目后,JPA 应该会在数据库中生成几张表,其中 worker_id 是之前我们编写的 Spring Cloud 的 ID 生成器所产生的,剩余的表都是 Axon 自己产生的,这里我使用的数据库并没有使用mb4编码,因为在mb4编码下 Axon 的索引会过长,这个也不是问题,因为实际开发过程中,我们可以将生成的语句自己修改了下,将主键的长度改小一点即可,后面在完善过程也会涉及。

编写接口

aggreate 以及各 handler 都已经写完了,那么我们开始编写接口,让工程可以顺利跑起来。


@RestController@RequestMapping("/contracts")@AllArgsConstructorpublic class ContractController {
private final CommandGateway commandGateway;
@PostMapping public void createContract(@RequestBody @Valid CreateContractCommand command) { commandGateway.send(command); }
@PutMapping("/{id}") public void updateContract(@PathVariable("id") Long id, @RequestBody @Valid UpdateContractCommand command) { command.setIdentifier(id); commandGateway.send(command); }
@DeleteMapping("/{id}") public void deleteContract(@PathVariable("id") Long id) { commandGateway.send(new DeleteContractCommand(id)); }}
复制代码


启动工程并顺序执行 POST UPDATE DELETE 操作,你会发现domain_event_entry中多了三条记录,这张表就是用来记录事件的,可以看到这里详细的记录了每个事件的发生时间、内容、附加信息、类型等信息。至此本次 Event Soucing 的例子就结束了,主要将传统的增删改操作改造成了事件记录的形式。下次将会从 CQRS 角度实现数据读取。完整示例 - branch session4


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


2019-07-02 09:099066

评论 1 条评论

发布
用户头像
不错哦 DDD&CQRS&ES 大利器
2019-09-20 15:23
回复
没有更多了
发现更多内容

龙蜥牵手如意 RISC-V 社区,共筑 RISC-V 软件生态新未来

OpenAnolis小助手

开源 操作系统 risc-v 龙蜥社区

缓存穿透的解决方式?—布隆过滤器

量贩潮汐·WholesaleTide

缓存

亲测可用!麒麟环境下安装 SeaTunnel 完整指南

白鲸开源

Linux 开源 安装 麒麟操作系统 Apache SeaTunnel

达摩院玄铁、龙蜥等联合出题,首届 CIE 全国 RISC-V 高水平创新和应用大赛邀您报名

OpenAnolis小助手

操作系统 risc-v 龙蜥社区 龙蜥社区赛题

OASA 6 月月会圆满结束,同步联盟目标、漏洞激励计划等新进展

OpenAnolis小助手

开源 龙蜥社区 龙蜥社区安全联盟 软件适配

VMware Cloud Foundation Automation 9.0 新增功能

sysin

vcf

基于YOLOv8的田间杂草检测识别项目|完整源码数据集+PyQt5界面+完整训练流程+开箱即用!

申公豹

yolov8

倒计时 1 天!龙蜥邀您参加 RISC-V 中国峰会

OpenAnolis小助手

操作系统 risc-v 龙蜥社区 Anolis OS 龙蜥社区 RISC-V SIG

PostgreSQL 数据库中 ETL 操作的实战技巧

谷云科技RestCloud

数据库 postgresql 数据处理 ETL 数据集成

阿里云 Serverless 重塑创蓝云智通信底座,引领行业变革!

阿里巴巴云原生

Serverless

了解案例共创活动

华为云开发者联盟

垃圾回收算法有哪些?了解哪些垃圾回收器?

不在线第一只蜗牛

Java 算法 JVM

VMware Cloud Foundation Operations 9.0 新增功能

sysin

vcf

从论文提示词注入看智能体安全

冯骐

网络安全 智能体 大模型 SQL注入 提示词

给 DolphinScheduler 加一个 SQL Copilot 聊天助手,这个主意怎么样?

白鲸开源

GitHub 开源 AI Apache DolphinScheduler Copilot

DolphinScheduler 如何高效调度 AnalyticDB on Spark 作业?

白鲸开源

MySQL spark 开源 Apache DolphinScheduler analyticDB

【IoTDB 线上小课 17】开源 ≠ 免费,3 分钟总结开源商用指南

Apache IoTDB

特斯拉Optimus V3,来了!!

机器人头条

特斯拉 人形机器人 宇树科技 智元机器人 特斯拉optimus

MySQL 数据同步至 S3file,并接入 Hive 访问:SeaTunnel 实践指南

白鲸开源

MySQL hive Doris 数据同步 Apache SeaTunnel

【重磅发布,限时下载】《WhaleStudio商业案例白皮书》上线,数智化转型最佳实践一次看全

白鲸开源

大数据 白皮书 DataOps 白鲸开源 商业案例

​​超越传统:低代码如何以敏捷开发+高效能撬动企业数智转型性价比天花板?​

电子尖叫食人鱼

低代码

龙蜥中级认证课程上线,Linux 技术进阶新选择

OpenAnolis小助手

Linux 操作系统 龙蜥社区人才培养计划

自 4O 之后,Voice 从 Assistant 到 Agent,新机会都藏在哪些场景里?|Voice Agent 学习笔记

RTE开发者社区

智野双全一车搞定 豪华智能越野“第一车”即将发布

极客天地

回看限时上线!2025 Altair 区域技术交流会精彩演讲回播来啦

Altair RapidMiner

人工智能 AI 仿真 CAE 航空航天

10+热门 AI Agent 框架深度解析:谁更适合你的项目?

测吧(北京)科技有限公司

MiniMax 将完成近 3 亿美元融资,估值超 40 亿美元;Grok 上线动漫 AI 伴侣功能丨日报

RTE开发者社区

Event Sourcing 和 CQRS落地(二):Event-Sourcing 实现_文化 & 方法_周国勇_InfoQ精选文章