写点什么

KafkaFlow 入门指南:构建可扩展的 Kafka 事件驱动应用

作者:Guilherme Ferreira

  • 2023-10-11
    北京
  • 本文字数:5379 字

    阅读完需:约 18 分钟

大小:2.06M时长:12:00
KafkaFlow入门指南:构建可扩展的Kafka事件驱动应用

在本文中,我们将会探讨 KafkaFlow 提供提供的特性。如果你正在使用.NET 构建 Apache Kafka 生产者和消费者,那么本文将会介绍如何借助 KafkaFlow 来简化你的生活。

为何要关注它?


KafkaFlow 为 Confluent .NET Kafka 客户端提供了一个抽象层。它使得使用、维护和测试 Kafka 消费者和生产者均更加容易。


假设你要为市场营销活动创建一个客户端目录(Client Catalog)。我们需要一项服务来消费那些捕获新客户端的消息。当开始设计所需的服务时,你会发现现有的服务在如何消费消息方面并不一致。


常见的情形是,团队在解决一些简单的问题(如优雅关机)时,往往会陷入困境。你会发现整个组织有四种不同的 JSON 序列化器实现,这只是挑战之一。


采用 KafkaFlow 这样的框架能够简化流程并加快开发周期。KafkaFlow 拥有一系列旨在提升开发人员体验的特性:


  1. 中间件(Middleware):KafkaFlow 允许开发人员创建中间件来处理消息,从而实现对 Kafka 生产者/消费者管道的更多控制和定制。

  2. 处理器(Handler):引入了处理器的概念,允许开发人员将主题中的消息处理转发给专用消息类型的处理器。

  3. 反序列化算法(Deserialization Algorithms):提供了一套开箱即用的序列化和反序列化算法。

  4. 多线程消费者:提供了保证消息处理顺序的多线程功能,有助于优化系统资源的使用。

  5. 管理 API 和仪表盘:提供了 API 和仪表盘来管理消费者和消费者群组,可以在运行时进行暂停、恢复或倒回偏移。

  6. 消费者限流:提供了一种简便的方式,为主题的消费提供优先级。接下来,我们探讨一下这些特性,看看它们在解决类似问题方面的潜力。

KafkaFlow 生产者:简化消息的生成


我们从消息的生产者开始。


向 Kafka 中生成消息并不是什么高难的火箭科学。即便如此,KafkaFlow 还是为 Confluent 的.NET Kafka 客户端的生产者接口提供了更高级别的抽象,从而能够简化代码并提升可维护性。


下面是一个如何使用 KafkaFlow 生产者发送消息的样例:


await _producers["my-topic-events"]    .ProduceAsync("my-topic", message.Id.ToString(), message);
复制代码


这样,我们就可以向 Kafka 生成消息,而无需直接处理序列化或底层 Kafka 客户端的其他复杂问题。不仅如此,定义和管理生产者还可以通过服务配置上的流畅接口(Fluent Interface)轻松实现。


services.AddKafka(kafka => kafka    .AddCluster(cluster => cluster        .WithBrokers(new[] { "host:9092" })        .AddProducer(            "product-events",            producer =>                producer            ...        )    ));
复制代码


生产者往往很简单,但也有一些常见的问题需要解决,比如压缩或序列化。我们来探讨一下。

在 KafkaFlow 中自定义序列化/反序列化


在 Apache Kafka 中,一个很有吸引力的特性就是与数据格式无关。但是,这就将责任转移给了生产者和消费者。如果考虑不周全,可能会导致在整个系统中出现由多种方式实现同一种结果的现象。因此,序列化显然是一个由客户端框架处理的用例。


KafkaFlow 具有适用于 JSON、Protobuf 甚至 Avro 的序列化器。只需将它们添加到中间件配置中就可以使用。


.AddProducer<ProductEventsProducer>(producer => producer       ...       .AddMiddlewares(middlewares => middleware           ...           .AddSerializer<JsonMessageSerializer>()       ))
复制代码


鉴于我们可以为消息使用自定义的序列化器/反序列化器,所以这个列表并不局限于这三种。虽然 Confluent 的.NET Kafka 客户端已经支持自定义序列化/反序列化,但 KafkaFlow 通过提供更优雅的处理方式简化了这一过程。举例来说,要使用自定义序列化器,我们可以这样做:


public class MySerializer : ISerializer{       public Task SerializeAsync(object message, Stream output, ISerializerContext context)       {             // 序列化逻辑在这里       }
public async Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context) { // 反序列化逻辑在这里 }}
// 在设置Kafka消费者/生产者的时候,注册自定义的序列化器
.AddProducer<MyProducer>(producer => producer ... .AddMiddlewares(middlewares => middleware ... .AddSerializer<MySerializer>() ))
复制代码

KafkaFlow 中的消息处理


消费者带来了大量的问题和可能性。第一个问题就是“如何处理消息?”


我们从最简单的方式开始。随着像 MediatR 这样的库的出现,CQRS 和 Meditor 模式得到了普及,.NET 开发人员习惯于将消息处理器与请求/消息接收器解耦。KafkaFlow 将同样的原则引入到了 Kafka 消费者中。


KafkaFlow 消息处理器允许开发人员定义特定的逻辑来处理来自 Kafka 主题的消息。按照设计,KafkaFlow 的消息处理器结构能够更好地分离关注点,并使代码更整洁、更易于维护。


如下是一个消息处理器的示例:


public class MyMessageHandler : IMessageHandler<MyMessageType>{    public Task Handle(IMessageContext context, MyMessageType message)    {        // 消息处理逻辑在这里    }}
复制代码


这个处理器可以在消费者配置中进行注册:


.AddConsumer(consumer => consumer...       .AddMiddlewares(middlewares => middlewares           ...             .AddTypedHandlers(handlers => handlers                     .AddHandler<MyMessageHandler>()              )       ))
复制代码


通过这种方式,可以轻松地将消费者和处理器分开,从而提升了可维护性和可测性。如果你的微服务只处理具有一种消息类型的一个主题,这可能会显得引入了不必要的复杂性。在这种情况下,你可以使用中间件。

KafkaFlow 中的中间件


KafkaFlow 是面向中间件的。你可能已经注意到,在消息处理器的代码片段中提到了“中间件”。所以,你可能会问什么是中间件。


中间件使得类型化处理器(Typed Handler)成为可能。消息会被传递到一个中间件管道,该管道将会被依次调用。如果你使用过 MediatR 管道的话,可能会对这一概念有所了解。此外,中间件还可以用来进行一系列的转换。换句话说,给定的中间件可以将传入的消息转换到下一个中间件。


KafkaFlow 中的中间件封装了处理消息的逻辑。管道是可扩展的,允许开发人员在消息处理管道中添加行为。


如下是一个中间件的样例:


public class MyMiddleware : IMessageMiddleware{    public async Task Invoke(IMessageContext context, MiddlewareDelegate next)    {         // 预处理逻辑位于这里                  await next(context);                   // 后处理逻辑位于这里           }}
复制代码


要使用该中间件,可以在消费者配置中进行注册:


.AddConsumer(consumer => consumer       ...       .AddMiddlewares(middlewares => middlewares             ...             .Add<MyMiddleware>()         ))   
复制代码


通过这种方式,开发人员就可以在消息处理管道中插入自定义逻辑,从而提供灵活性和控制力。类型化处理器是中间件的一种形式。所以,你甚至可以在没有类型化处理器的情况下处理消息,实现自己的中间件,或者也可以使用中间件来构建消息管道,在处理消息之前执行校验、丰富化等操作。

在 KafkaFlow 中处理并发


一旦开始思考基础设施的效率,你就会发现许多 Kafka 消费者没有得到充分利用。最常见的实现方式是单线程的,这限制了资源的利用率。因此,当我们需要扩展的时候,只能进行横向扩展,以保持所需的吞吐量。


KafkaFlow 为实现基础设施的高效率带来了另外一种可选方案。KafkaFlow 让开发人员可以控制单个消费者可以并发处理多少消息。它使用了 Worker 的理念,这些 Worker 可以协同消费一个主题。这一功能能够让你优化 Kafka 消费者,使其更好地匹配系统的能力。


如下是一个如何为消费者设置并发 worker 数量的样例:


.AddConsumer(consumer => consumer.Topic("topic-name")       .WithGroupId("sample-group")       .WithBufferSize(100)       .WithWorkersCount(10) // 设置worker的数量       .AddMiddlewares(middlewares => middlewares        ...        ))
复制代码


即便有并发 worker,KafkaFlow 也能确保顺序。

批处理


随着规模的扩大,你将会面临延迟和吞吐量之间的权衡。为了解决这个问题,KafkaFlow 有一个重要的特性,叫做“批量消费”。这个特性满足了以批量方式消费和处理来自 Kafka 的消息时对效率和性能的要求。在需要一起处理一组消息,而不是单个处理消息的场景下,该特性发挥着重要作用。

什么是批量消费?


在批量消费方式中,系统不是在收到消息后对其进行原子性地处理,而是将多条消息分组,然后一次性地对其进行处理。这种方法在处理大量数据时更为有效,尤其是在消息相互独立的情况下。批量执行操作会提高整体性能。

KafkaFlow 的批量消费方式


KafkaFlow 利用中间件系统提供批量处理功能。批量处理中间件能够让你根据批量大小或时间跨度(timespan)对消息进行分组。一旦达到其中的某个条件,中间件就会将这组消息转发给下一个中间件。


services.AddKafka(kafka => kafka    .AddCluster(cluster => cluster        .WithBrokers(new[] { "host:9092" })        .AddConsumer(            consumerBuilder => consumerBuilder            ...            .AddMiddlewares(                middlewares => middlewares                    ...                    .BatchConsume(100, TimeSpan.FromSeconds(10))                    .Add<HandlingMiddleware>()            )        )    ));
复制代码

批量消费对性能的影响


通过批量处理,开发人员可以在基于 Kafka 的应用程序中实现更高的吞吐量。它可以加快处理速度,因为与启动和完成每个处理任务相关的开销会大大减少。这将全面提升系统的性能。


同时,这种方式还能减少网络 I/O 操作,因为数据是以更大的分块获取的,这能够进一步提高处理速度,尤其是在需要关注网络延迟的系统中。

KafkaFlow 的消费者管理


KafkaFlow 还简化了 Kafka 消费者管理相关的任务。通过 KafkaFlow 的管理 API,我们可以启动、停止、暂停消费者以及倒回偏移(rewind offset)。


管理 API 可以在编程接口、REST API 或 Dashboard UI 中使用。



KafkaFlow 的管理仪表盘

消费者限流


通常,底层技术可能无法像 Kafka 消费者那样以相同的方式应对高负载期。这会带来稳定性的问题,而这正是限流的用武之地。


消费者限流是一种管理消息消费的方式,它能够使应用程序根据指标动态调整消息消费的速度。

优先级


假设你正在运行一个应用程序,希望将原子操作和批量操作分隔到不同的消费者和主题中。与批量操作相比,你可能更愿意优先处理原子操作。按照传统方式,由于消息生成的速度可能存在差异,所以管理这种差异化可能很具挑战性。


在这种情况下,消费者限流就很有价值了,它允许我们监控那些负责原子操作的消费者的滞后(lag)情况。根据这一指标,我们可以对处理批量操作的消费者实施限流,确保优先处理原子操作。


那结果是什么呢?高效、灵活和优化的消费流程。


借助 KafkaFlow 的流畅接口,为消费者添加限流功能是非常简单的。下面是一个简单的样例:


.AddConsumer(    consumer => consumer        .Topic("bulk-topic")        .WithName("bulkConsumer")        .AddMiddlewares(            middlewares => middlewares                .ThrottleConsumer(                    t => t                        .ByOtherConsumersLag("singleConsumer")                        .WithInterval(TimeSpan.FromSeconds(5))                        .AddAction(a => a.AboveThreshold(10).ApplyDelay(100))                        .AddAction(a => a.AboveThreshold(100).ApplyDelay(1_000))                        .AddAction(a => a.AboveThreshold(1_000).ApplyDelay(10_000)))                .AddSerializer<JsonCoreSerializer>()        ))
复制代码

KafkaFlow:展望未来


目前,KafkaFlow 在 Kafka 的基础上提供了一个健壮的、对开发人员友好的抽象,简化了使用.NET 构建实时数据处理应用程序的过程。但是,与其他活跃的开源项目一样,KafkaFlow 也在不断演进和完善。


项目目前的发展轨迹来看,我们可以预测几个方面的发展方向。例如,KafkaFlow 可能会进一步增强其中间件系统,为消息处理提供更多的控制权和灵活性。我们可能还会看到更广泛的管理 API,为开发人员提供对 Kafka 集群更大的控制权。


由于设计上的可扩展性,我们可以期待 KafkaFlow 社区会不断壮大,带来更多的贡献、创新特性、扩展和支持。随着越来越多的开发人员和组织采用 KafkaFlow,我们会看到学习资源、教程、案例和其他社区内容不断涌现,这些内容可以帮助新用户入门,也可以帮助现有的用户从库中学习更多的知识。

结论


KafkaFlow 是一个便利、对开发人员友好的工具,它简化了在.NET 中使用 Kafka 的工作。在开发人员体验和可用性方面,它均表现出色。该框架的设计非常适合整洁、可读性强的代码。在 Apache Kafka 上构建应用程序时,KafkaFlow 通过中间件、消息处理器以及对复杂问题的抽象,实现了清晰的分离,这有助于保持代码库的可管理性和可理解性。


除此之外,围绕 KafkaFlow 的社区在不断壮大。如果你正在使用 Kafka 并希望提高生产力和可靠性,那 KafkaFlow 绝对值得考虑。


原文链接:

Building Kafka Event-Driven Applications with KafkaFlow

2023-10-11 08:0010944

评论

发布
暂无评论
发现更多内容

KaiwuDB 数据库故障诊断工具详解

KaiwuDB

数据库 故障诊断

CentOS 7 升级 5.4 内核

MatrixOrigin

数据库 分布式 云原生

分析比较微服务框架:Spring Cloud 和 Dubbo

Apifox

程序员 微服务 Spring Cloud dubbo 后端

DTC2024精彩预告 | MatrixOne:超融合数据库新星崛起,掀起全新数据革命

MatrixOrigin

数据库 分布式 云原生

Web3 游戏周报(3.31-4.6)

Footprint Analytics

gamefi\ Web3 游戏

从入门到精通:系统性学习Linux虚拟网络设备的全面指南

GousterCloud

Linux Kenel 虚拟网卡

Microsoft 365 for Mac(原Office 365)

iMac小白

Apache Doris 基于 Job Scheduler 实现秒级触发任务调度能力

SelectDB

数据库 数据分析 大数据 开源 调度平台

API接口在数据分析中的应用:淘宝商品信息获取实例

技术冰糖葫芦

api 货币化 API 文档

Stable diffusion 初学者指南

程序那些事

程序那些事 程序那些事; openai AIGC Stable Diffusion

电力物联网系统设计

能源恒观

物联网 电力 新能源

BOE(京东方)隆重举办周年庆典暨年度“京东方人”奖表彰大会 以文化传承谱写高质量发展新篇章

科技热闻

开源无代码 / 低代码平台 NocoBase 0.21:图表及工作流支持多数据源

NocoBase

开源 开发者 低代码 开发工具 无代码开发

保持人才和技术的新鲜感,倡导数据驱动的创新和财务管理

智达方通

人才培养 全面预算管理 全面预算管理系统 全面预算管理平台

得物千人规模敏捷迭代实践分享

得物技术

项目管理 互联网人 PMO 得物技术 企业号 4 月 PK 榜

KafkaFlow入门指南:构建可扩展的Kafka事件驱动应用_业务架构_InfoQ精选文章