写点什么

Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现

  • 2019-07-02
  • 本文字数:4045 字

    阅读完需:约 13 分钟

Event Sourcing 和 CQRS落地(二):Event-Sourcing 实现

在本系列的第一篇文章,简要介绍了Event Sourcing的基本原理以及如何实现 UID Generator,本文将展开介绍 Event-Sourcing 实现。

实现 Event Soucing

在了解相关基础之后,这里会以最简单的方式实现一个 EventSourcing 的例子,然后逐渐通过之后的过程丰富,本篇内容会实现一个将增删改操作使用 EventSoucing 取代的例子,读取部分暂时不做涉及。

Spring Boot工程搭建

打开 http://start.spring.io/ 选择对应版本(这里是 2.1.5 )以及相应依赖,这里多选了一些之后会用到的服务:



添加配置文件:


yamlspring:  application:    name: event-sourcing-service  datasource:    url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE    username: root    password: root  jpa:    hibernate:      ddl-auto: update      use-new-id-generator-mappings: false    show-sql: false    properties:      hibernate.dialect: org.hibernate.dialect.MySQL55Dialect
复制代码


为了便于测试,这里开启了 JPA 自动更新,开发中你可能会使用 flyway 或者其他工具来管理数据库 schema 以及数据迁移。至此,一个简单的服务就搭建完毕。

Axon 依赖和配置

1、依赖添加


搭建好工程之后,我们正式开始做 Axon 相关事情,这个工程用一个简单的 Contract 业务来作为 demo,首先添加依赖:


添加 Axon 依赖:


  <!-- https://mvnrepository.com/artifact/org.axonframework/axon-spring-boot-starter -->  <dependency>    <groupId>org.axonframework</groupId>    <artifactId>axon-spring-boot-starter</artifactId>    <version>4.1.1</version>    <exclusions>      <exclusion>        <groupId>org.axonframework</groupId>        <artifactId>axon-server-connector</artifactId>      </exclusion>    </exclusions>  </dependency>  <!-- https://mvnrepository.com/artifact/com.google.guava/guava axon 依赖了 guava-->  <dependency>    <groupId>com.google.guava</groupId>    <artifactId>guava</artifactId>    <version>27.1-jre</version>  </dependency>
复制代码


解释一下,axon 从 4.0 开始加入了 axon-server ,目的是将 event 存储和分发剥离,以更好地发挥微服务的优点,但是在项目中引用这么一个不透明的东西感觉上不太好,所以这里就不采用 axon-server 了。axon-spring-boot-starter 会采用 EmbeddedEventStore ,默认使用的是 JPA ,启动之后,你也会发现 JPA 在数据库中创建了 5 张表,分别是 association_value_entry domain_event_entry saga_entry snapshot_event_entry token_entrydomain_event_entry 用来存储事件,snapshot_event_entry 用来存储快照,token_entry 用来记录 tracking event 的争夺,其他两张表用来存储 saga。


2、Axon 的配置


axon:  serializer:    general: jackson
复制代码


这里指定使用Jackson来进行序列化,模式 Axon 是使用 XML 进行序列化的,不方便查看,并且之后的事件升级都会很麻烦,所以进行了替换。

Axon Domain Model 定义

1、定义 aggregate


public interface ContractInterface {
Long getId();
@NotBlank String getName();
@NotBlank String getPartyA();
@NotBlank String getPartyB();}
@Getter@Setter@AllArgsConstructor@NoArgsConstructor@Aggregatepublic class ContractAggregate implements ContractInterface {
@AggregateIdentifier private Long id;
private String name;
private String partyA;
private String partyB;
private boolean deleted = false;}
复制代码


2、定义 commands


@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class AbstractCommand {    @TargetAggregateIdentifier    private Long identifier;}
@Getter@Setter@NoArgsConstructorpublic class UpdateContractCommand extends AbstractCommand implements ContractInterface {
private String name;
private String partyA;
private String partyB;
public UpdateContractCommand(Long identifier, String name, String partyA, String partyB) { super(identifier); this.name = name; this.partyA = partyA; this.partyB = partyB; }}
@Getter@Setter@NoArgsConstructorpublic class CreateContractCommand extends UpdateContractCommand {
public CreateContractCommand(Long identifier, String name, String partyA, String partyB) { super(identifier, name, partyA, partyB); }}
@NoArgsConstructor@Getter@Setterpublic class DeleteContractCommand extends AbstractCommand { public DeleteContractCommand(Long identifier) { super(identifier); }}

复制代码


3、定义 events


@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class AbstractEvent {
@TargetAggregateIdentifier private Long identifier;}

@Getter@Setter@NoArgsConstructorpublic class ContractUpdatedEvent extends AbstractEvent implements ContractInterface {
private String name;
private String partyA;
private String partyB;
public ContractUpdatedEvent(Long identifier, String name, String partyA, String partyB) { super(identifier); this.name = name; this.partyA = partyA; this.partyB = partyB; }}
@Getter@Setter@NoArgsConstructorpublic class ContractCreatedEvent extends ContractUpdatedEvent {
public ContractCreatedEvent(Long identifier, String name, String partyA, String partyB) { super(identifier, name, partyA, partyB); }}
@Getter@Setter@NoArgsConstructorpublic class ContractDeletedEvent extends AbstractEvent {
public ContractDeletedEvent(Long identifier) { super(identifier); }}

复制代码


这里只抽象了统一 command 和 event,在实际业务开发过程中可以有更多抽象。

实现各个 Handler

在这里,我们实现了 Event Soucing 的模式,把各个 Handler 都放在 aggregate 里面。


    @CommandHandler    public ContractAggregate(CreateContractCommand command, MetaData metaData, UIDGenerator generator) {        if (null == command.getIdentifier()) {            command.setIdentifier(generator.getId());        }        AggregateLifecycle.apply(new ContractCreatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData);    }
@CommandHandler private void on(UpdateContractCommand command, MetaData metaData) { AggregateLifecycle.apply(new ContractUpdatedEvent(command.getIdentifier(), command.getName(), command.getPartyA(), command.getPartyB()), metaData); }
@CommandHandler private void on(DeleteContractCommand command, MetaData metaData) { AggregateLifecycle.apply(new ContractDeletedEvent(command.getIdentifier()), metaData); }
@EventSourcingHandler private void on(ContractCreatedEvent event) { this.setIdentifier(event.getIdentifier()); this.onUpdate(event); }
@EventSourcingHandler private void onUpdate(ContractUpdatedEvent event) { this.setName(event.getName()); this.setPartyA(event.getPartyA()); this.setPartyB(event.getPartyB()); }
@EventSourcingHandler(payloadType = ContractDeletedEvent.class) private void on() { this.setDeleted(true); }
复制代码


  • 这里看到有一个 CommandHandler 注解写在了构造方法上,那么在处理这个 Command 时,将会自动创建一个对象。另外,这里的 MetaData 是在 command 发送时顺带的附加信息,可以是用户信息,机器信息等,后续也会涉及这部分,这里就不深入探讨了。

  • 启动项目后,JPA 应该会在数据库中生成几张表,其中 worker_id 是之前我们编写的 Spring Cloud 的 ID 生成器所产生的,剩余的表都是 Axon 自己产生的,这里我使用的数据库并没有使用mb4编码,因为在mb4编码下 Axon 的索引会过长,这个也不是问题,因为实际开发过程中,我们可以将生成的语句自己修改了下,将主键的长度改小一点即可,后面在完善过程也会涉及。

编写接口

aggreate 以及各 handler 都已经写完了,那么我们开始编写接口,让工程可以顺利跑起来。


@RestController@RequestMapping("/contracts")@AllArgsConstructorpublic class ContractController {
private final CommandGateway commandGateway;
@PostMapping public void createContract(@RequestBody @Valid CreateContractCommand command) { commandGateway.send(command); }
@PutMapping("/{id}") public void updateContract(@PathVariable("id") Long id, @RequestBody @Valid UpdateContractCommand command) { command.setIdentifier(id); commandGateway.send(command); }
@DeleteMapping("/{id}") public void deleteContract(@PathVariable("id") Long id) { commandGateway.send(new DeleteContractCommand(id)); }}
复制代码


启动工程并顺序执行 POST UPDATE DELETE 操作,你会发现domain_event_entry中多了三条记录,这张表就是用来记录事件的,可以看到这里详细的记录了每个事件的发生时间、内容、附加信息、类型等信息。至此本次 Event Soucing 的例子就结束了,主要将传统的增删改操作改造成了事件记录的形式。下次将会从 CQRS 角度实现数据读取。完整示例 - branch session4


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


2019-07-02 09:098985

评论 1 条评论

发布
用户头像
不错哦 DDD&CQRS&ES 大利器
2019-09-20 15:23
回复
没有更多了
发现更多内容

高性能网络SIG月度动态:自研 IPPROTO_SMC 贡献 Linux 社区,virtio 增加多项优化

OpenAnolis小助手

高性能网络 龙蜥社区 龙蜥社区SIG

mac单机游戏推荐:星际争霸母巢之战 for Mac v1.16.1汉化版

你的猪会飞吗

Mac游戏下载 Mac游戏推荐

软件测试学习笔记丨redis的穿透、击穿、雪崩有什么不同点?

测试人

redis 软件测试 测试开发

KDD 2024 | 专业实力再获认可!网易伏羲四篇论文入选

网易伏羲

人工智能 论文 KDD 网易伏羲

Lightroom Classic 2020 for mac/win (lrC 2020) 中文直装版

你的猪会飞吗

Mac电脑软件 苹果电脑软件下载

长文本创作者福音来了,百度文库新产品「橙篇」一口气生成10万字

极客天地

Flutter中的异步和多进程

凌宇之蓝

Leangoo一站式敏捷研发协同平台,助力敏捷企业高效协同

顿顿顿

敏捷开发 敏捷工具 scrum工具

Telegram的强大社交属性下TON链项目的潜力及开发前景

区块链软件开发推广运营

dapp开发 区块链开发 NFT开发 公链开发

Apache Paimon统一大数据湖存储底座

Apache Flink

大数据 flink 流批一体 paimon

软件测试学习笔记丨测试体系与测试方案设计

测试人

软件测试 测试开发

百度二面,有点小激动!附面试题

王磊

Java

开始报名啦!智能可观测运维技术 MeetUp 议题硬核来袭

OpenAnolis小助手

操作系统 系统运维 可观测运维技术

晶澳太阳能选择 TDengine 加强数据管理,助力实现双碳目标

TDengine

数据库 tdengine 时序数据库

一文读懂Lumoz节点的潜力与收益,加密收益新范式

加密眼界

迁移方案详解 | 使用YMP从异构数据库迁移到YashanDB

极客天地

Event Sourcing 和 CQRS落地(二):Event-Sourcing 实现_文化 & 方法_周国勇_InfoQ精选文章