2天时间,聊今年最热的 Agent、上下文工程、AI 产品创新等话题。2025 年最后一场~ 了解详情
写点什么

如何在 kafka-python 和 confluent-kafka 之间做出选择?

  • 2017-09-10
  • 本文字数:2226 字

    阅读完需:约 7 分钟

在 Data Syndrome,我们使用并喜爱 Kafka。它使我们能够以最少的努力和复杂性将批处理变为实时处理。然而,在最近的一个项目中,我们学到了有关 kafka-python 软件包的惨痛教训,该软件包促使我思考该如何选择开源工具。本文将反思我们的开源决策过程,介绍两个用于 Python 的 Kafka 客户端、我们遇到的问题及我们将采用的解决方案。

kafka-python:蛮荒的西部

kafka-python 是最受欢迎的 Kafka Python 客户端。我们过去使用时从未出现过任何问题,在我的《敏捷数据科学 2.0》一书中我也用过它。然而在最近这个项目中,它却出现了一个严重的问题。我们发现,当以文档化的方式使用KafkaConsumer、Consumer 迭代式地从消息队列中获取消息时,最终到达主题topic 的由Consumer 携带的消息通常会丢失。我们通过控制台Consumer 的分析验证了这一点。

需要更详细说明的是,kafka-python 和KafkaConsumer 是与一个由SSL 保护的Kafka 服务(如 Aiven Kafka )一同使用的,如下面这样:

复制代码
kafka_consumer = KafkaConsumer(
topic,
enable_auto_commit=True,
group_id=group_id,
bootstrap_servers=config.kafka.host,
api_version=(0, 10),
security_protocol='SSL',
ssl_check_hostname=True,
ssl_cafile=config.kafka.ca_pem,
ssl_certfile=config.kafka.service_cert,
ssl_keyfile=config.kafka.service_key
)
for message in kafka_consumer:
application_message = json.loads(message.value.decode())
...

当以这样的推荐方式使用时,KafkaConsumer 会丢失消息。但有一个变通方案,就是保留所有消息。这个方案是 Kafka 服务提供商 Aiven support 提供给我们的。它看起来像这样:

复制代码
while True:
raw_messages = consumer.poll(timeout_ms=1000, max_records=5000)
for topic_partition, messages in raw_messages.items():
application_message = json.loads(message.value.decode())
...

虽然这个变通方案可能有用,但 README 中的方法会丢弃消息使我对其失去兴趣。所以我找到了一个替代方案。

confluent-kafka:企业支持

发现 coufluent-kafka Python 模块时,我感到无比惊喜。它既能做 librdkafka 的外封装,又非常小巧。librdkafka 是一个用 C 语言写的 kafka 库,它是 Go 和.NET 的基础。更重要的是,它由 Confluent 公司支持。我爱开源,但是当“由非正式社区拥有或支持”这种方式效果不行的时候,或许该考虑给替代方案印上公章、即该由某个公司拥有或支持了。不过,我们并未购买商业支持。我们知道有人会维护这个库的软件质量,而且可以选择买或不买商业支持,这一点真是太棒了。

用 confluent-kafka 替换 kafka-python 非常简单。confluent-kafka 使用 poll 方法,它类似于上面提到的访问 kafka-python 的变通方案。

复制代码
kafka_consumer = Consumer(
{
"api.version.request": True,
"enable.auto.commit": True,
"group.id": group_id,
"bootstrap.servers": config.kafka.host,
"security.protocol": "ssl",
"ssl.ca.location": config.kafka.ca_pem,
"ssl.certificate.location": config.kafka.service_cert,
"ssl.key.location": config.kafka.service_key,
"default.topic.config": {"auto.offset.reset": "smallest"}
}
)
consumer.subscribe([topic])
# Now loop on the consumer to read messages
running = True
while running:
message = kafka_consumer.poll()
application_message = json.load(message.value.decode())
kafka_consumer.close()

现在我们能收到所有消息了。我并不是说 kafka-python 工具不好,我相信社区会对它的问题做出反应并解决。但从现在开始,我会一直坚持使用 confluent-kafka。

开源治理

开源是强大的,但是涉及到复杂的“大数据”和 NoSQL 工具时,通常需要有一家大公司在背后推动工具的开发。这样你就知道,如果那个公司可以使用工具,那么该工具应该拥有很好的基本功能。它的出现可能是非正式的,就像某公司发布类似 FOSS 的项目一样,但也可能是正式的,就像某公司为工具提供商业支持一样。当然,从另一个角度来看,如果一家与开源社区作对的公司负责开发某个工具,你便失去了控制权。你的意见可能无关紧要,除非你是付费客户。

理想情况是采取开源治理,就像 Apache 基金会一样,还有就是增加可用的商业支持选项。这对互联网上大部分的免费软件来说根本不可能。限制自己只使用那些公司盖章批准后的工具将非常限制你的自由。这对于一些商店可能是正确选择,但对于我们不是。我喜欢工具测试,如果工具很小,而且只专心做一件事,我就会使用它。

信任开源

对于更大型的工具,以上决策评估过程更为复杂。通常,我会看一下提交问题和贡献者的数量,以及最后一次 commit 的日期。我可能会问朋友某个工具的情况,有时也会在推特上问。当你进行嗅探检查后从 Github 选择了一个项目,即说明你信任社区可以产出好的工具。对于大多数工具来说,这是没问题的。

但信任社区可能存在问题。对于某个特定的工具,可能并没有充分的理由让你信任社区可以产出好的软件。社区在目标、经验和开源项目的投入时间方面各不相同。选择工具时保持审慎态度十分重要,不要让理想蒙蔽了判断。

英文原文链接 https://blog.datasyndrome.com/a-tale-of-two-kafka-clients-c613efab49df


感谢蔡芳芳对本文的审校。

给InfoQ 中文站投稿或者参与内容翻译工作,请邮件至 editors@cn.infoq.com 。也欢迎大家通过新浪微博( @InfoQ @丁晓昀),微信(微信号: InfoQChina )关注我们。

2017-09-10 19:0024599

评论

发布
暂无评论
发现更多内容

Flink+Clickhouse构建实时数仓的最佳实践

Wping

大数据 flink 实时数仓 Clickhouse

KubeVela + KEDA:为应用带来“与生俱来”的弹性伸缩能力

阿里巴巴云原生

容器 开发者 云原生 监控 中间件

双非渣硕,是如何拿到苏宁、阿里的offer的?(分享学习心得)

Java 程序员 架构 面试

音视频实战(6)- RTSP媒体协议流的录制方案及其覆盖策略详解

liuzhen007

音视频 5月日更 签约计划第二季

精选Hadoop高频面试题17道,附答案详细解析

五分钟学大数据

大数据 hadoop 5月日更

hive的主流文件存储格式对比实验

大数据技术指南

大数据 hive 5月日更

Golang command source code

escray

学习 极客时间 Go 语言 5月日更

Java开发5年,我为什么选择从蚂蚁金服离职?浅谈经历和经验!

Java架构追梦

Java 架构 面试 蚂蚁金服 经历分享

抱歉,“行业毒瘤”这个锅,低/无代码不背

陈思

低代码 无代码 低代码平台 无代码平台

NetWebCore实现文件上传功能

happlyfox

学习 .net core 5月日更

交通流量预测,EasyDL带你零代码实战

百度大脑

零代码 EasyDL

NAT穿透原理详解

IT酷盖

音视频 p2p NAT

v03.06 鸿蒙内核源码分析(时钟任务) | 触发调度谁的贡献最大 | 百篇博客分析HarmonyOS源码

鸿蒙研究站

鸿蒙内核源码分析 百篇博客分析鸿蒙

Iceberg0.11与Spark3.0结合

InfoQ_Springup

iceberg

这个好用的分布式应用配置中心,我们把它开源了

百度Geek说

分布式 大前端 服务器

iOS 面试策略之语言工具-Xcode使用

iOSer

ios xcode 语言 & 开发

引荐好友成为推广者还能拿额外奖励?!华为云引荐奖励计划来啦!

华为云开发者联盟

文章 返现奖励 推广计划 返利 团长

如何从一段视频中一次性修整多个片段

奈奈的杂社

视频剪辑 视频后期 视频处理

XDPool比特兄弟矿场系统开发|XDPool比特兄弟矿场APP软件开发

【实战问题】-- 布隆过滤器的三种实践:手写,Redission以及Guava(2)

秦怀杂货店

Java 布隆过滤器

参照STM32时钟树配置STM32CubeMX Clock Configuration(STM32L011G4U6为例)

不脱发的程序猿

单片机 STM32微控制器 时钟树 STM32CubeMX STM32时钟配置

【死磕JVM】用Arthas排查JVM内存 真爽!我从小用到大

牧小农

JVM

雀食蟀!Java Netty实战入门

北游学Java

Java Netty 网络 框架

SecSolar:为代码“捉虫”,让你能更专心写代码

华为云开发者联盟

代码 华为云 CloudIDE 代码安全检测 SecSolar

微服务化转型,拆就行了?这样做很危险...

BoCloud博云

微服务

这是我金三银四收到的第6个Offer:美团+阿里Java研发岗

Java 程序员 架构 面试

数据产品经理的圣经,送你一份真贵的礼物 ~~ 年度数据产品经理们的总结

金松(李博源)

大数据 数据产品经理 数据产品

51CTO熊平:HarmonyOS是大势所趋

科技汇

🕋【Redis干货领域】彻底走进主从架构的世界(入门篇)

码界西柚

主从同步 Redis 核心技术与实战 5月日更 Redis系列专题 原理篇

灵魂拷问:后端业务开发要会用 K8s 到什么程度?

阿里巴巴云原生

容器 运维 云原生 k8s 存储

兄弟矿场系统开发|兄弟矿场软件APP开发

如何在kafka-python和confluent-kafka之间做出选择?_语言 & 开发_Russell Jurney_InfoQ精选文章