Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现

阅读数:4356 2019 年 7 月 2 日 09:09

Event Sourcing 和 CQRS落地(二):Event-Sourcing 实现

在本系列的第一篇文章,简要介绍了 Event Sourcing 的基本原理以及如何实现 UID Generator,本文将展开介绍 Event-Sourcing 实现。

实现 Event Soucing

在了解相关基础之后,这里会以最简单的方式实现一个 EventSourcing 的例子,然后逐渐通过之后的过程丰富,本篇内容会实现一个将增删改操作使用 EventSoucing 取代的例子,读取部分暂时不做涉及。

Spring Boot 工程搭建

打开 http://start.spring.io/ 选择对应版本 (这里是 2.1.5 ) 以及相应依赖,这里多选了一些之后会用到的服务:

Event Sourcing 和 CQRS落地(二):Event-Sourcing 实现

添加配置文件:

复制代码
yaml
spring:
application:
name: event-sourcing-service
datasource:
url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE
username: root
password: root
jpa:
hibernate:
ddl-auto: update
use-new-id-generator-mappings: false
show-sql: false
properties:
hibernate.dialect: org.hibernate.dialect.MySQL55Dialect

为了便于测试,这里开启了 JPA 自动更新,开发中你可能会使用 flyway 或者其他工具来管理数据库 schema 以及数据迁移。至此,一个简单的服务就搭建完毕。

Axon 依赖和配置

1、依赖添加

搭建好工程之后,我们正式开始做 Axon 相关事情,这个工程用一个简单的 Contract 业务来作为 demo,首先添加依赖:

添加 Axon 依赖:

复制代码
<!-- https://mvnrepository.com/artifact/org.axonframework/axon-spring-boot-starter -->
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.1.1</version>
<exclusions>
<exclusion>
<groupId>org.axonframework</groupId>
<artifactId>axon-server-connector</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava axon 依赖了 guava-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>27.1-jre</version>
</dependency>

解释一下,axon 从 4.0 开始加入了 axon-server ,目的是将 event 存储和分发剥离,以更好地发挥微服务的优点,但是在项目中引用这么一个不透明的东西感觉上不太好,所以这里就不采用 axon-server 了。axon-spring-boot-starter 会采用 EmbeddedEventStore ,默认使用的是 JPA ,启动之后,你也会发现 JPA 在数据库中创建了 5 张表,分别是 association_value_entry domain_event_entry saga_entry snapshot_event_entry token_entrydomain_event_entry 用来存储事件,snapshot_event_entry 用来存储快照,token_entry 用来记录 tracking event 的争夺,其他两张表用来存储 saga。

2、Axon 的配置

复制代码
axon:
serializer:
general: jackson

这里指定使用Jackson来进行序列化,模式 Axon 是使用 XML 进行序列化的,不方便查看,并且之后的事件升级都会很麻烦,所以进行了替换。

Axon Domain Model 定义

1、定义 aggregate

复制代码
public interface ContractInterface {
Long getId();
@NotBlank
String getName();
@NotBlank
String getPartyA();
@NotBlank
String getPartyB();
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Aggregate
public class ContractAggregate implements ContractInterface {
@AggregateIdentifier
private Long id;
private String name;
private String partyA;
private String partyB;
private boolean deleted = false;
}

2、定义 commands

复制代码
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class AbstractCommand {
@TargetAggregateIdentifier
private Long identifier;
}
@Getter
@Setter
@NoArgsConstructor
public class UpdateContractCommand extends AbstractCommand implements ContractInterface {
private String name;
private String partyA;
private String partyB;
public UpdateContractCommand(Long identifier, String name, String partyA, String partyB) {
super(identifier);
this.name = name;
this.partyA = partyA;
this.partyB = partyB;
}
}
@Getter
@Setter
@NoArgsConstructor
public class CreateContractCommand extends UpdateContractCommand {
public CreateContractCommand(Long identifier, String name, String partyA, String partyB) {
super(identifier, name, partyA, partyB);
}
}
@NoArgsConstructor
@Getter
@Setter
public class DeleteContractCommand extends AbstractCommand {
public DeleteContractCommand(Long identifier) {
super(identifier);
}
}

3、定义 events

复制代码
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class AbstractEvent {
@TargetAggregateIdentifier
private Long identifier;
}
@Getter
@Setter
@NoArgsConstructor
public class ContractUpdatedEvent extends AbstractEvent implements ContractInterface {
private String name;
private String partyA;
private String partyB;
public ContractUpdatedEvent(Long identifier, String name, String partyA, String partyB) {
super(identifier);
this.name = name;
this.partyA = partyA;
this.partyB = partyB;
}
}
@Getter
@Setter
@NoArgsConstructor
public class ContractCreatedEvent extends ContractUpdatedEvent {
public ContractCreatedEvent(Long identifier, String name, String partyA, String partyB) {
super(identifier, name, partyA, partyB);
}
}
@Getter
@Setter
@NoArgsConstructor
public class ContractDeletedEvent extends AbstractEvent {
public ContractDeletedEvent(Long identifier) {
super(identifier);
}
}

这里只抽象了统一 command 和 event,在实际业务开发过程中可以有更多抽象。

实现各个 Handler

在这里,我们实现了 Event Soucing 的模式,把各个 Handler 都放在 aggregate 里面。

复制代码
@CommandHandler
public ContractAggregate(CreateContractCommand command, MetaData metaData, UIDGenerator generator) {
if (null == command.getIdentifier()) {
command.setIdentifier(generator.getId());
}
AggregateLifecycle.apply(new ContractCreatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData);
}
@CommandHandler
private void on(UpdateContractCommand command, MetaData metaData) {
AggregateLifecycle.apply(new ContractUpdatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData);
}
@CommandHandler
private void on(DeleteContractCommand command, MetaData metaData) {
AggregateLifecycle.apply(new ContractDeletedEvent(command.getIdentifier()), metaData);
}
@EventSourcingHandler
private void on(ContractCreatedEvent event) {
this.setIdentifier(event.getIdentifier());
this.onUpdate(event);
}
@EventSourcingHandler
private void onUpdate(ContractUpdatedEvent event) {
this.setName(event.getName());
this.setPartyA(event.getPartyA());
this.setPartyB(event.getPartyB());
}
@EventSourcingHandler(payloadType = ContractDeletedEvent.class)
private void on() {
this.setDeleted(true);
}
  • 这里看到有一个 CommandHandler 注解写在了构造方法上,那么在处理这个 Command 时,将会自动创建一个对象。另外,这里的 MetaData 是在 command 发送时顺带的附加信息,可以是用户信息,机器信息等,后续也会涉及这部分,这里就不深入探讨了。

  • 启动项目后,JPA 应该会在数据库中生成几张表,其中 worker_id 是之前我们编写的 Spring Cloud 的 ID 生成器所产生的,剩余的表都是 Axon 自己产生的,这里我使用的数据库并没有使用mb4编码,因为在mb4编码下 Axon 的索引会过长,这个也不是问题,因为实际开发过程中,我们可以将生成的语句自己修改了下,将主键的长度改小一点即可,后面在完善过程也会涉及。

编写接口

aggreate 以及各 handler 都已经写完了,那么我们开始编写接口,让工程可以顺利跑起来。

复制代码
@RestController
@RequestMapping("/contracts")
@AllArgsConstructor
public class ContractController {
private final CommandGateway commandGateway;
@PostMapping
public void createContract(@RequestBody @Valid CreateContractCommand command) {
commandGateway.send(command);
}
@PutMapping("/{id}")
public void updateContract(@PathVariable("id") Long id, @RequestBody @Valid UpdateContractCommand command) {
command.setIdentifier(id);
commandGateway.send(command);
}
@DeleteMapping("/{id}")
public void deleteContract(@PathVariable("id") Long id) {
commandGateway.send(new DeleteContractCommand(id));
}
}

启动工程并顺序执行 POST UPDATE DELETE 操作,你会发现domain_event_entry中多了三条记录,这张表就是用来记录事件的,可以看到这里详细的记录了每个事件的发生时间、内容、附加信息、类型等信息。至此本次 Event Soucing 的例子就结束了,主要将传统的增删改操作改造成了事件记录的形式。下次将会从 CQRS 角度实现数据读取。完整示例 - branch session4

相关文章:

《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》

收藏

评论

微博

用户头像
发表评论

注册/登录 InfoQ 发表评论

最新评论

用户头像
厕所看C 2019 年 09 月 20 日 15:23 0 回复
不错哦 DDD&CQRS&ES 大利器
没有更多了