python-kafka包,用于heroku的kafka。

heroku-kafka的Python项目详细描述


英雄卡夫卡·CircleCI

这是一个非正式的包

heroku-kafka是一个python包,可以帮助您快速轻松地在heroku上安装kafka。有一个offical package可能更安全,但是它没有被正确地更新以支持python 3,并且似乎不再被维护。

安装

安装包的最简单方法是通过pip。

pip install heroku-kafka

用法

这个包使用kafka-python packageHerokuKafkaProducerHerokuKafkaConsumer类都继承自kafka python基类,并且将包含所有相同的方法。

注意:您可以在本地使用这个包,方法是使用相同的kafka设置一个.env文件 你在Heroku网站上的环境变量。

注意:要测试它是否在本地工作,我将安装heroku-kafka-util以便您可以看到正在发送的消息等。

生产者

fromheroku_kafkaimportHerokuKafkaProducer"""All the variable names here match the heroku env variable names.Just pass the env values straight in and it will work."""producer=HerokuKafkaProducer(url:KAFKA_URL,# Url string provided by herokussl_cert:KAFKA_CLIENT_CERT,# Client cert stringssl_key:KAFKA_CLIENT_CERT_KEY,# Client cert key stringssl_ca:KAFKA_TRUSTED_CERT,# Client trusted cert stringprefix:KAFKA_PREFIX# Prefix provided by heroku)"""The .send method will automatically prefix your topic with the KAFKA_PREFIXNOTE: If the message doesn't seem to be sending try `producer.flush()` to force send."""producer.send('topic_without_prefix',b"some message")

有关所有其他方法和属性,请参阅:KafkaProducer Docs

消费者

fromheroku_kafkaimportHerokuKafkaConsumer"""All the variable names here match the heroku env variable names,just pass the env values straight in and it will work.*topics are optional and you can pass as many as you want in for the consumer to track,however if you want to subscribe after creation just use .subscribe as shown below.Note: The KAFKA_PREFIX will be added on automatically so don't worry about passing it in."""consumer=HerokuKafkaConsumer('topic_without_prefix_1',# Optional: You don't need to pass any topic at all'topic_without_prefix_2',# You can list as many topics as you want to consumeurl:KAFKA_URL,# Url string provided by herokussl_cert:KAFKA_CLIENT_CERT,# Client cert stringssl_key:KAFKA_CLIENT_CERT_KEY,# Client cert key stringssl_ca:KAFKA_TRUSTED_CERT,# Client trusted cert stringprefix:KAFKA_PREFIX# Prefix provided by heroku)"""To subscribe to topic(s) after creating a consumer pass in a list of topics without theKAFKA_PREFIX."""consumer.subscribe(topics=('topic_without_prefix'))""".assign requires a full topic name with prefix"""consumer.assign([TopicPartition('topic_with_prefix',2)])"""Listening to events it is exactly the same as in kafka_python.Read the documention linked below for more info!"""formsginconsumer:print(msg)

有关所有其他方法和属性,请参阅:KafkaConsumer Docs

已知问题

  • .assign不添加主题前缀。
  • .namedtemporaryfile可能无法在Windows系统上正常工作

贡献

如果你遇到任何问题,请随意叉和创造公关!

设置

分叉回购,需要Docker

>>> git clone git@github.com:<fork-repo>.git
>>> cd <fork-repo>
>>> make dev-build

创建一个包含工作kafka信息的.env文件(目前包含两个工作主题)。

KAFKA_URL=
KAFKA_CLIENT_CERT=
KAFKA_CLIENT_CERT_KEY=
KAFKA_TRUSTED_CERT=
KAFKA_PREFIX=

TOPIC1=
TOPIC2=

注意:Docker读取.env文件的方式有点奇怪。变量值周围不能有引号,不能有新行,请用\n替换所有新行。

测试

检查包是否工作的唯一方法是运行测试。

请确保您编写的任何额外代码都附带了一个测试,它不需要超过顶部,而只需检查您所编写的工作。

目前所有的测试都需要一个工作的kafka设置,因为没有它们很难检查它是否正确连接。这意味着它还需要互联网连接。您可以从heroku的kafka环境变量中复制连接的详细信息-还要注意,您需要两个测试主题。

运行测试:

>>> make dev-test

分布

创建和上载包:

>>> make package
>>> make upload

注意:您需要登录到pip才能上载包。

https://packaging.python.org/tutorials/packaging-projects/

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Android Studio gradle版本错误   odata如何使用SAP Cloud SDK 3.2.0在Java中检索目标?   具有包含地址的路径的java执行进程   java日志中充斥着AWS Http调用   java notifyDataSetChanged不使用线程   java Android位图:中心裁剪+创建位图的圆形   在解组Json对象时,java“prolog中不允许内容”   java getResources()来自Android Studio中的枚举   java EclipseLink相当于Hibernate@naturaid   java如何用相似但不同的对象填充列表?   xml java SAXParser忽略异常并继续解析   如何从java控制台运行控制台应用程序?   java如何通过一个jsp中的链接将注册表切换为登录表单(或其他方式)?   这可能是java/lang/Runtime的用法。exec([Ljava/lang/String;)Ljava/lang/Process;可能容易受到命令注入的攻击   java这本教科书中的链表是否包含“递归构造函数”?   java将Arraylist<Integer>转换为char[]的最佳方式是什么