在本系列的第二篇文章中,主要介绍了如何实现一个将增删改操作使用 EventSoucing取代的例子,本文将以数据库为例介绍CQRS实现。
实现 CQRS
在实现了 EventSoucing 之后,亟待解决的问题是查询了,理论上同一 Service 可以做到多数据源,甚至多数据库,这篇文章就暂时以同一个数据库为例子,同样使用 JPA 去做 View 的 ORM。
建立 entity
第一步当然是建立对应的 Entity 和 Repository :
@Entity@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class ContractView implements ContractInterface {
@Id @Column(length = 64) private Long id;
private String name;
private String partyA;
private String partyB;
private boolean deleted = false;
}
@Repositorypublic interface ContractViewRepository extends JpaRepository<ContractView, Long> {
}
复制代码
实现 view 存储
接下来我们就实现事件发生之后,view 的存储,这个过程是由一个独立的 EventHandler 来实现的,新建ContractViewHandler:
@Component@Slf4j@AllArgsConstructor@Transactionalpublic class ContractViewHandler {
private final EventSourcingRepository<ContractAggregate> customEventSourcingRepository;
private final ContractViewRepository contractViewRepository;
* 任何 contract 事件发生之后,重新计算 aggregate 的最新状态,转换成 view 之后存储到本地 * * @param event any event from contract * @param message domain event wrapper */ @EventHandler public void on(AbstractContractEvent event, DomainEventMessage<AbstractContractEvent> message) {
log.info(MessageFormat.format("{0}: {1} , seq: {2}, payload: {3}", message.getType(), message.getAggregateIdentifier(), message.getSequenceNumber(), message.getPayload()));
updateContractView(message.getAggregateIdentifier()); }
@Transactional public void updateContractView(String id) { LockAwareAggregate<ContractAggregate, EventSourcedAggregate<ContractAggregate>> lockAwareAggregate = customEventSourcingRepository.load(id); ContractAggregate aggregate = lockAwareAggregate.getWrappedAggregate().getAggregateRoot();
ContractView contractView = contractViewRepository.findById(Long.valueOf(id)).orElse(new ContractView()); contractView.setId(aggregate.getIdentifier()); contractView.setDeleted(aggregate.isDeleted()); contractView.setName(aggregate.getName()); contractView.setPartyA(aggregate.getPartyA()); contractView.setPartyB(aggregate.getPartyB());
contractViewRepository.save(contractView); }}
复制代码
接收到事件之后,把 aggregate 使用mapstruct转换到 view ,然后直接保存就完成了 view 的存储。看起来好像事情干完了,然而一启动就发现EventSourcingRepository这个 bean 找不到,实际上这个 bean 是需要我们自己定义的,在 config 中加入:
@Bean public EventSourcingRepository<ContractAggregate> contractAggregateRepository(EventStore eventStore) { return new EventSourcingRepository<>(ContractAggregate.class, eventStore); }
复制代码
启动并发送一个 POST 请求,在查看contract_view表,已经有一条记录被插入。
让 view 存储过程异步
似乎看起来,上面的流程已经没什么问题了,然而实际上是有问题的,我们先来看一张图:
这张图描述了一个 Event Soucing 和 CQRS 的理想模型,可以看到 event 存储和 view 的存储应该是分离的,view 的存储是等 event 存储之后异步进行存储,同理事件的发送也是这样,那么我们上面的例子有没有实现 view 的存储是分离的呢,很遗憾,Axon 默认的实现并不是这样的,当 event handler 丢出 runtime exception 之后,事件并没有被存储,也就是说他们应该是都在一个 tranaction 里面。而 Axon 确实提供了 event 的异步处理,官方文档 里有提到过,但是实际上仍然没有解决问题,因为 Axon 在 Subscribe 模式的 handler 调用过程中,并不会等事件事务存储之后再去调用,而是存储的同时依次调用,也就是说虽然可以做到 view 的 handler 异步化,但仍然做不到保证事件的存储之后再去更新 view(理论上 axon 的tracking event 也是可以保证消息发送和 view handler 异步的,但是这个模式下,Axon 会隔一段时间去扫表,并且只有一个节点可以处理,所以我觉得这种方式不太好就没有用 )。所以我们改进一下 view 的流程:
这样 view 的存储是分离了,但是事件是否发出却没有得到保证,也就是可靠消息,关于可靠消息这一块,后面将专门搞个话题处理可靠消息以及可靠消息下 view 层的实现,这里为了只涉及 CQRS 就暂时不做深入。
基于 Aggregate 的查询实现
有了 view 的存储之后,查询方式就和以前传统的 jpa 方式一样了,那么我们有些时候需要从 aggregate 查询最新的状态,比如在 view 处理错误的时候,其实 Axon 也提供了相关的实现,从上面的 view handler 也可以看到,我们可以通过一个 repository 去 load 一个 aggregate,那么通过 aggregate 查询的实现也就比较简单了,这里就不贴代码了。
让 Aggregate 可以查询历史状态
首先我们先看以下 Axon 存储的整体结构:
Repository ->EventStore->EventStorageEngine
其中,EventStorageEngine 提供了最底层的查询存储功能,EventStore进行封装、过滤和优化,整个 aggregate 的过程就是从 DB 中 fetch 所有的事件(这里会做一个 snapshot 的优化),然后将事件发送出去,那么我们为了尽量不去改写底层的查询,可以在EventStore做一个内存过滤然后向 Repository 输出接口,其实这个效率也还行,因为本身一个对象的事件有限,并不会吃掉很多资源去做过滤这件事情。
自定义 EventStore
这里比较简单,只要参照原来EmbeddedEventStore里面的 readEvents 方法,然后将里面的内容按照时间过滤一下。
@Slf4jpublic class CustomEmbeddedEventStore extends EmbeddedEventStore {
public CustomEmbeddedEventStore(Builder builder) { super(builder); }
public static CustomEmbeddedEventStore.Builder builder() { return new CustomEmbeddedEventStore.Builder(); }
public DomainEventStream readEvents(String aggregateIdentifier, Instant timestamp) { Optional<DomainEventMessage<?>> optionalSnapshot; try { optionalSnapshot = storageEngine().readSnapshot(aggregateIdentifier); } catch (Exception | LinkageError e) { log.warn("Error reading snapshot. Reconstructing aggregate from entire event stream.", e); optionalSnapshot = Optional.empty(); } DomainEventStream eventStream; if (optionalSnapshot.isPresent() && optionalSnapshot.get().getTimestamp().compareTo(timestamp) <= 0) { DomainEventMessage<?> snapshot = optionalSnapshot.get(); eventStream = DomainEventStream.concat(DomainEventStream.of(snapshot), storageEngine().readEvents(aggregateIdentifier, snapshot.getSequenceNumber() + 1)); } else { eventStream = storageEngine().readEvents(aggregateIdentifier); }
eventStream = new IteratorBackedDomainEventStream(eventStream.asStream().filter(m -> m.getTimestamp().compareTo(timestamp) <= 0).iterator());
Stream<? extends DomainEventMessage<?>> domainEventMessages = stagedDomainEventMessages(aggregateIdentifier); return DomainEventStream.concat(eventStream, DomainEventStream.of(domainEventMessages)); } public static class Builder extends EmbeddedEventStore.Builder { ... }}
复制代码
自定义 Repository
首先分析一下EventSourcingRepository的继承关系:
EventSourcingRepository->LockingRepository->AbstractRepository->Repository 其中Repository是个 interface
其实最好的方式是在 Repository interface 中加一个 load(Long, Instant) 方法,但是这样我们需要改造的地方就有点多了,因为要一层层的添加实现,还是考虑接地气一点,直接在 EventSourcingRepository 中实现这一功能(相当于把几个父类的事情一起干了)。
@Slf4jpublic class CustomEventSourcingRepository<T> extends EventSourcingRepository<T> {
private final CustomEmbeddedEventStore eventStore; private final SnapshotTriggerDefinition snapshotTriggerDefinition; private final AggregateFactory<T> aggregateFactory; private final LockFactory lockFactory;
public static <T> Builder<T> builder(Class<T> aggregateType) { return new Builder<>(aggregateType); }
protected CustomEventSourcingRepository(Builder<T> builder) { super(builder); this.eventStore = builder.eventStore; this.aggregateFactory = builder.buildAggregateFactory(); this.snapshotTriggerDefinition = builder.snapshotTriggerDefinition; this.lockFactory = builder.lockFactory; }
protected EventSourcedAggregate<T> doLoadWithLock(String aggregateIdentifier, Instant timestamp) { DomainEventStream eventStream = eventStore.readEvents(aggregateIdentifier, timestamp);
SnapshotTrigger trigger = snapshotTriggerDefinition.prepareTrigger(aggregateFactory.getAggregateType()); if (!eventStream.hasNext()) { throw new AggregateNotFoundException(aggregateIdentifier, "The aggregate was not found in the event store"); } EventSourcedAggregate<T> aggregate = EventSourcedAggregate .initialize(aggregateFactory.createAggregateRoot(aggregateIdentifier, eventStream.peek()), aggregateModel(), eventStore, trigger); aggregate.initializeState(eventStream); if (aggregate.isDeleted()) { throw new AggregateDeletedException(aggregateIdentifier); } return aggregate; }
protected LockAwareAggregate<T, EventSourcedAggregate<T>> doLoad(String aggregateIdentifier, Instant timestamp) { Lock lock = lockFactory.obtainLock(aggregateIdentifier); try { final EventSourcedAggregate<T> aggregate = doLoadWithLock(aggregateIdentifier, timestamp); CurrentUnitOfWork.get().onCleanup(u -> lock.release()); return new LockAwareAggregate<>(aggregate, lock); } catch (Exception ex) { log.debug("Exception occurred while trying to load an aggregate. Releasing lock.", ex); lock.release(); throw ex; } }
public LockAwareAggregate<T, EventSourcedAggregate<T>> load(String aggregateIdentifier, Instant timestamp) {
if (timestamp == null) { return this.load(aggregateIdentifier); }
UnitOfWork<?> uow = CurrentUnitOfWork.get(); Map<String, LockAwareAggregate<T, EventSourcedAggregate<T>>> aggregates = managedAggregates(uow); LockAwareAggregate<T, EventSourcedAggregate<T>> aggregate = aggregates.computeIfAbsent(aggregateIdentifier, s -> doLoad(aggregateIdentifier, timestamp)); uow.onRollback(u -> aggregates.remove(aggregateIdentifier)); prepareForCommit(aggregate);
return aggregate; }
public static class Builder<T> extends EventSourcingRepository.Builder<T> { ... }}
复制代码
可以看到这里我们在 timestamp 为空的情况下,直接走了原来的 load 方法,剩下的就是参照原来的写法,直接在 CustomEventSourcingRepository 实现几个父类的 loadXXX 方法即可。
另外配置中更新下我们需要的 Repository Bean:
@Configuration@RegisterDefaultEntities(packages = { "org.axonframework.eventsourcing.eventstore.jpa"})public class AxonContinueConfiguration {
@Bean public CustomEmbeddedEventStore eventStore(EventStorageEngine storageEngine, AxonConfiguration configuration) { return CustomEmbeddedEventStore.builder() .storageEngine(storageEngine) .messageMonitor(configuration.messageMonitor(EventStore.class, "eventStore")) .build(); }
@Bean public CustomEventSourcingRepository<ContractAggregate> contractAggregateRepository(CustomEmbeddedEventStore eventStore, ParameterResolverFactory parameterResolverFactory) { return CustomEventSourcingRepository.builder(ContractAggregate.class) .eventStore(eventStore) .parameterResolverFactory(parameterResolverFactory) .build(); }
@Bean public EventStorageEngine eventStorageEngine(Serializer defaultSerializer, PersistenceExceptionResolver persistenceExceptionResolver, @Qualifier("eventSerializer") Serializer eventSerializer, AxonConfiguration configuration, EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) { return JpaEventStorageEngine.builder() .snapshotSerializer(defaultSerializer) .upcasterChain(configuration.upcasterChain()) .persistenceExceptionResolver(persistenceExceptionResolver) .eventSerializer(eventSerializer) .entityManagerProvider(entityManagerProvider) .transactionManager(transactionManager) .build(); }
@Bean public EventProcessingConfigurer eventProcessingConfigurer(EventProcessingConfigurer eventProcessingConfigurer) { eventProcessingConfigurer.usingSubscribingEventProcessors(); return eventProcessingConfigurer; }}
复制代码
这里由于 EventStorageEngine 的 Auto Config 在自定义了 eventStore 之后就不起作用了,所以这里把 JpaEventStoreAutoConfiguration 中的内容搬过来了。另外在自定义上面三个 bean 之后默认的 event mode 也莫名其妙的变成了 tracking event,所以这里第四个 bean 对默认的 processor 做了修改。
Query Command 的应用
Axon 基于 Command 这种模式,将查询也做了一部分,这里我们尝试下用它的 Query Command 来实现各种查询。
添加 Query :
@Getter@Setter@AllArgsConstructor@NoArgsConstructorpublic class QueryContractCommand { @NotBlank @NotNull private String id;
private Instant endDate;}
复制代码
添加 Handler :
@Component@AllArgsConstructor@Slf4jpublic class ContractQueryHandler { private final CustomEventSourcingRepository<ContractAggregate> contractAggregateRepository;
@QueryHandler public ContractAggregate on(QueryContractCommand command) { LockAwareAggregate<ContractAggregate, EventSourcedAggregate<ContractAggregate>> lockAwareAggregate = contractAggregateRepository.load(command.getId().toString(), command.getEndDate()); return lockAwareAggregate.getWrappedAggregate().getAggregateRoot(); }}
复制代码
Controller 中发送命令:
private final QueryGateway queryGateway;
@GetMapping("/{id}") public ContractAggregate getContract(@PathVariable("id") Long id) { QueryContractCommand command = new QueryContractCommand(id, Instant.now());
return queryGateway.query(command, ContractAggregate.class).join(); }
复制代码
到这里 CQRS + EventSoucing 的模型就做完了。完整的例子 - branch session3
相关文章:
《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》
《Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现》
评论 1 条评论