Event Sourcing和CQRS落地(三):CQRS实现

2019 年 7 月 03 日

Event Sourcing和CQRS落地(三):CQRS实现

在本系列的第二篇文章中,主要介绍了如何实现一个将增删改操作使用 EventSoucing取代的例子,本文将以数据库为例介绍CQRS实现。


实现 CQRS


在实现了 EventSoucing 之后,亟待解决的问题是查询了,理论上同一 Service 可以做到多数据源,甚至多数据库,这篇文章就暂时以同一个数据库为例子,同样使用 JPA 去做 View 的 ORM。


建立 entity


第一步当然是建立对应的 Entity 和 Repository :


@Entity@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class ContractView implements ContractInterface {
@Id @Column(length = 64) private Long id;
private String name;
private String partyA;
private String partyB;
private boolean deleted = false;
}
@Repositorypublic interface ContractViewRepository extends JpaRepository<ContractView, Long> {
}
复制代码


实现 view 存储


接下来我们就实现事件发生之后,view 的存储,这个过程是由一个独立的 EventHandler 来实现的,新建ContractViewHandler:


@Component@Slf4j@AllArgsConstructor@Transactionalpublic class ContractViewHandler {
private final EventSourcingRepository<ContractAggregate> customEventSourcingRepository;
private final ContractViewRepository contractViewRepository;

/** * 任何 contract 事件发生之后,重新计算 aggregate 的最新状态,转换成 view 之后存储到本地 * * @param event any event from contract * @param message domain event wrapper */ @EventHandler public void on(AbstractContractEvent event, DomainEventMessage<AbstractContractEvent> message) {

log.info(MessageFormat.format("{0}: {1} , seq: {2}, payload: {3}", message.getType(), message.getAggregateIdentifier(), message.getSequenceNumber(), message.getPayload()));
updateContractView(message.getAggregateIdentifier()); }
@Transactional public void updateContractView(String id) { LockAwareAggregate<ContractAggregate, EventSourcedAggregate<ContractAggregate>> lockAwareAggregate = customEventSourcingRepository.load(id); ContractAggregate aggregate = lockAwareAggregate.getWrappedAggregate().getAggregateRoot();

ContractView contractView = contractViewRepository.findById(Long.valueOf(id)).orElse(new ContractView()); contractView.setId(aggregate.getIdentifier()); contractView.setDeleted(aggregate.isDeleted()); contractView.setName(aggregate.getName()); contractView.setPartyA(aggregate.getPartyA()); contractView.setPartyB(aggregate.getPartyB());
contractViewRepository.save(contractView); }}
复制代码


接收到事件之后,把 aggregate 使用mapstruct转换到 view ,然后直接保存就完成了 view 的存储。看起来好像事情干完了,然而一启动就发现EventSourcingRepository这个 bean 找不到,实际上这个 bean 是需要我们自己定义的,在 config 中加入:


    @Bean    public EventSourcingRepository<ContractAggregate> contractAggregateRepository(EventStore eventStore) {        return new EventSourcingRepository<>(ContractAggregate.class, eventStore);    }
复制代码


启动并发送一个 POST 请求,在查看contract_view表,已经有一条记录被插入。


让 view 存储过程异步


似乎看起来,上面的流程已经没什么问题了,然而实际上是有问题的,我们先来看一张图:



这张图描述了一个 Event Soucing 和 CQRS 的理想模型,可以看到 event 存储和 view 的存储应该是分离的,view 的存储是等 event 存储之后异步进行存储,同理事件的发送也是这样,那么我们上面的例子有没有实现 view 的存储是分离的呢,很遗憾,Axon 默认的实现并不是这样的,当 event handler 丢出 runtime exception 之后,事件并没有被存储,也就是说他们应该是都在一个 tranaction 里面。而 Axon 确实提供了 event 的异步处理,官方文档 里有提到过,但是实际上仍然没有解决问题,因为 Axon 在 Subscribe 模式的 handler 调用过程中,并不会等事件事务存储之后再去调用,而是存储的同时依次调用,也就是说虽然可以做到 view 的 handler 异步化,但仍然做不到保证事件的存储之后再去更新 view(理论上 axon 的tracking event 也是可以保证消息发送和 view handler 异步的,但是这个模式下,Axon 会隔一段时间去扫表,并且只有一个节点可以处理,所以我觉得这种方式不太好就没有用 )。所以我们改进一下 view 的流程:



这样 view 的存储是分离了,但是事件是否发出却没有得到保证,也就是可靠消息,关于可靠消息这一块,后面将专门搞个话题处理可靠消息以及可靠消息下 view 层的实现,这里为了只涉及 CQRS 就暂时不做深入。


基于 Aggregate 的查询实现


有了 view 的存储之后,查询方式就和以前传统的 jpa 方式一样了,那么我们有些时候需要从 aggregate 查询最新的状态,比如在 view 处理错误的时候,其实 Axon 也提供了相关的实现,从上面的 view handler 也可以看到,我们可以通过一个 repository 去 load 一个 aggregate,那么通过 aggregate 查询的实现也就比较简单了,这里就不贴代码了。


让 Aggregate 可以查询历史状态


首先我们先看以下 Axon 存储的整体结构:


Repository ->EventStore->EventStorageEngine


其中,EventStorageEngine 提供了最底层的查询存储功能,EventStore进行封装、过滤和优化,整个 aggregate 的过程就是从 DB 中 fetch 所有的事件(这里会做一个 snapshot 的优化),然后将事件发送出去,那么我们为了尽量不去改写底层的查询,可以在EventStore做一个内存过滤然后向 Repository 输出接口,其实这个效率也还行,因为本身一个对象的事件有限,并不会吃掉很多资源去做过滤这件事情。


自定义 EventStore


这里比较简单,只要参照原来EmbeddedEventStore里面的 readEvents 方法,然后将里面的内容按照时间过滤一下。



@Slf4jpublic class CustomEmbeddedEventStore extends EmbeddedEventStore {
public CustomEmbeddedEventStore(Builder builder) { super(builder); }
public static CustomEmbeddedEventStore.Builder builder() { return new CustomEmbeddedEventStore.Builder(); }
public DomainEventStream readEvents(String aggregateIdentifier, Instant timestamp) { Optional<DomainEventMessage<?>> optionalSnapshot; try { optionalSnapshot = storageEngine().readSnapshot(aggregateIdentifier); } catch (Exception | LinkageError e) { log.warn("Error reading snapshot. Reconstructing aggregate from entire event stream.", e); optionalSnapshot = Optional.empty(); } DomainEventStream eventStream; // 加上时间判断,如果 snapshot 在指定的时间之间,那么可以使用,否则直接读取所有的 events if (optionalSnapshot.isPresent() && optionalSnapshot.get().getTimestamp().compareTo(timestamp) <= 0) { DomainEventMessage<?> snapshot = optionalSnapshot.get(); eventStream = DomainEventStream.concat(DomainEventStream.of(snapshot), storageEngine().readEvents(aggregateIdentifier, snapshot.getSequenceNumber() + 1)); } else { eventStream = storageEngine().readEvents(aggregateIdentifier); }
eventStream = new IteratorBackedDomainEventStream(eventStream.asStream().filter(m -> m.getTimestamp().compareTo(timestamp) <= 0).iterator());
Stream<? extends DomainEventMessage<?>> domainEventMessages = stagedDomainEventMessages(aggregateIdentifier); return DomainEventStream.concat(eventStream, DomainEventStream.of(domainEventMessages)); } // 这里为了构建自己的 eventStore 就把 builder 搬过来了,略去了内部实现,具体可以看源码 public static class Builder extends EmbeddedEventStore.Builder { ... }}
复制代码


自定义 Repository


首先分析一下EventSourcingRepository的继承关系:


EventSourcingRepository->LockingRepository->AbstractRepository->Repository 其中Repository是个 interface


其实最好的方式是在 Repository interface 中加一个 load(Long, Instant) 方法,但是这样我们需要改造的地方就有点多了,因为要一层层的添加实现,还是考虑接地气一点,直接在 EventSourcingRepository 中实现这一功能(相当于把几个父类的事情一起干了)。


@Slf4jpublic class CustomEventSourcingRepository<T> extends EventSourcingRepository<T> {
private final CustomEmbeddedEventStore eventStore; private final SnapshotTriggerDefinition snapshotTriggerDefinition; private final AggregateFactory<T> aggregateFactory; private final LockFactory lockFactory;
public static <T> Builder<T> builder(Class<T> aggregateType) { return new Builder<>(aggregateType); }
protected CustomEventSourcingRepository(Builder<T> builder) { super(builder); this.eventStore = builder.eventStore; this.aggregateFactory = builder.buildAggregateFactory(); this.snapshotTriggerDefinition = builder.snapshotTriggerDefinition; this.lockFactory = builder.lockFactory; }
protected EventSourcedAggregate<T> doLoadWithLock(String aggregateIdentifier, Instant timestamp) { DomainEventStream eventStream = eventStore.readEvents(aggregateIdentifier, timestamp);
SnapshotTrigger trigger = snapshotTriggerDefinition.prepareTrigger(aggregateFactory.getAggregateType()); if (!eventStream.hasNext()) { throw new AggregateNotFoundException(aggregateIdentifier, "The aggregate was not found in the event store"); } EventSourcedAggregate<T> aggregate = EventSourcedAggregate .initialize(aggregateFactory.createAggregateRoot(aggregateIdentifier, eventStream.peek()), aggregateModel(), eventStore, trigger); aggregate.initializeState(eventStream); if (aggregate.isDeleted()) { throw new AggregateDeletedException(aggregateIdentifier); } return aggregate; }
protected LockAwareAggregate<T, EventSourcedAggregate<T>> doLoad(String aggregateIdentifier, Instant timestamp) { Lock lock = lockFactory.obtainLock(aggregateIdentifier); try { final EventSourcedAggregate<T> aggregate = doLoadWithLock(aggregateIdentifier, timestamp); CurrentUnitOfWork.get().onCleanup(u -> lock.release()); return new LockAwareAggregate<>(aggregate, lock); } catch (Exception ex) { log.debug("Exception occurred while trying to load an aggregate. Releasing lock.", ex); lock.release(); throw ex; } }
public LockAwareAggregate<T, EventSourcedAggregate<T>> load(String aggregateIdentifier, Instant timestamp) {
if (timestamp == null) { return this.load(aggregateIdentifier); }
UnitOfWork<?> uow = CurrentUnitOfWork.get(); Map<String, LockAwareAggregate<T, EventSourcedAggregate<T>>> aggregates = managedAggregates(uow); LockAwareAggregate<T, EventSourcedAggregate<T>> aggregate = aggregates.computeIfAbsent(aggregateIdentifier, s -> doLoad(aggregateIdentifier, timestamp)); uow.onRollback(u -> aggregates.remove(aggregateIdentifier)); prepareForCommit(aggregate);
return aggregate; }
public static class Builder<T> extends EventSourcingRepository.Builder<T> { ...// 这里我略去了 builder class 内部的方法,具体的可以看源码。 }}
复制代码


可以看到这里我们在 timestamp 为空的情况下,直接走了原来的 load 方法,剩下的就是参照原来的写法,直接在 CustomEventSourcingRepository 实现几个父类的 loadXXX 方法即可。


另外配置中更新下我们需要的 Repository Bean:


@Configuration@RegisterDefaultEntities(packages = {    "org.axonframework.eventsourcing.eventstore.jpa"})public class AxonContinueConfiguration {
@Bean public CustomEmbeddedEventStore eventStore(EventStorageEngine storageEngine, AxonConfiguration configuration) { return CustomEmbeddedEventStore.builder() .storageEngine(storageEngine) .messageMonitor(configuration.messageMonitor(EventStore.class, "eventStore")) .build(); }
@Bean public CustomEventSourcingRepository<ContractAggregate> contractAggregateRepository(CustomEmbeddedEventStore eventStore, ParameterResolverFactory parameterResolverFactory) { return CustomEventSourcingRepository.builder(ContractAggregate.class) .eventStore(eventStore) .parameterResolverFactory(parameterResolverFactory) .build(); }
@Bean public EventStorageEngine eventStorageEngine(Serializer defaultSerializer, PersistenceExceptionResolver persistenceExceptionResolver, @Qualifier("eventSerializer") Serializer eventSerializer, AxonConfiguration configuration, EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) { return JpaEventStorageEngine.builder() .snapshotSerializer(defaultSerializer) .upcasterChain(configuration.upcasterChain()) .persistenceExceptionResolver(persistenceExceptionResolver) .eventSerializer(eventSerializer) .entityManagerProvider(entityManagerProvider) .transactionManager(transactionManager) .build(); }
@Bean public EventProcessingConfigurer eventProcessingConfigurer(EventProcessingConfigurer eventProcessingConfigurer) { eventProcessingConfigurer.usingSubscribingEventProcessors(); return eventProcessingConfigurer; }}
复制代码


这里由于 EventStorageEngine 的 Auto Config 在自定义了 eventStore 之后就不起作用了,所以这里把 JpaEventStoreAutoConfiguration 中的内容搬过来了。另外在自定义上面三个 bean 之后默认的 event mode 也莫名其妙的变成了 tracking event,所以这里第四个 bean 对默认的 processor 做了修改。


Query Command 的应用


Axon 基于 Command 这种模式,将查询也做了一部分,这里我们尝试下用它的 Query Command 来实现各种查询。


  1. 添加 Query :


@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class QueryContractCommand {    @NotBlank    @NotNull    private String id;
private Instant endDate;}
复制代码


  1. 添加 Handler :


@Component@AllArgsConstructor@Slf4jpublic class ContractQueryHandler {    private final CustomEventSourcingRepository<ContractAggregate> contractAggregateRepository;
@QueryHandler public ContractAggregate on(QueryContractCommand command) { LockAwareAggregate<ContractAggregate, EventSourcedAggregate<ContractAggregate>> lockAwareAggregate = contractAggregateRepository.load(command.getId().toString(), command.getEndDate()); return lockAwareAggregate.getWrappedAggregate().getAggregateRoot(); }}

复制代码


  1. Controller 中发送命令:


    private final QueryGateway queryGateway;
@GetMapping("/{id}") public ContractAggregate getContract(@PathVariable("id") Long id) { QueryContractCommand command = new QueryContractCommand(id, Instant.now());
return queryGateway.query(command, ContractAggregate.class).join(); }
复制代码


到这里 CQRS + EventSoucing 的模型就做完了。完整的例子 - branch session3


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


《Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现》


2019 年 7 月 03 日 08:396918

评论 1 条评论

发布
用户头像
ContractViewHandler 中 为啥还要注入 customEventSourcingRepository?
按说设计的消息中,应该包含足够的信息来更新view了吧。
2020 年 08 月 29 日 15:49
回复
没有更多评论了
发现更多内容

【架构师训练营-作业-1】食堂就餐卡系统设计

Andy

架构师如何做架构(训练营第一课)

看山是山

学习 极客大学架构师训练营 UML

关于架构设计的学习记录

imicode

第一周学习总结

G小调

架构师训练营第一周总结

草原上的奔跑

极客大学架构师训练营

架构师-第一课总结

free[啤酒]

架构师训练营-Week 01 学习总结

华乐彬

学习 架构 架构师 极客大学架构师训练营

第一周-学习总结

molly

极客大学架构师训练营

课后总结1-架构师训练营

进击的炮灰

第一周作业

qqq

极客大学架构师训练营

week1《作业二:根据当周学习情况,完成一篇学习总结》

任鑫

初识架构师

eazonshaw

极客大学架构师训练营

食堂就餐卡系统设计

imicode

食堂就餐卡系统设计文档

JUN

架构师训练营第一周学习总结

a晖

第一周感想

数字

「架构师训练营」作业1:食堂就餐卡系统设计

Amy

极客大学架构师训练营 作业

第一周培训心得

史慧君

架构师如何做架构(第1周学习总结)

李德政

极客大学架构师训练营

如何开始成为一名架构师

Ph0rse

极客大学架构师训练营

架构师训练营第1周作业

无名氏

食堂就餐卡系统设计

chinsun1

架构文档

架构师第一周总结

suke

极客大学架构师训练营

架构师训练营-Week 01 命题作业

华乐彬

极客大学架构师训练营 架构文档 作业

架构师如何去编写设计文档?

阿飞

架构 架构是训练营

食堂就餐卡系统设计

G小调

【架构训练营】第一期

云064

架构师0期 | 食堂就餐卡系统架构设计文档

刁架构

极客大学架构师训练营

第一周作业

Jeremy

食堂就餐卡系统设计

olderwei

食堂就餐卡系统架构设计文档

AIK

极客大学架构师训练营

Event Sourcing和CQRS落地(三):CQRS实现-InfoQ