写点什么

Event Sourcing 和 CQRS 落地(五):Spring-Cloud-Stream 优化

  • 2019-07-05
  • 本文字数:7749 字

    阅读完需:约 25 分钟

Event Sourcing 和 CQRS落地(五):Spring-Cloud-Stream 优化

本系列的上一篇文章重点介绍了 Axon 实现,本文将主要介绍Spring Cloud 提供的消息中间件的抽象 Spring Cloud Stream 的优化方法。

Spring Cloud Stream 优化

问题

Spring Cloud Stream(以下简称 SCS )是 Spring Cloud 提供的消息中间件的抽象,但是目前也就支持 kafka 和 rabbitmq,这篇文章主要会讨论一下如何让 SCS 更好的服务我们之前搭建的 Event Sourcing、CQRS 模型。以下是我在使用 SCS 的过程中存在的一些问题:


  1. StreamListener用来做事件路由分发并不是很理想,SPEL 可能会写的很长(我尝试过用自定义注解代替原生的注解,从而达到简化的目的,但是会出现一些莫名其妙的事件混乱)。

  2. 如果配合之前的模型使用,我们需要保证消息的顺序消费,每个方法都需要去 check 事件的当前 seq,很不方便。

  3. 在没有 handler 处理某个 type 的事件时,框架会给出一个 warn,然而这个事件可能在 consumer 这里根本不关心。

解决方案

为了解决上面的问题,我们可以这么处理,先统一一个入口将 SCS 的消息接收,然后我们自己构建一个路由系统,将请求分发到我们自己定义的注解方法上,并且在这个过程中将 seq 的检查也给做了,大体的流程是这个样子的:



这样以上几点问题都会得到解决,下面我们来看看具体如何实现:


  • 首先定义一个注解用于接受自己分发的事件:



@Target( {ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface StreamEventHandler {
String[] payloadTypes() default {""};
String[] types();}

复制代码


types 对应 Stream 本身 Inuput 的类型, payloadTypes 对应事件类型,比如 ContractCreated,我们要做的效果是这个 payloadTypes 可以不写,直接从方法的第一个参数读取 class 的 simapleName。


  • 定义用于记录 aggregate sequenceNumber 的 entity 和 repository :


@Entity@Table(indexes = @Index(columnList = "aggregateIdentifier,type", unique = true))@Getter@Setter@NoArgsConstructorpublic class DomainAggregateSequence {
@Id @GeneratedValue private Long id;
private Long sequenceNumber;
private Long aggregateIdentifier;
private String type;}
@Repositorypublic interface DomainAggregateSequenceRepository extends JpaRepository<DomainAggregateSequence, Long> {
/** * 根据 aggregate id 和 type 找到对应的记录 * * @param identifier * @param type * * @return */ DomainAggregateSequence findByAggregateIdentifierAndType(Long identifier, String type);
}
复制代码


  • 由于暂时没有找到监听所有已绑定 channel 的事件的方法,这里实现一个类提供一个 dispatch 的方法用于分发:


@Slf4j@Component@AllArgsConstructorpublic class StreamDomainEventDispatcher implements BeanPostProcessor {
private final ObjectMapper mapper;
private final DomainAggregateSequenceRepository domainAggregateSequenceRepository;
private HashMap<Object, List<Method>> beanHandlerMap = new HashMap<>();
@Autowired public StreamDomainEventDispatcher(ObjectMapper mapper, DomainAggregateSequenceRepository domainAggregateSequenceRepository) { this.mapper = mapper; this.domainAggregateSequenceRepository = domainAggregateSequenceRepository; }
@Transactional public void dispatchEvent(DomainEvent event, String type) {
log.info(MessageFormat.format("message [{0}] received", event.getEventIdentifier()));
// 1. 检查是否是乱序事件或者重复事件 Long aggregateIdentifier = Long.parseLong(event.getAggregateIdentifier()); String eventType = event.getType(); Long eventSequence = event.getSequenceNumber();
DomainAggregateSequence sequenceObject = domainAggregateSequenceRepository.findByAggregateIdentifierAndType(aggregateIdentifier, eventType);
if (sequenceObject == null) { sequenceObject = new DomainAggregateSequence(); sequenceObject.setSequenceNumber(eventSequence); sequenceObject.setAggregateIdentifier(aggregateIdentifier); sequenceObject.setType(eventType); } else if (sequenceObject.getSequenceNumber() + 1 != eventSequence) { // 重复事件,直接忽略 if (sequenceObject.getSequenceNumber().equals(eventSequence)) { log.warn(MessageFormat.format("repeat event ignored, type[{0}] aggregate[{1}] seq[{2}] , ignored", event.getType(), event.getAggregateIdentifier(), event.getSequenceNumber())); return; } throw new StreamEventSequenceException(MessageFormat.format("sequence error, db [{0}], current [{1}]", sequenceObject.getSequenceNumber(), eventSequence)); } else { sequenceObject.setSequenceNumber(eventSequence); }
domainAggregateSequenceRepository.save(sequenceObject);
// 2. 分发事件到各个 handler beanHandlerMap.forEach((key, value) -> { Optional<Method> matchedMethod = getMatchedMethods(value, type, event.getPayloadType());
matchedMethod.ifPresent(method -> { try { invoke(key, method, event); } catch (IllegalAccessException | InvocationTargetException e) {
throw new StreamHandlerException(MessageFormat.format("[{0}] invoke error", method.getName()), e); } });
if (!matchedMethod.isPresent()) { log.info(MessageFormat.format("message [{0}] has no listener", event.getEventIdentifier())); } });
log.info(MessageFormat.format("message [{0}] handled", event.getEventIdentifier())); }
@Transactional public Optional<Method> getMatchedMethods(List<Method> methods, String type, String payloadType) { // 这里应该只有一个方法,因为将 stream 的单个事件分成多个之后,无法保证一致性 List<Method> results = methods.stream().filter(m -> { StreamEventHandler handler = m.getAnnotation(StreamEventHandler.class); List<String> types = new ArrayList<>(Arrays.asList(handler.types())); List<String> payloadTypes = new ArrayList<>(Arrays.asList(handler.payloadTypes()));
types.removeIf(StringUtils::isBlank); payloadTypes.removeIf(StringUtils::isBlank);
if (CollectionUtils.isEmpty(payloadTypes) && m.getParameterTypes().length != 0) { payloadTypes = Collections.singletonList(m.getParameterTypes()[0].getSimpleName()); }
boolean isTypeMatch = types.contains(type);
String checkedPayloadType = payloadType; if (StringUtils.contains(checkedPayloadType, ".")) { checkedPayloadType = StringUtils.substringAfterLast(checkedPayloadType, "."); } boolean isPayloadTypeMatch = payloadTypes.contains(checkedPayloadType);
return isTypeMatch && isPayloadTypeMatch; }).collect(Collectors.toList());
if (results.size() > 1) { throw new StreamHandlerException(MessageFormat.format("type[{0}] event[{1}] has more than one handler", type, payloadType)); }
return results.size() == 1 ? Optional.of(results.get(0)) : Optional.empty(); }
@Transactional public void invoke(Object bean, Method method, DomainEvent event) throws IllegalAccessException, InvocationTargetException {
int count = method.getParameterCount();
if (count == 0) { method.invoke(bean); } else if (count == 1) { Class<?> payloadType = method.getParameterTypes()[0];
if (payloadType.equals(DomainEvent.class)) { method.invoke(bean, mapper.convertValue(event.getPayload(), DomainEvent.class)); } else { method.invoke(bean, mapper.convertValue(event.getPayload(), payloadType)); }
} else if (count == 2) { Class<?> payloadType0 = method.getParameterTypes()[0]; Class<?> payloadType1 = method.getParameterTypes()[1];
Object firstParameterValue = mapper.convertValue(event.getPayload(), payloadType0); Object secondParameterValue = event.getMetaData();
// 如果是 DomainEvent 类型则优先传递该类型,另外一个参数按照 payloadType > metaData 优先级传入 if (payloadType0.equals(DomainEvent.class)) { firstParameterValue = mapper.convertValue(event, payloadType0); secondParameterValue = mapper.convertValue(event.getPayload(), payloadType1); } if (payloadType1.equals(DomainEvent.class)) { secondParameterValue = mapper.convertValue(event, payloadType1); } method.invoke(bean, firstParameterValue, secondParameterValue); } }

@Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; }
@Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass(); Method[] uniqueDeclaredMethods = ReflectionUtils.getUniqueDeclaredMethods(targetClass);
List<Method> methods = new ArrayList<>(); for (Method method : uniqueDeclaredMethods) { StreamEventHandler streamListener = AnnotatedElementUtils.findMergedAnnotation(method, StreamEventHandler.class); if (streamListener != null) { methods.add(method); } } if (!CollectionUtils.isEmpty(methods)) { beanHandlerMap.put(bean, methods); } return bean; }
}
复制代码


这里参照了 SCS 本身手机 handler 的方式,会将有 @StreamEventHandler 注解的方法都找出来做一个记录。在 dispatchEvent 的时候会更新事件的 seq 并且按照 type 去调用各个标有注解的方法。


  • 实现一个比较简单的例子:


@Slf4j@Component@Transactional@AllArgsConstructorpublic class DomainEventDispatcher {
private final StreamDomainEventDispatcher streamDomainEventDispatcher;
@StreamListener(target = ChannelDefinition.CONTRACTS_INPUT, condition = "headers['messageType']=='eventSourcing'") public void handleBuilding(@Payload DomainEvent event) { streamDomainEventDispatcher.dispatchEvent(event, ChannelDefinition.CONTRACTS_INPUT); }}
@Component@AllArgsConstructor@Transactionalpublic class ContractEventHandler { @StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT) public void handle(ContractCreatedEvent event) { // 实现你的 view 层更新业务 }}
复制代码


注意:


  • AbstractDomainEventDispatcher中监听所有 bean 加载完成不能用 InitializingBean 接口,否则@Transactional会失效,这个有兴趣的同学可以研究一下@Transactional的机制。


至此以上几点就优化完了。

其他优化

错误处理

基于 SCS 的默认配置,存在一个致命的问题,那就是当消息处理失败(重试三次)之后,消息直接没了,这个相当于就是消息丢失了。那么解决方案其实也是比较简单的,一般有两种解决方案:


  1. 拒绝这个消息,丢在 broker 原先的队列里。

  2. 将这个消息记录到一个错误的 queue 中等待修复,后续可能将消息转发回去,也可能直接就删除了消息(比如重复的消息)。


方案 1 这么做可能会出的问题就是,这个消息反复消费,反复失败,引起循环问题从而导致服务出现问题,这个就需要在 broker 做一些策略配置了,为了让 broker 尽可能的简单,我们这里采用方案 2,要实现的流程是这样的:



  • 首先让 SCS 为我们自动生成一个 DLQ


spring:  application:    name: event-sourcing-service  datasource:    url: jdbc:mysql://localhost:3306/event?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE    username: root    password: root  jpa:    hibernate:      ddl-auto: update      use-new-id-generator-mappings: false    show-sql: false    properties:      hibernate.dialect: org.hibernate.dialect.MySQL55Dialect  rabbitmq:    host: localhost    port: 5672    username: creams_user    password: Souban701  cloud:    stream.bindings:      contract-events: # 这个名字对应代码中@input("value") 的 value        destination: contract-events # 这个对应 rabbit 中的 channel        contentType: application/json # 这个指定传输类型,其实可以默认指定,但是目前每个地方都写了,所以统一下      contract-events-input:        destination: contract-events        contentType: application/json        group: event-sourcing-service        durableSubscription: true    stream.rabbit.bindings.contract-events-input.consumer:      autoBindDlq: true      republishToDlq: true      deadLetterQueueName: contract-error.dlqlogging:  level.org:    springframework:      web: INFO      cloud.sleuth: INFO    apache.ibatis: DEBUG    java.sql: DEBUG    hibernate:      SQL: DEBUG      type.descriptor.sql: TRACE
axon: serializer: general: jackson

复制代码


加上这个配置之后,rabbit 会给这个队列创建一个 .dlq 后缀的队列,异常消息都会被塞到这个队列里面(消息中包含了异常信息以及来源),等待我们处理,deadLetterQueueName指定了 DLQ 的名称,这样所有的失败消息都会存放到同一个 queue 中。大部分的情况下,消息的异常都是由于 consumer 逻辑错误引起的,所以我们需要一个处理这些失败的消息的地方,比如在启动的时候自动拉取 DLQ 中的消息然后转发到原来的 queue 中去远程原有的业务逻辑,如果处理不了那么还是会继续进入到 DLQ 中。


  • 在启动的时候拉取 DLQ 中的消息转发到原来的 queue 中。


@Componentpublic class DLXHandler implements ApplicationListener<ContextRefreshedEvent>, ApplicationContextAware {
private final RabbitTemplate rabbitTemplate;
private ApplicationContext applicationContext;
private static final String DLQ = "contract-error.dlq";
@Autowired public DLXHandler(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; }
@Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; }
@Override public void onApplicationEvent(ContextRefreshedEvent event) { // SCS 会创建一个 child context ,这里需要判断下正确的 context 初始化完成 if (event.getApplicationContext().equals(this.applicationContext)) { // 启动后获取 dlq 中所有的消息,进行消费 Message message = rabbitTemplate.receive(DLQ); while (message != null) { rabbitTemplate.send(message.getMessageProperties().getReceivedRoutingKey(), message); message = rabbitTemplate.receive(DLQ); } }
}}
复制代码


由于 SCS 没有提供给我们类似的接口,这里使用了 rabbitmq 的接口来获取消息。

完善之前的 CQRS 例子

经常上述这些基础操作之后,汇过来实现 CQRS 就比较清晰了,只需要监听相关的事件,然后更新视图层即可。


  1. 添加时间的监听


    @StreamEventHandler(types = ChannelDefinition.CONTRACTS_INPUT)    public void handle(ContractCreatedEvent event, DomainEvent<ContractCreatedEvent, HashMap> domainEvent) {        QueryContractCommand command = new QueryContractCommand(event.getIdentifier(), domainEvent.getTimestamp());
ContractAggregate aggregate = queryGateway.query(command, ContractAggregate.class).join();
ContractView view = new ContractView(); view.setIndustryName(aggregate.getIndustryName()); view.setId(aggregate.getIdentifier()); view.setPartyB(aggregate.getPartyB()); view.setPartyA(aggregate.getPartyA()); view.setName(aggregate.getName()); view.setDeleted(aggregate.isDeleted());
contractViewRepository.save(view); }
复制代码


StreamDomainEventDispatcher 对传参做了一些处理,当有两个参数的时候会将 DomainEvent 传递,因为有些时候可能会用到一些字段,比如时间、附加信息等等。这里在消费事件的时候,可以根据时间去查询 aggregate 的状态,然后直接做一个映射,也可以根据事件直接对 view 层做 CUD ,个人觉得在性能和速度不存在大问题的时候直接去查询一下 aggregate 当时的状态做一个映射即可,毕竟比较简单。


  1. 删除原来的 ContractViewHandler 即可。完整的例子 - branch session6


作者介绍:


周国勇,目前就职于杭州匠人网络创业,致力于楼宇资产管理的 SaaS 化,负责后端业务架构设计、项目管理,喜欢对业务模型的分析,热衷新技术的探索和实践,经常在踩坑的路上越走越远。


相关文章:


《Event Sourcing 和 CQRS 落地(一):UID-Generator 实现》


《Event Sourcing 和 CQRS 落地(二):Event-Sourcing 实现》


《Event Sourcing 和 CQRS 落地(三):CQRS 实现》


《Event Sourcing 和 CQRS 落地(四):深入使用 -Axon》


2019-07-05 09:027680

评论

发布
暂无评论
发现更多内容

想要解析邮件?IMAP协议轻松助你,不再烦恼!

左诗右码

Go imap

为啥你心里想了什么抖音就会给你推什么?

客户在哪儿AI

人工智能 ToB营销 大客户营销

在日本为什么 mysql都被tidb所替换?

TiDB 社区干货传送门

数据库架构设计 8.x 实践

TiDB监控prometheus常用技巧

TiDB 社区干货传送门

监控 实践案例 集群管理 管理与运维

Pinterest 选择采用 TiDB

TiDB 社区干货传送门

MES系统到底能解决企业什么问题?

万界星空科技

制造业 生产管理系统 mes 万界星空科技

网页文本分类题赛后总结(排名第二)

阿里云天池

NocoBase 社区正式上线!

NocoBase

开源 低代码 无代码平台

金融知识挖掘

阿里云天池

能帮你找到大客户的企业全历史行为数据长什么样?

客户在哪儿AI

人工智能 ToB营销 大客户营销

搜款网商品列表数据接口(vvic.item_search)使用指南

tbapi

搜款网 搜款网API接口 搜款网商品列表数据接口 vvic VVIC网数据采集

【TiDB 社区智慧合集】TiDB 在核心场景的实战应用

TiDB 社区干货传送门

iPhone可运行的谷歌Gemma 2 2B模型,性能超GPT-3.5

硅纪元

gpt4o Gemma 2

Datawhale 零基础入门CV赛事-Task4 模型训练与验证

阿里云天池

第三届Apache Flink 极客挑战赛暨AAIG CUP比赛攻略_大浪813团队

阿里云天池

网页文本分类题赛后总结(排名第二)

阿里云天池

大型IM稳定性监测实践:手Q客户端性能防劣化系统的建设之路

JackJiang

即时通讯;IM;网络编程

从代码操作到洞察发现:API 接口中的商品详情数据世界

Noah

淘宝商品详情API返回值中的商品标签与分类

技术冰糖葫芦

API 安全 API 文档 API 测试 pinduoduo API

TiDB CDC 近期遇到问题总结

TiDB 社区干货传送门

监控 迁移 集群管理 管理与运维

瓜子二手车在财务中台结账核心系统 TiDB&TiFlash 实践

TiDB 社区干货传送门

数据库架构选型 HTAP 场景实践 数据中台场景实践

【IT运维】医院IT运维难点解析看这里!

行云管家

医院 IT 运维

支持纳管达梦数据库的堡垒机有哪些?咨询电话多少?

行云管家

数据安全 堡垒机 国产化

tidb8.1的磁盘选择,关于网络ssd,和本地ssd的选择对性能影响很大,差距60倍。

TiDB 社区干货传送门

8.x 实践

望繁信科技CEO索强出席2024新质生产力生态大会,畅谈中国AI聚沙成塔之路

望繁信科技

流程挖掘 流程资产 流程智能 望繁信科技 中国AI

GitHub Star 数量前 12 的开源无代码工具

NocoBase

GitHub 开源 无代码开发 无代码平台

Event Sourcing 和 CQRS落地(五):Spring-Cloud-Stream 优化_文化 & 方法_周国勇_InfoQ精选文章