python-kafka包,用于heroku的kafka。
heroku-kafka的Python项目详细描述
英雄卡夫卡·
这是一个非正式的包
heroku-kafka是一个python包,可以帮助您快速轻松地在heroku上安装kafka。有一个offical package可能更安全,但是它没有被正确地更新以支持python 3,并且似乎不再被维护。
安装
安装包的最简单方法是通过pip。
pip install heroku-kafka
用法
这个包使用kafka-python package、HerokuKafkaProducer
和HerokuKafkaConsumer
类都继承自kafka python基类,并且将包含所有相同的方法。
注意:您可以在本地使用这个包,方法是使用相同的kafka设置一个.env文件 你在Heroku网站上的环境变量。
注意:要测试它是否在本地工作,我将安装heroku-kafka-util以便您可以看到正在发送的消息等。
生产者
fromheroku_kafkaimportHerokuKafkaProducer"""All the variable names here match the heroku env variable names.Just pass the env values straight in and it will work."""producer=HerokuKafkaProducer(url:KAFKA_URL,# Url string provided by herokussl_cert:KAFKA_CLIENT_CERT,# Client cert stringssl_key:KAFKA_CLIENT_CERT_KEY,# Client cert key stringssl_ca:KAFKA_TRUSTED_CERT,# Client trusted cert stringprefix:KAFKA_PREFIX# Prefix provided by heroku)"""The .send method will automatically prefix your topic with the KAFKA_PREFIXNOTE: If the message doesn't seem to be sending try `producer.flush()` to force send."""producer.send('topic_without_prefix',b"some message")
有关所有其他方法和属性,请参阅:KafkaProducer Docs。
消费者
fromheroku_kafkaimportHerokuKafkaConsumer"""All the variable names here match the heroku env variable names,just pass the env values straight in and it will work.*topics are optional and you can pass as many as you want in for the consumer to track,however if you want to subscribe after creation just use .subscribe as shown below.Note: The KAFKA_PREFIX will be added on automatically so don't worry about passing it in."""consumer=HerokuKafkaConsumer('topic_without_prefix_1',# Optional: You don't need to pass any topic at all'topic_without_prefix_2',# You can list as many topics as you want to consumeurl:KAFKA_URL,# Url string provided by herokussl_cert:KAFKA_CLIENT_CERT,# Client cert stringssl_key:KAFKA_CLIENT_CERT_KEY,# Client cert key stringssl_ca:KAFKA_TRUSTED_CERT,# Client trusted cert stringprefix:KAFKA_PREFIX# Prefix provided by heroku)"""To subscribe to topic(s) after creating a consumer pass in a list of topics without theKAFKA_PREFIX."""consumer.subscribe(topics=('topic_without_prefix'))""".assign requires a full topic name with prefix"""consumer.assign([TopicPartition('topic_with_prefix',2)])"""Listening to events it is exactly the same as in kafka_python.Read the documention linked below for more info!"""formsginconsumer:print(msg)
有关所有其他方法和属性,请参阅:KafkaConsumer Docs。
已知问题
.assign
不添加主题前缀。- .namedtemporaryfile可能无法在Windows系统上正常工作
贡献
如果你遇到任何问题,请随意叉和创造公关!
设置
分叉回购,需要Docker
>>> git clone git@github.com:<fork-repo>.git
>>> cd <fork-repo>
>>> make dev-build
创建一个包含工作kafka信息的.env文件(目前包含两个工作主题)。
KAFKA_URL=
KAFKA_CLIENT_CERT=
KAFKA_CLIENT_CERT_KEY=
KAFKA_TRUSTED_CERT=
KAFKA_PREFIX=
TOPIC1=
TOPIC2=
注意:Docker读取.env文件的方式有点奇怪。变量值周围不能有引号,不能有新行,请用\n
替换所有新行。
测试
检查包是否工作的唯一方法是运行测试。
请确保您编写的任何额外代码都附带了一个测试,它不需要超过顶部,而只需检查您所编写的工作。
目前所有的测试都需要一个工作的kafka设置,因为没有它们很难检查它是否正确连接。这意味着它还需要互联网连接。您可以从heroku的kafka环境变量中复制连接的详细信息-还要注意,您需要两个测试主题。
运行测试:
>>> make dev-test
分布
创建和上载包:
>>> make package >>> make upload
注意:您需要登录到pip才能上载包。
https://packaging.python.org/tutorials/packaging-projects/