apache kafka的纯python客户端

timi-kafka的Python项目详细描述


https://img.shields.io/badge/kafka-1.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svghttps://img.shields.io/pypi/pyversions/kafka-python.svghttps://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=githubhttps://travis-ci.org/dpkp/kafka-python.svg?branch=masterhttps://img.shields.io/badge/license-Apache%202-blue.svg

apache-kafka分布式流处理系统的python客户端。 kafka python的设计与官方java客户机非常相似,它使用 喷溅pythonic接口(例如,使用者迭代器)。

kafka python最适合用于较新的代理(0.9+),但向后兼容 旧版本(到0.8.0)。某些功能将仅在较新的代理上启用。 例如,完全协调的消费者群体——即动态分区 分配给同一组中的多个消费者-需要使用0.9+Kafka 经纪人。为早期的代理版本支持此功能需要 撰写和维护客户领导层选举和会员/健康 检查代码(可能使用zookeeper或consul)。对于老经纪人,你可以 通过手动为每个分区分配不同的分区来实现类似的功能 具有配置管理工具(如chef、ansible等)的消费者实例 尽管这种方法不支持在失败时重新平衡,但它仍将发挥良好的作用。 请参见<;https://kafka-python.readthedocs.io/en/master/compatibility.html>; 更多细节。

请注意,主分支可能包含未发布的功能。释放 文档,请参阅阅读文档和/或python的内联帮助。

>>> pip install kafka-python

卡夫卡康苏默

kafkaconsumer是一个高级消息使用者,打算以类似的方式操作 尽可能向官方java客户端发送。全力支持协调 消费者团体需要使用支持团体api的kafka代理:kafka v0.9+。

请参见<;https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html>; 有关API和配置的详细信息。

consumer迭代器返回consumerrecords,这是简单的namedtuples 显示基本消息属性:主题、分区、偏移量、键和值:

>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
...     print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
>>> for msg in consumer:
...     print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)
>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
...     print (msg.headers)
>>> # Get consumer metrics
>>> metrics = consumer.metrics()

卡夫卡制作人

kafkaproducer是一个高级的异步消息生成器。班级是 旨在尽可能类似于官方java客户机的操作。 请参见<;https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html>; 更多细节。

>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> for _ in range(100):
...     producer.send('foobar', b'some_message_bytes')
>>> # Block until a single message is sent (or timeout)
>>> future = producer.send('foobar', b'another_message')
>>> result = future.get(timeout=60)
>>> # Block until all pending messages are at least put on the network
>>> # NOTE: This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
>>> # Use a key for hashed-partitioning
>>> producer.send('foobar', key=b'foo', value=b'bar')
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', {'foo': 'bar'})
>>> # Serialize string keys
>>> producer = KafkaProducer(key_serializer=str.encode)
>>> producer.send('flipflap', key='ping', value=b'1234')
>>> # Compress messages
>>> producer = KafkaProducer(compression_type='gzip')
>>> for i in range(1000):
...     producer.send('foobar', b'msg %d' % i)
>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
>>> # Get producer performance metrics
>>> metrics = producer.metrics()

螺纹安全性

与 不能的卡夫卡。

虽然可以以线程本地方式使用kafkaconsumer, 建议进行多处理。

压缩

kafka python本机支持gzip压缩/解压。生产或消费lz4 压缩消息,您应该安装python-lz4(pip install lz4)。 要启用snappy压缩/解压缩,请安装python snappy(还需要snappy库)。 请参见<;https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install>; 更多信息。

协议

kafka python的第二个目标是提供一个易于使用的协议层 通过python repl与kafka代理交互。这对 测试、探测和一般实验。协议支持是 用于启用kafkaclient。请检查 探测一个kafka代理并试图确定它运行的是哪个版本 (0.8.0至1.1+)。

低水平

对低级消费者和生产者类保持传统支持, simpleconsumer和simpleproducer。见 <;https://kafka-python.readthedocs.io/en/master/simple.html?highlight=SimpleProducer>;获取API详细信息。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java根据一些规则对地图进行排序   java需要关于突破游戏冲突bug的帮助   在Javaservlet中打印XSLT转换的XML文档   使用volatile'status flag'布尔值的java同步?   java Hibernate 4.2、JPA 2.0关系OnetoMany单向注释   在java中,如何在不使用for循环的情况下找到数组列表中第二高的数字?   排序Java多维数组   安装位置上的java Izpack toggel复选框   使用SQL Server 2008数据库交付Java应用程序   处理如何解决线程“动画线程”java中的异常。lang.NullPointerException?   安卓如何在Java中调用OpenSSL方法?   JAVA使用tomcat在jsf应用程序上运行lang.StackOverflowerError   java隐藏javafx Listview中的垂直滚动条   java如何使用Spring Boot在运行时提供静态资源?   java如何将@EJB与远程接口结合使用”   java哪个版本是Sun/Oracle提供的JavaEEAPIX的最新版本。jar和下载最新版本>7.0   java向JavaFX ToggleSwitch添加更改侦听器   保存在哪里以及如何处理Java应用程序的版本?   java AsyncTask执行doInBackground()时出错