写点什么

KafkaFlow 入门指南:构建可扩展的 Kafka 事件驱动应用

作者:Guilherme Ferreira

  • 2023-10-11
    北京
  • 本文字数:5379 字

    阅读完需:约 18 分钟

大小:2.06M时长:12:00
KafkaFlow入门指南:构建可扩展的Kafka事件驱动应用

AI 大模型超全落地场景&金融应用实践,8 月 16 - 19 日 FCon x AICon 大会联诀来袭、干货翻倍!

在本文中,我们将会探讨 KafkaFlow 提供提供的特性。如果你正在使用.NET 构建 Apache Kafka 生产者和消费者,那么本文将会介绍如何借助 KafkaFlow 来简化你的生活。

为何要关注它?


KafkaFlow 为 Confluent .NET Kafka 客户端提供了一个抽象层。它使得使用、维护和测试 Kafka 消费者和生产者均更加容易。


假设你要为市场营销活动创建一个客户端目录(Client Catalog)。我们需要一项服务来消费那些捕获新客户端的消息。当开始设计所需的服务时,你会发现现有的服务在如何消费消息方面并不一致。


常见的情形是,团队在解决一些简单的问题(如优雅关机)时,往往会陷入困境。你会发现整个组织有四种不同的 JSON 序列化器实现,这只是挑战之一。


采用 KafkaFlow 这样的框架能够简化流程并加快开发周期。KafkaFlow 拥有一系列旨在提升开发人员体验的特性:


  1. 中间件(Middleware):KafkaFlow 允许开发人员创建中间件来处理消息,从而实现对 Kafka 生产者/消费者管道的更多控制和定制。

  2. 处理器(Handler):引入了处理器的概念,允许开发人员将主题中的消息处理转发给专用消息类型的处理器。

  3. 反序列化算法(Deserialization Algorithms):提供了一套开箱即用的序列化和反序列化算法。

  4. 多线程消费者:提供了保证消息处理顺序的多线程功能,有助于优化系统资源的使用。

  5. 管理 API 和仪表盘:提供了 API 和仪表盘来管理消费者和消费者群组,可以在运行时进行暂停、恢复或倒回偏移。

  6. 消费者限流:提供了一种简便的方式,为主题的消费提供优先级。接下来,我们探讨一下这些特性,看看它们在解决类似问题方面的潜力。

KafkaFlow 生产者:简化消息的生成


我们从消息的生产者开始。


向 Kafka 中生成消息并不是什么高难的火箭科学。即便如此,KafkaFlow 还是为 Confluent 的.NET Kafka 客户端的生产者接口提供了更高级别的抽象,从而能够简化代码并提升可维护性。


下面是一个如何使用 KafkaFlow 生产者发送消息的样例:


await _producers["my-topic-events"]    .ProduceAsync("my-topic", message.Id.ToString(), message);
复制代码


这样,我们就可以向 Kafka 生成消息,而无需直接处理序列化或底层 Kafka 客户端的其他复杂问题。不仅如此,定义和管理生产者还可以通过服务配置上的流畅接口(Fluent Interface)轻松实现。


services.AddKafka(kafka => kafka    .AddCluster(cluster => cluster        .WithBrokers(new[] { "host:9092" })        .AddProducer(            "product-events",            producer =>                producer            ...        )    ));
复制代码


生产者往往很简单,但也有一些常见的问题需要解决,比如压缩或序列化。我们来探讨一下。

在 KafkaFlow 中自定义序列化/反序列化


在 Apache Kafka 中,一个很有吸引力的特性就是与数据格式无关。但是,这就将责任转移给了生产者和消费者。如果考虑不周全,可能会导致在整个系统中出现由多种方式实现同一种结果的现象。因此,序列化显然是一个由客户端框架处理的用例。


KafkaFlow 具有适用于 JSON、Protobuf 甚至 Avro 的序列化器。只需将它们添加到中间件配置中就可以使用。


.AddProducer<ProductEventsProducer>(producer => producer       ...       .AddMiddlewares(middlewares => middleware           ...           .AddSerializer<JsonMessageSerializer>()       ))
复制代码


鉴于我们可以为消息使用自定义的序列化器/反序列化器,所以这个列表并不局限于这三种。虽然 Confluent 的.NET Kafka 客户端已经支持自定义序列化/反序列化,但 KafkaFlow 通过提供更优雅的处理方式简化了这一过程。举例来说,要使用自定义序列化器,我们可以这样做:


public class MySerializer : ISerializer{       public Task SerializeAsync(object message, Stream output, ISerializerContext context)       {             // 序列化逻辑在这里       }
public async Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context) { // 反序列化逻辑在这里 }}
// 在设置Kafka消费者/生产者的时候,注册自定义的序列化器
.AddProducer<MyProducer>(producer => producer ... .AddMiddlewares(middlewares => middleware ... .AddSerializer<MySerializer>() ))
复制代码

KafkaFlow 中的消息处理


消费者带来了大量的问题和可能性。第一个问题就是“如何处理消息?”


我们从最简单的方式开始。随着像 MediatR 这样的库的出现,CQRS 和 Meditor 模式得到了普及,.NET 开发人员习惯于将消息处理器与请求/消息接收器解耦。KafkaFlow 将同样的原则引入到了 Kafka 消费者中。


KafkaFlow 消息处理器允许开发人员定义特定的逻辑来处理来自 Kafka 主题的消息。按照设计,KafkaFlow 的消息处理器结构能够更好地分离关注点,并使代码更整洁、更易于维护。


如下是一个消息处理器的示例:


public class MyMessageHandler : IMessageHandler<MyMessageType>{    public Task Handle(IMessageContext context, MyMessageType message)    {        // 消息处理逻辑在这里    }}
复制代码


这个处理器可以在消费者配置中进行注册:


.AddConsumer(consumer => consumer...       .AddMiddlewares(middlewares => middlewares           ...             .AddTypedHandlers(handlers => handlers                     .AddHandler<MyMessageHandler>()              )       ))
复制代码


通过这种方式,可以轻松地将消费者和处理器分开,从而提升了可维护性和可测性。如果你的微服务只处理具有一种消息类型的一个主题,这可能会显得引入了不必要的复杂性。在这种情况下,你可以使用中间件。

KafkaFlow 中的中间件


KafkaFlow 是面向中间件的。你可能已经注意到,在消息处理器的代码片段中提到了“中间件”。所以,你可能会问什么是中间件。


中间件使得类型化处理器(Typed Handler)成为可能。消息会被传递到一个中间件管道,该管道将会被依次调用。如果你使用过 MediatR 管道的话,可能会对这一概念有所了解。此外,中间件还可以用来进行一系列的转换。换句话说,给定的中间件可以将传入的消息转换到下一个中间件。


KafkaFlow 中的中间件封装了处理消息的逻辑。管道是可扩展的,允许开发人员在消息处理管道中添加行为。


如下是一个中间件的样例:


public class MyMiddleware : IMessageMiddleware{    public async Task Invoke(IMessageContext context, MiddlewareDelegate next)    {         // 预处理逻辑位于这里                  await next(context);                   // 后处理逻辑位于这里           }}
复制代码


要使用该中间件,可以在消费者配置中进行注册:


.AddConsumer(consumer => consumer       ...       .AddMiddlewares(middlewares => middlewares             ...             .Add<MyMiddleware>()         ))   
复制代码


通过这种方式,开发人员就可以在消息处理管道中插入自定义逻辑,从而提供灵活性和控制力。类型化处理器是中间件的一种形式。所以,你甚至可以在没有类型化处理器的情况下处理消息,实现自己的中间件,或者也可以使用中间件来构建消息管道,在处理消息之前执行校验、丰富化等操作。

在 KafkaFlow 中处理并发


一旦开始思考基础设施的效率,你就会发现许多 Kafka 消费者没有得到充分利用。最常见的实现方式是单线程的,这限制了资源的利用率。因此,当我们需要扩展的时候,只能进行横向扩展,以保持所需的吞吐量。


KafkaFlow 为实现基础设施的高效率带来了另外一种可选方案。KafkaFlow 让开发人员可以控制单个消费者可以并发处理多少消息。它使用了 Worker 的理念,这些 Worker 可以协同消费一个主题。这一功能能够让你优化 Kafka 消费者,使其更好地匹配系统的能力。


如下是一个如何为消费者设置并发 worker 数量的样例:


.AddConsumer(consumer => consumer.Topic("topic-name")       .WithGroupId("sample-group")       .WithBufferSize(100)       .WithWorkersCount(10) // 设置worker的数量       .AddMiddlewares(middlewares => middlewares        ...        ))
复制代码


即便有并发 worker,KafkaFlow 也能确保顺序。

批处理


随着规模的扩大,你将会面临延迟和吞吐量之间的权衡。为了解决这个问题,KafkaFlow 有一个重要的特性,叫做“批量消费”。这个特性满足了以批量方式消费和处理来自 Kafka 的消息时对效率和性能的要求。在需要一起处理一组消息,而不是单个处理消息的场景下,该特性发挥着重要作用。

什么是批量消费?


在批量消费方式中,系统不是在收到消息后对其进行原子性地处理,而是将多条消息分组,然后一次性地对其进行处理。这种方法在处理大量数据时更为有效,尤其是在消息相互独立的情况下。批量执行操作会提高整体性能。

KafkaFlow 的批量消费方式


KafkaFlow 利用中间件系统提供批量处理功能。批量处理中间件能够让你根据批量大小或时间跨度(timespan)对消息进行分组。一旦达到其中的某个条件,中间件就会将这组消息转发给下一个中间件。


services.AddKafka(kafka => kafka    .AddCluster(cluster => cluster        .WithBrokers(new[] { "host:9092" })        .AddConsumer(            consumerBuilder => consumerBuilder            ...            .AddMiddlewares(                middlewares => middlewares                    ...                    .BatchConsume(100, TimeSpan.FromSeconds(10))                    .Add<HandlingMiddleware>()            )        )    ));
复制代码

批量消费对性能的影响


通过批量处理,开发人员可以在基于 Kafka 的应用程序中实现更高的吞吐量。它可以加快处理速度,因为与启动和完成每个处理任务相关的开销会大大减少。这将全面提升系统的性能。


同时,这种方式还能减少网络 I/O 操作,因为数据是以更大的分块获取的,这能够进一步提高处理速度,尤其是在需要关注网络延迟的系统中。

KafkaFlow 的消费者管理


KafkaFlow 还简化了 Kafka 消费者管理相关的任务。通过 KafkaFlow 的管理 API,我们可以启动、停止、暂停消费者以及倒回偏移(rewind offset)。


管理 API 可以在编程接口、REST API 或 Dashboard UI 中使用。



KafkaFlow 的管理仪表盘

消费者限流


通常,底层技术可能无法像 Kafka 消费者那样以相同的方式应对高负载期。这会带来稳定性的问题,而这正是限流的用武之地。


消费者限流是一种管理消息消费的方式,它能够使应用程序根据指标动态调整消息消费的速度。

优先级


假设你正在运行一个应用程序,希望将原子操作和批量操作分隔到不同的消费者和主题中。与批量操作相比,你可能更愿意优先处理原子操作。按照传统方式,由于消息生成的速度可能存在差异,所以管理这种差异化可能很具挑战性。


在这种情况下,消费者限流就很有价值了,它允许我们监控那些负责原子操作的消费者的滞后(lag)情况。根据这一指标,我们可以对处理批量操作的消费者实施限流,确保优先处理原子操作。


那结果是什么呢?高效、灵活和优化的消费流程。


借助 KafkaFlow 的流畅接口,为消费者添加限流功能是非常简单的。下面是一个简单的样例:


.AddConsumer(    consumer => consumer        .Topic("bulk-topic")        .WithName("bulkConsumer")        .AddMiddlewares(            middlewares => middlewares                .ThrottleConsumer(                    t => t                        .ByOtherConsumersLag("singleConsumer")                        .WithInterval(TimeSpan.FromSeconds(5))                        .AddAction(a => a.AboveThreshold(10).ApplyDelay(100))                        .AddAction(a => a.AboveThreshold(100).ApplyDelay(1_000))                        .AddAction(a => a.AboveThreshold(1_000).ApplyDelay(10_000)))                .AddSerializer<JsonCoreSerializer>()        ))
复制代码

KafkaFlow:展望未来


目前,KafkaFlow 在 Kafka 的基础上提供了一个健壮的、对开发人员友好的抽象,简化了使用.NET 构建实时数据处理应用程序的过程。但是,与其他活跃的开源项目一样,KafkaFlow 也在不断演进和完善。


项目目前的发展轨迹来看,我们可以预测几个方面的发展方向。例如,KafkaFlow 可能会进一步增强其中间件系统,为消息处理提供更多的控制权和灵活性。我们可能还会看到更广泛的管理 API,为开发人员提供对 Kafka 集群更大的控制权。


由于设计上的可扩展性,我们可以期待 KafkaFlow 社区会不断壮大,带来更多的贡献、创新特性、扩展和支持。随着越来越多的开发人员和组织采用 KafkaFlow,我们会看到学习资源、教程、案例和其他社区内容不断涌现,这些内容可以帮助新用户入门,也可以帮助现有的用户从库中学习更多的知识。

结论


KafkaFlow 是一个便利、对开发人员友好的工具,它简化了在.NET 中使用 Kafka 的工作。在开发人员体验和可用性方面,它均表现出色。该框架的设计非常适合整洁、可读性强的代码。在 Apache Kafka 上构建应用程序时,KafkaFlow 通过中间件、消息处理器以及对复杂问题的抽象,实现了清晰的分离,这有助于保持代码库的可管理性和可理解性。


除此之外,围绕 KafkaFlow 的社区在不断壮大。如果你正在使用 Kafka 并希望提高生产力和可靠性,那 KafkaFlow 绝对值得考虑。


原文链接:

Building Kafka Event-Driven Applications with KafkaFlow

2023-10-11 08:0010845

评论

发布
暂无评论
发现更多内容

Spring Boot + EasyExcel 导入导出,好用到爆!

Java 程序员 后端

Spring Boot 中三种跨域场景总结,这篇必看!不看后悔系列

Java 程序员 后端

Spring MVC框架:第六章:传统增删改查

Java 程序员 后端

Serverless 如何在阿里巴巴实现规模化落地?

Java 程序员 后端

Spring Boot 接入 GitHub 第三方登录,只要两行配置!

Java 程序员 后端

Spring Cloud Gateway限流实战

Java 程序员 后端

Spring Cloud:第二章:eureka服务发现

Java 程序员 后端

Spring Boot 操作 Redis 的各种实现

Java 程序员 后端

Spring Boot+Mybatis+thymeleaf整合

Java 程序员 后端

Spring Cloud 2020 版本最佳实践,你落伍了

Java 程序员 后端

Spring 三级缓存和循环依赖 思考和总结

Java 程序员 后端

SAP为Java 16贡献JEP 387 “弹性元空间”

Java 程序员 后端

spring boot 自定义配置文件&参数绑定

Java 程序员 后端

Spring Boot 集成 Elasticsearch 实战

Java 程序员 后端

Vue进阶(幺柒伍):色彩搭配

No Silver Bullet

Vue 11月日更

Go WebSocket开发与测试实践【/net/websocket】

FunTester

Java websocket 接口测试 Go 语言 FunTester

Spring Boot 核心的 25 个注解

Java 程序员 后端

Spring Boot在微服务中的最佳实践

Java 程序员 后端

Spring MVC框架:第七章:REST架构风格

Java 程序员 后端

【LeetCode】合并两个有序数组Java题解

Albert

算法 LeetCode 11月日更

Spring MVC温故而知新 – 从零开始

Java 程序员 后端

Servlet 入门

Java 程序员 后端

Spring @Lookup实现单例bean依赖注入原型bean

Java 程序员 后端

Spring Boot 实战(11)整合MyBatis-Plus

Java 程序员 后端

Spring Boot 快速入门(一)

Java 程序员 后端

Spring Cloud Gateway修改请求和响应body的内容

Java 程序员 后端

RocketMQ源码分析之NameServer

Java 程序员 后端

Spring boot —— 创建parent工程

Java 程序员 后端

fastposter 2.2.0 新版本发布 电商级海报生成器

物有本末

Java Vue 海报 fastposter 海报生成器

Spring MVC框架:第七章:REST架构风格(1)

Java 程序员 后端

外包学生管理系统详细架构设计文档

Beyond Ryan

KafkaFlow入门指南:构建可扩展的Kafka事件驱动应用_业务架构_InfoQ精选文章