全功能纯python kafka客户端

pykafka的Python项目详细描述


皮卡夫卡

http://i.imgur.com/ztyl4lg.jpg

pykafka是一个面向python的程序员友好的kafka客户机。它包括python Kafka生产者和消费者的实现,它们可以选择支持 通过在librdkafka上构建的c扩展。它在Python2.7+、Python3.4+下运行, 和pypypy,并支持kafka 0.8.2及更高版本。

Pykafka的主要目标是为 jvm kafka client 使用python程序员熟悉的习惯用法并公开 尽可能多的pythonic api。

您可以使用

从pypi安装pykafka 啊!

或从Conda-Forge使用

啊!

有关Pykafka的完整文档和使用示例,请参见 readthedocs

通过克隆此存储库和 运行

啊!

入门

假设至少有一个kafka实例在本地主机上运行,则可以使用pykafka 连接到它。

啊!

或者,对于tls连接,您可以编写(也可以参见 sslconfig docs 有关详细信息:

啊!

如果连接到的群集上定义了任何主题,则可以列出 用:

啊!

一旦有了 主题,就可以为它创建一个 生产者并开始 生成消息。

啊!

上面的例子将同时产生对卡夫卡的调用 在确认消息已到达群集后返回。

为了获得更高的吞吐量,我们建议使用 异步模式,这样 product() 调用将立即返回,并且 生产商可以选择批量发送消息。制作人收集 在发送每个批之前,在内部队列中为 延迟ms 生成消息。 这种延迟可以通过 延迟ms来消除或改变,但会降低效率, min queued u messages 和其他关键字参数(请参见readthedocs)。你仍然可以获得 消息的传递确认,通过队列接口,可以 通过设置 Delivery/u reports=true来启用。下面是一个粗略的用法示例:

AAAAAAAAA 8

注意,传递报告队列是线程本地的:它只为报告提供服务 对于从当前线程生成的消息。另外,如果你使用 传递报告=true ,未能使用传递报告队列将导致 Pykafka的内存使用量增长到无限。

您还可以使用 consumer 实例使用本主题中的消息。

啊!

这个 simpleconsumer 不可缩放-如果有两个 simpleconsumer 使用相同的主题,他们将收到重复的消息在美国四处走动 这样,您就可以使用 BalancedConsumer

$ pip install pykafka
0

您可以让尽可能多的 balancedconsumer 实例使用这样的主题 主题有分区。如果它们都连接到同一个zookeeper实例, 它们将与它通信以自动平衡 他们自己。balancedConsumer使用的分区分配策略是 默认的"范围"策略。该策略可通过 成员协议进行切换 关键字参数,可以是由 pykafka.membershipprotocol 公开的对象,也可以是 pykafka.membershipprotocol.groupmembershipprotocol的自定义实例

您还可以将kafka 0.9组成员资格api与 托管的 关键字参数on get_balanced_consumer

使用librdkafka扩展名

pykafka包含一个c扩展,它使用librdkafka来加速producer 以及消费者运营。要使用librdkafka扩展,需要确保头 文件和共享库是python在构建时可以找到它们的地方 扩展(由 setup.py development 负责)和运行时。 通常,这意味着您需要在一个地方安装librdkafka 对于您的系统来说是传统的,或者声明 c\u include\u path library\u path , 以及shell环境中指向安装位置的ld library路径 librdkafka共享对象的。您可以使用 locate librdkafka.so

之后,只需要传递一个额外的参数 使用'u rdkafka=true 主题。获取'u producer() 主题。获取简单消费者() ,或 主题。获取均衡消费者() 。注释 一些配置选项可能有不同的最优值;它可能是 值得参考librdkafka的配置说明。

皮卡夫卡还是卡夫卡蟒蛇?

这是两个不同的项目。 有关比较,请参见此处的讨论 在两个项目之间。

支持

如果您需要使用pykafka的帮助,可以使用很多资源。 有关用法问题或常见配方,请查看stackoverflow标签。 google group 可用于更深入的问题或调查 您想直接发送给pykafka维护人员。如果你相信你 在pykafka中发现一个bug,请在阅读完 贡献指南

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
Java Date作为MyBatis中Oracle SELECT查询的参数[not get Response]   来自两个独立线程的并行java访问队列   如果数据已经存在,java Android Studio SQLite不会将数据插入数据库   mysql java spring项目仅在第一次运行时显示错误,再次运行后运行正常。为什么呢?   java SQL错误:1364,SQLState:HY000字段“rating_id”没有默认值/保存具有onetoone关系的子实体时   Tomcat中无cookie的java支持会话#重复   JAVAlang.RuntimeException:Android Studio   java CheckboxMultipleChice存储在SQL中   Kafka Java SimpleConsumer奇怪的编码   使用Hibernate保存servlet中处理的数据时遇到java错误   JavaSpring在运行时添加数据源   java使用一个类中另一个类的方法   java空值随Spring Rest资源更新   java Spring引导:为什么要使用OncePerRequestFilter?   java Android异步任务重用   java JTextField未按预期填充列?