写点什么

Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现

  • 2019-07-02
  • 本文字数:4045 字

    阅读完需:约 13 分钟

Event Sourcing 和 CQRS落地(二):Event-Sourcing 实现

在本系列的第一篇文章,简要介绍了Event Sourcing的基本原理以及如何实现 UID Generator,本文将展开介绍 Event-Sourcing 实现。

实现 Event Soucing

在了解相关基础之后,这里会以最简单的方式实现一个 EventSourcing 的例子,然后逐渐通过之后的过程丰富,本篇内容会实现一个将增删改操作使用 EventSoucing 取代的例子,读取部分暂时不做涉及。

Spring Boot工程搭建

打开 http://start.spring.io/ 选择对应版本(这里是 2.1.5 )以及相应依赖,这里多选了一些之后会用到的服务:



添加配置文件:


yamlspring:  application:    name: event-sourcing-service  datasource:    url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE    username: root    password: root  jpa:    hibernate:      ddl-auto: update      use-new-id-generator-mappings: false    show-sql: false    properties:      hibernate.dialect: org.hibernate.dialect.MySQL55Dialect
复制代码


为了便于测试,这里开启了 JPA 自动更新,开发中你可能会使用 flyway 或者其他工具来管理数据库 schema 以及数据迁移。至此,一个简单的服务就搭建完毕。

Axon 依赖和配置

1、依赖添加


搭建好工程之后,我们正式开始做 Axon 相关事情,这个工程用一个简单的 Contract 业务来作为 demo,首先添加依赖:


添加 Axon 依赖:


  <!-- https://mvnrepository.com/artifact/org.axonframework/axon-spring-boot-starter -->  <dependency>    <groupId>org.axonframework</groupId>    <artifactId>axon-spring-boot-starter</artifactId>    <version>4.1.1</version>    <exclusions>      <exclusion>        <groupId>org.axonframework</groupId>        <artifactId>axon-server-connector</artifactId>      </exclusion>    </exclusions>  </dependency>  <!-- https://mvnrepository.com/artifact/com.google.guava/guava axon 依赖了 guava-->  <dependency>    <groupId>com.google.guava</groupId>    <artifactId>guava</artifactId>    <version>27.1-jre</version>  </dependency>
复制代码


解释一下,axon 从 4.0 开始加入了 axon-server ,目的是将 event 存储和分发剥离,以更好地发挥微服务的优点,但是在项目中引用这么一个不透明的东西感觉上不太好,所以这里就不采用 axon-server 了。axon-spring-boot-starter 会采用 EmbeddedEventStore ,默认使用的是 JPA ,启动之后,你也会发现 JPA 在数据库中创建了 5 张表,分别是 association_value_entry domain_event_entry saga_entry snapshot_event_entry token_entrydomain_event_entry 用来存储事件,snapshot_event_entry 用来存储快照,token_entry 用来记录 tracking event 的争夺,其他两张表用来存储 saga。


2、Axon 的配置


axon:  serializer:    general: jackson
复制代码


这里指定使用Jackson来进行序列化,模式 Axon 是使用 XML 进行序列化的,不方便查看,并且之后的事件升级都会很麻烦,所以进行了替换。

Axon Domain Model 定义

1、定义 aggregate


public interface ContractInterface {
Long getId();
@NotBlank String getName();
@NotBlank String getPartyA();
@NotBlank String getPartyB();}
@Getter@Setter@AllArgsConstructor@NoArgsConstructor@Aggregatepublic class ContractAggregate implements ContractInterface {
@AggregateIdentifier private Long id;
private String name;
private String partyA;
private String partyB;
private boolean deleted = false;}
复制代码


2、定义 commands


@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class AbstractCommand {    @TargetAggregateIdentifier    private Long identifier;}
@Getter@Setter@NoArgsConstructorpublic class UpdateContractCommand extends AbstractCommand implements ContractInterface {
private String name;
private String partyA;
private String partyB;
public UpdateContractCommand(Long identifier, String name, String partyA, String partyB) { super(identifier); this.name = name; this.partyA = partyA; this.partyB = partyB; }}
@Getter@Setter@NoArgsConstructorpublic class CreateContractCommand extends UpdateContractCommand {
public CreateContractCommand(Long identifier, String name, String partyA, String partyB) { super(identifier, name, partyA, partyB); }}
@NoArgsConstructor@Getter@Setterpublic class DeleteContractCommand extends AbstractCommand { public DeleteContractCommand(Long identifier) { super(identifier); }}

复制代码


3、定义 events


@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class AbstractEvent {
@TargetAggregateIdentifier private Long identifier;}

@Getter@Setter@NoArgsConstructorpublic class ContractUpdatedEvent extends AbstractEvent implements ContractInterface {
private String name;
private String partyA;
private String partyB;
public ContractUpdatedEvent(Long identifier, String name, String partyA, String partyB) { super(identifier); this.name = name; this.partyA = partyA; this.partyB = partyB; }}
@Getter@Setter@NoArgsConstructorpublic class ContractCreatedEvent extends ContractUpdatedEvent {
public ContractCreatedEvent(Long identifier, String name, String partyA, String partyB) { super(identifier, name, partyA, partyB); }}
@Getter@Setter@NoArgsConstructorpublic class ContractDeletedEvent extends AbstractEvent {
public ContractDeletedEvent(Long identifier) { super(identifier); }}

复制代码


这里只抽象了统一 command 和 event,在实际业务开发过程中可以有更多抽象。

实现各个 Handler

在这里,我们实现了 Event Soucing 的模式,把各个 Handler 都放在 aggregate 里面。


    @CommandHandler    public ContractAggregate(CreateContractCommand command, MetaData metaData, UIDGenerator generator) {        if (null == command.getIdentifier()) {            command.setIdentifier(generator.getId());        }        AggregateLifecycle.apply(new ContractCreatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData);    }
@CommandHandler private void on(UpdateContractCommand command, MetaData metaData) { AggregateLifecycle.apply(new ContractUpdatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData); }
@CommandHandler private void on(DeleteContractCommand command, MetaData metaData) { AggregateLifecycle.apply(new ContractDeletedEvent(command.getIdentifier()), metaData); }
@EventSourcingHandler private void on(ContractCreatedEvent event) { this.setIdentifier(event.getIdentifier()); this.onUpdate(event); }
@EventSourcingHandler private void onUpdate(ContractUpdatedEvent event) { this.setName(event.getName()); this.setPartyA(event.getPartyA()); this.setPartyB(event.getPartyB()); }
@EventSourcingHandler(payloadType = ContractDeletedEvent.class) private void on() { this.setDeleted(true); }
复制代码


  • 这里看到有一个 CommandHandler 注解写在了构造方法上,那么在处理这个 Command 时,将会自动创建一个对象。另外,这里的 MetaData 是在 command 发送时顺带的附加信息,可以是用户信息,机器信息等,后续也会涉及这部分,这里就不深入探讨了。

  • 启动项目后,JPA 应该会在数据库中生成几张表,其中 worker_id 是之前我们编写的 Spring Cloud 的 ID 生成器所产生的,剩余的表都是 Axon 自己产生的,这里我使用的数据库并没有使用mb4编码,因为在mb4编码下 Axon 的索引会过长,这个也不是问题,因为实际开发过程中,我们可以将生成的语句自己修改了下,将主键的长度改小一点即可,后面在完善过程也会涉及。

编写接口

aggreate 以及各 handler 都已经写完了,那么我们开始编写接口,让工程可以顺利跑起来。


@RestController@RequestMapping("/contracts")@AllArgsConstructorpublic class ContractController {
private final CommandGateway commandGateway;
@PostMapping public void createContract(@RequestBody @Valid CreateContractCommand command) { commandGateway.send(command); }
@PutMapping("/{id}") public void updateContract(@PathVariable("id") Long id, @RequestBody @Valid UpdateContractCommand command) { command.setIdentifier(id); commandGateway.send(command); }
@DeleteMapping("/{id}") public void deleteContract(@PathVariable("id") Long id) { commandGateway.send(new DeleteContractCommand(id)); }}
复制代码


启动工程并顺序执行 POST UPDATE DELETE 操作,你会发现domain_event_entry中多了三条记录,这张表就是用来记录事件的,可以看到这里详细的记录了每个事件的发生时间、内容、附加信息、类型等信息。至此本次 Event Soucing 的例子就结束了,主要将传统的增删改操作改造成了事件记录的形式。下次将会从 CQRS 角度实现数据读取。完整示例 - branch session4


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


2019-07-02 09:098940

评论 1 条评论

发布
用户头像
不错哦 DDD&CQRS&ES 大利器
2019-09-20 15:23
回复
没有更多了
发现更多内容

Advanced RAG 04:重排序(Re-ranking)技术探讨

Baihai IDP

程序员 AI 企业号 4 月 PK 榜 rag 检索增强生成

Termius:全平台SSH神器,连接管理更高效!

Rose

轻松驾驭大数据,IBM SPSS Statistics 26让统计分析更智能!

Rose

支付系统概述(九):外汇系统

agnostic

支付系统设计与实现

【实时更新】天猫商品详情数据接口采集揭秘!

tbapi

淘宝商品详情数据接口 天猫API接口 天猫商品详情接口 天猫商品数据采集

打造专业级音乐作品,Ableton Live 12 Suite是你的不二之选

Rose

Redis Desktop Manager mac直装版:你的Redis数据库管理神器!

Rose

Proxifier for mac,让网络隐私与安全触手可及

Rose

短视频评论ID采集提取软件|评论关键词下载爬取工具

Geek_16d138

短视频获客 爬虫工具 爬虫技术

Vue.js 如何在Vue应用中导入wasm文件

Changing Lin

Vue 前端

姑苏寻韵~庆开放原子开源大赛 OpenTiny 前端 Web 应用开发挑战赛路演圆满落幕。

OpenTiny社区

开源 前端 低代码 组件库

如何安全、高速、有效地利用IP代理爬取数据

陈老老老板

SimpleMind Pro思维导图mac版:让思维可视化,创新无限可能

Rose

玛雅 maya2025 系统要求 及Autodesk Maya 2025完整版破解资源

Rose

视频批量采集下载工具|短视频提取软件

Geek_16d138

短视频 爬虫工具 爬虫技术

superbuy淘宝代购集运系统类似软件是哪家公司开发的?

tbapi

淘宝代购系统 superbuy

HagoBuy淘宝代购集运系统类似软件是哪家公司开发的?

tbapi

淘宝代购系统

Beyond Compare 4:你的文件比较与合并专家,一键解决繁琐问题

Rose

超级效率神器:一按键盘,翻阅网页如履平地!

wudaxue

JProfiler for Mac v14.0.0永久注册码 Java性能调优利器

Rose

短视频批量下载提取软件功能|采集下载工具

Geek_16d138

短视频获客 爬虫工具

Pandabuy淘宝代购集运系统类似软件是哪家公司开发的?

tbapi

淘宝代购系统 淘宝代购集运系统 Pandabuy

支付系统概述(八):用户资产管理

agnostic

支付系统设计与实现

Macs Fan Control Pro for mac:智能风扇控制,优化散热,提升性能

Rose

Event Sourcing 和 CQRS落地(二):Event-Sourcing 实现_文化 & 方法_周国勇_InfoQ精选文章