写点什么

Event Sourcing 和 CQRS 落地(五):Spring-Cloud-Stream 优化

  • 2019-07-05
  • 本文字数:7749 字

    阅读完需:约 25 分钟

Event Sourcing 和 CQRS落地(五):Spring-Cloud-Stream 优化

本系列的上一篇文章重点介绍了 Axon 实现,本文将主要介绍Spring Cloud 提供的消息中间件的抽象 Spring Cloud Stream 的优化方法。

Spring Cloud Stream 优化

问题

Spring Cloud Stream(以下简称 SCS )是 Spring Cloud 提供的消息中间件的抽象,但是目前也就支持 kafka 和 rabbitmq,这篇文章主要会讨论一下如何让 SCS 更好的服务我们之前搭建的 Event Sourcing、CQRS 模型。以下是我在使用 SCS 的过程中存在的一些问题:


  1. StreamListener用来做事件路由分发并不是很理想,SPEL 可能会写的很长(我尝试过用自定义注解代替原生的注解,从而达到简化的目的,但是会出现一些莫名其妙的事件混乱)。

  2. 如果配合之前的模型使用,我们需要保证消息的顺序消费,每个方法都需要去 check 事件的当前 seq,很不方便。

  3. 在没有 handler 处理某个 type 的事件时,框架会给出一个 warn,然而这个事件可能在 consumer 这里根本不关心。

解决方案

为了解决上面的问题,我们可以这么处理,先统一一个入口将 SCS 的消息接收,然后我们自己构建一个路由系统,将请求分发到我们自己定义的注解方法上,并且在这个过程中将 seq 的检查也给做了,大体的流程是这个样子的:



这样以上几点问题都会得到解决,下面我们来看看具体如何实现:


  • 首先定义一个注解用于接受自己分发的事件:



@Target( {ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface StreamEventHandler {
String[] payloadTypes() default {""};
String[] types();}

复制代码


types 对应 Stream 本身 Inuput 的类型, payloadTypes 对应事件类型,比如 ContractCreated,我们要做的效果是这个 payloadTypes 可以不写,直接从方法的第一个参数读取 class 的 simapleName。


  • 定义用于记录 aggregate sequenceNumber 的 entity 和 repository :


@Entity@Table(indexes = @Index(columnList = "aggregateIdentifier,type", unique = true))@Getter@Setter@NoArgsConstructorpublic class DomainAggregateSequence {
@Id @GeneratedValue private Long id;
private Long sequenceNumber;
private Long aggregateIdentifier;
private String type;}
@Repositorypublic interface DomainAggregateSequenceRepository extends JpaRepository<DomainAggregateSequence, Long> {
/** * 根据 aggregate id 和 type 找到对应的记录 * * @param identifier * @param type * * @return */ DomainAggregateSequence findByAggregateIdentifierAndType(Long identifier, String type);
}
复制代码


  • 由于暂时没有找到监听所有已绑定 channel 的事件的方法,这里实现一个类提供一个 dispatch 的方法用于分发:


@Slf4j@Component@AllArgsConstructorpublic class StreamDomainEventDispatcher implements BeanPostProcessor {
private final ObjectMapper mapper;
private final DomainAggregateSequenceRepository domainAggregateSequenceRepository;
private HashMap<Object, List<Method>> beanHandlerMap = new HashMap<>();
@Autowired public StreamDomainEventDispatcher(ObjectMapper mapper, DomainAggregateSequenceRepository domainAggregateSequenceRepository) { this.mapper = mapper; this.domainAggregateSequenceRepository = domainAggregateSequenceRepository; }
@Transactional public void dispatchEvent(DomainEvent event, String type) {
log.info(MessageFormat.format("message [{0}] received", event.getEventIdentifier()));
// 1. 检查是否是乱序事件或者重复事件 Long aggregateIdentifier = Long.parseLong(event.getAggregateIdentifier()); String eventType = event.getType(); Long eventSequence = event.getSequenceNumber();
DomainAggregateSequence sequenceObject = domainAggregateSequenceRepository.findByAggregateIdentifierAndType(aggregateIdentifier, eventType);
if (sequenceObject == null) { sequenceObject = new DomainAggregateSequence(); sequenceObject.setSequenceNumber(eventSequence); sequenceObject.setAggregateIdentifier(aggregateIdentifier); sequenceObject.setType(eventType); } else if (sequenceObject.getSequenceNumber() + 1 != eventSequence) { // 重复事件,直接忽略 if (sequenceObject.getSequenceNumber().equals(eventSequence)) { log.warn(MessageFormat.format("repeat event ignored, type[{0}] aggregate[{1}] seq[{2}] , ignored", event.getType(), event.getAggregateIdentifier(), event.getSequenceNumber())); return; } throw new StreamEventSequenceException(MessageFormat.format("sequence error, db [{0}], current [{1}]", sequenceObject.getSequenceNumber(), eventSequence)); } else { sequenceObject.setSequenceNumber(eventSequence); }
domainAggregateSequenceRepository.save(sequenceObject);
// 2. 分发事件到各个 handler beanHandlerMap.forEach((key, value) -> { Optional<Method> matchedMethod = getMatchedMethods(value, type, event.getPayloadType());
matchedMethod.ifPresent(method -> { try { invoke(key, method, event); } catch (IllegalAccessException | InvocationTargetException e) {
throw new StreamHandlerException(MessageFormat.format("[{0}] invoke error", method.getName()), e); } });
if (!matchedMethod.isPresent()) { log.info(MessageFormat.format("message [{0}] has no listener", event.getEventIdentifier())); } });
log.info(MessageFormat.format("message [{0}] handled", event.getEventIdentifier())); }
@Transactional public Optional<Method> getMatchedMethods(List<Method> methods, String type, String payloadType) { // 这里应该只有一个方法,因为将 stream 的单个事件分成多个之后,无法保证一致性 List<Method> results = methods.stream().filter(m -> { StreamEventHandler handler = m.getAnnotation(StreamEventHandler.class); List<String> types = new ArrayList<>(Arrays.asList(handler.types())); List<String> payloadTypes = new ArrayList<>(Arrays.asList(handler.payloadTypes()));
types.removeIf(StringUtils::isBlank); payloadTypes.removeIf(StringUtils::isBlank);
if (CollectionUtils.isEmpty(payloadTypes) && m.getParameterTypes().length != 0) { payloadTypes = Collections.singletonList(m.getParameterTypes()[0].getSimpleName()); }
boolean isTypeMatch = types.contains(type);
String checkedPayloadType = payloadType; if (StringUtils.contains(checkedPayloadType, ".")) { checkedPayloadType = StringUtils.substringAfterLast(checkedPayloadType, "."); } boolean isPayloadTypeMatch = payloadTypes.contains(checkedPayloadType);
return isTypeMatch && isPayloadTypeMatch; }).collect(Collectors.toList());
if (results.size() > 1) { throw new StreamHandlerException(MessageFormat.format("type[{0}] event[{1}] has more than one handler", type, payloadType)); }
return results.size() == 1 ? Optional.of(results.get(0)) : Optional.empty(); }
@Transactional public void invoke(Object bean, Method method, DomainEvent event) throws IllegalAccessException, InvocationTargetException {
int count = method.getParameterCount();
if (count == 0) { method.invoke(bean); } else if (count == 1) { Class<?> payloadType = method.getParameterTypes()[0];
if (payloadType.equals(DomainEvent.class)) { method.invoke(bean, mapper.convertValue(event.getPayload(), DomainEvent.class)); } else { method.invoke(bean, mapper.convertValue(event.getPayload(), payloadType)); }
} else if (count == 2) { Class<?> payloadType0 = method.getParameterTypes()[0]; Class<?> payloadType1 = method.getParameterTypes()[1];
Object firstParameterValue = mapper.convertValue(event.getPayload(), payloadType0); Object secondParameterValue = event.getMetaData();
// 如果是 DomainEvent 类型则优先传递该类型,另外一个参数按照 payloadType > metaData 优先级传入 if (payloadType0.equals(DomainEvent.class)) { firstParameterValue = mapper.convertValue(event, payloadType0); secondParameterValue = mapper.convertValue(event.getPayload(), payloadType1); } if (payloadType1.equals(DomainEvent.class)) { secondParameterValue = mapper.convertValue(event, payloadType1); } method.invoke(bean, firstParameterValue, secondParameterValue); } }

@Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; }
@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass(); Method[] uniqueDeclaredMethods = ReflectionUtils.getUniqueDeclaredMethods(targetClass);
List<Method> methods = new ArrayList<>(); for (Method method : uniqueDeclaredMethods) { StreamEventHandler streamListener = AnnotatedElementUtils.findMergedAnnotation(method, StreamEventHandler.class); if (streamListener != null) { methods.add(method); } } if (!CollectionUtils.isEmpty(methods)) { beanHandlerMap.put(bean, methods); } return bean; }
}
复制代码


这里参照了 SCS 本身手机 handler 的方式,会将有 @StreamEventHandler 注解的方法都找出来做一个记录。在 dispatchEvent 的时候会更新事件的 seq 并且按照 type 去调用各个标有注解的方法。


  • 实现一个比较简单的例子:


@Slf4j@Component@Transactional@AllArgsConstructorpublic class DomainEventDispatcher {
private final StreamDomainEventDispatcher streamDomainEventDispatcher;
@StreamListener(target = ChannelDefinition.CONTRACTS_INPUT, condition = "headers['messageType']=='eventSourcing'") public void handleBuilding(@Payload DomainEvent event) { streamDomainEventDispatcher.dispatchEvent(event, ChannelDefinition.CONTRACTS_INPUT); }}
@Component@AllArgsConstructor@Transactionalpublic class ContractEventHandler { @StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT) public void handle(ContractCreatedEvent event) { // 实现你的 view 层更新业务 }}
复制代码


注意:


  • AbstractDomainEventDispatcher中监听所有 bean 加载完成不能用 InitializingBean 接口,否则@Transactional会失效,这个有兴趣的同学可以研究一下@Transactional的机制。


至此以上几点就优化完了。

其他优化

错误处理

基于 SCS 的默认配置,存在一个致命的问题,那就是当消息处理失败(重试三次)之后,消息直接没了,这个相当于就是消息丢失了。那么解决方案其实也是比较简单的,一般有两种解决方案:


  1. 拒绝这个消息,丢在 broker 原先的队列里。

  2. 将这个消息记录到一个错误的 queue 中等待修复,后续可能将消息转发回去,也可能直接就删除了消息(比如重复的消息)。


方案 1 这么做可能会出的问题就是,这个消息反复消费,反复失败,引起循环问题从而导致服务出现问题,这个就需要在 broker 做一些策略配置了,为了让 broker 尽可能的简单,我们这里采用方案 2,要实现的流程是这样的:



  • 首先让 SCS 为我们自动生成一个 DLQ


spring:  application:    name: event-sourcing-service  datasource:    url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE    username: root    password: root  jpa:    hibernate:      ddl-auto: update      use-new-id-generator-mappings: false    show-sql: false    properties:      hibernate.dialect: org.hibernate.dialect.MySQL55Dialect  rabbitmq:    host: localhost    port: 5672    username: creams_user    password: Souban701  cloud:    stream.bindings:      contract-events: # 这个名字对应代码中@input("value") 的 value        destination: contract-events # 这个对应 rabbit 中的 channel        contentType: application/json # 这个指定传输类型,其实可以默认指定,但是目前每个地方都写了,所以统一下      contract-events-input:        destination: contract-events        contentType: application/json        group: event-sourcing-service        durableSubscription: true    stream.rabbit.bindings.contract-events-input.consumer:      autoBindDlq: true      republishToDlq: true      deadLetterQueueName: contract-error.dlqlogging:  level.org:    springframework:      web: INFO      cloud.sleuth: INFO    apache.ibatis: DEBUG    java.sql: DEBUG    hibernate:      SQL: DEBUG      type.descriptor.sql: TRACE
axon: serializer: general: jackson

复制代码


加上这个配置之后,rabbit 会给这个队列创建一个 .dlq 后缀的队列,异常消息都会被塞到这个队列里面(消息中包含了异常信息以及来源),等待我们处理,deadLetterQueueName指定了 DLQ 的名称,这样所有的失败消息都会存放到同一个 queue 中。大部分的情况下,消息的异常都是由于 consumer 逻辑错误引起的,所以我们需要一个处理这些失败的消息的地方,比如在启动的时候自动拉取 DLQ 中的消息然后转发到原来的 queue 中去远程原有的业务逻辑,如果处理不了那么还是会继续进入到 DLQ 中。


  • 在启动的时候拉取 DLQ 中的消息转发到原来的 queue 中。


@Componentpublic class DLXHandler implements ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware {
private final RabbitTemplate rabbitTemplate;
private ApplicationContext applicationContext;
private static final String DLQ = "contract-error.dlq";
@Autowired public DLXHandler(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; }
@Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; }
@Override public void onApplicationEvent(ContextRefreshedEvent event) { // SCS 会创建一个 child context ,这里需要判断下正确的 context 初始化完成 if (event.getApplicationContext().equals(this.applicationContext)) { // 启动后获取 dlq 中所有的消息,进行消费 Message message = rabbitTemplate.receive(DLQ); while (message != null) { rabbitTemplate.send(message.getMessageProperties().getReceivedRoutingKey(), message); message = rabbitTemplate.receive(DLQ); } }
}}
复制代码


由于 SCS 没有提供给我们类似的接口,这里使用了 rabbitmq 的接口来获取消息。

完善之前的 CQRS 例子

经常上述这些基础操作之后,汇过来实现 CQRS 就比较清晰了,只需要监听相关的事件,然后更新视图层即可。


  1. 添加时间的监听


    @StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT)    public void handle(ContractCreatedEvent event, DomainEvent<ContractCreatedEvent, HashMap> domainEvent) {        QueryContractCommand command = new QueryContractCommand(event.getIdentifier(), domainEvent.getTimestamp());
ContractAggregate aggregate = queryGateway.query(command, ContractAggregate.class).join();
ContractView view = new ContractView(); view.setIndustryName(aggregate.getIndustryName()); view.setId(aggregate.getIdentifier()); view.setPartyB(aggregate.getPartyB()); view.setPartyA(aggregate.getPartyA()); view.setName(aggregate.getName()); view.setDeleted(aggregate.isDeleted());
contractViewRepository.save(view); }
复制代码


StreamDomainEventDispatcher 对传参做了一些处理,当有两个参数的时候会将 DomainEvent 传递,因为有些时候可能会用到一些字段,比如时间、附加信息等等。这里在消费事件的时候,可以根据时间去查询 aggregate 的状态,然后直接做一个映射,也可以根据事件直接对 view 层做 CUD ,个人觉得在性能和速度不存在大问题的时候直接去查询一下 aggregate 当时的状态做一个映射即可,毕竟比较简单。


  1. 删除原来的 ContractViewHandler 即可。完整的例子 - branch session6


作者介绍:


周国勇,目前就职于杭州匠人网络创业,致力于楼宇资产管理的 SaaS 化,负责后端业务架构设计、项目管理,喜欢对业务模型的分析,热衷新技术的探索和实践,经常在踩坑的路上越走越远。


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


《Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现》


《Event Sourcing 和 CQRS 落地(三):CQRS 实现》


《Event Sourcing 和 CQRS 落地(四):深入使用 -Axon》


2019-07-05 09:027601

评论

发布
暂无评论
发现更多内容

TiEM初级实践

TiDB 社区干货传送门

6.x 实践

tidb-v5.2.3内存使用率高的几个case

TiDB 社区干货传送门

MVCC导致limit 1执行慢测试

TiDB 社区干货传送门

实践案例 管理与运维 性能测评

体验 TiDB v6.0.0 之 Clinic

TiDB 社区干货传送门

实践案例 6.x 实践

TiDB 6.0 的「元功能」:Placement Rules in SQL 是什么?

TiDB 社区干货传送门

6.x 实践

select查询失败,报“no such file or directory”错误

TiDB 社区干货传送门

TiDB 查询优化及调优系列(二)TiDB 查询计划简介

TiDB 社区干货传送门

TiDB 6.0 Placement Rules In SQL 使用实践

TiDB 社区干货传送门

管理与运维 版本测评 新版本/特性解读 6.x 实践

TiDB 4.0 升级 5.1 二三事——避坑指南

TiDB 社区干货传送门

版本升级

一篇文章说透缓存表

TiDB 社区干货传送门

TiDB 源码解读 新版本/特性解读 6.x 实践

初体验之rawkv learner recover灾备切换

TiDB 社区干货传送门

记一次tidb离线环境下安装非本地镜像源组件的过程

TiDB 社区干货传送门

实践案例 管理与运维 安装 & 部署 应用适配

tiup修改参数显示成功但不生效

TiDB 社区干货传送门

Let's go, TiCheck!

TiDB 社区干货传送门

监控

TiDB 集群一次诡异的写入慢问题排查经历

TiDB 社区干货传送门

故障排查/诊断

一次 TiDB 5.1 Write Stall 问题处理

TiDB 社区干货传送门

故障排查/诊断

TiDB 生态工具 -- TiUniManager(原 TiEM)v1.0.0 体验

TiDB 社区干货传送门

6.x 实践

TiFlash 源码阅读(一) TiFlash 存储层概览

TiDB 社区干货传送门

用一个性能提升了666倍的小案例说明在TiDB中正确使用索引的重要性

TiDB 社区干货传送门

性能调优 实践案例 应用适配

体验TiDB v6.0.0 之TiCDC

TiDB 社区干货传送门

实践案例 6.x 实践

TiDB 6.0 新特性解读 | TiFlash 新增算子和函数下推

TiDB 社区干货传送门

6.x 实践

文盘Rust -- 领域交互模式如何实现

TiDB 社区干货传送门

开发语言

一个小操作,SQL查询速度翻了1000倍。

TiDB 社区干货传送门

性能调优 实践案例 管理与运维 故障排查/诊断

我和tidb 的故事 - 我们终会在平行世界相遇

TiDB 社区干货传送门

TiDB上百T数据拆分实践

TiDB 社区干货传送门

迁移 管理与运维

TiDB 6.0 新特性解读 | Collation 规则

TiDB 社区干货传送门

6.x 实践

体验 TiDB v6.0.0 之 TiDB 的数据迁移工具 DM-WebUI

TiDB 社区干货传送门

实践案例 6.x 实践

TiKV缩容不掉如何解决?

TiDB 社区干货传送门

集群管理 故障排查/诊断 扩/缩容

TiDB 查询优化及调优系列(一)TiDB 优化器简介

TiDB 社区干货传送门

TiDB v6.0.0(DMR) 缓存表初试

TiDB 社区干货传送门

6.x 实践

TiDB 5.1 Write Stalls 应急文档

TiDB 社区干货传送门

实践案例

Event Sourcing 和 CQRS落地(五):Spring-Cloud-Stream 优化_文化 & 方法_周国勇_InfoQ精选文章