apache kafka的纯python客户端
timi-kafka的Python项目详细描述
apache-kafka分布式流处理系统的python客户端。 kafka python的设计与官方java客户机非常相似,它使用 喷溅pythonic接口(例如,使用者迭代器)。
kafka python最适合用于较新的代理(0.9+),但向后兼容 旧版本(到0.8.0)。某些功能将仅在较新的代理上启用。 例如,完全协调的消费者群体——即动态分区 分配给同一组中的多个消费者-需要使用0.9+Kafka 经纪人。为早期的代理版本支持此功能需要 撰写和维护客户领导层选举和会员/健康 检查代码(可能使用zookeeper或consul)。对于老经纪人,你可以 通过手动为每个分区分配不同的分区来实现类似的功能 具有配置管理工具(如chef、ansible等)的消费者实例 尽管这种方法不支持在失败时重新平衡,但它仍将发挥良好的作用。 请参见<;https://kafka-python.readthedocs.io/en/master/compatibility.html>; 更多细节。
请注意,主分支可能包含未发布的功能。释放 文档,请参阅阅读文档和/或python的内联帮助。
>>> pip install kafka-python
卡夫卡康苏默
kafkaconsumer是一个高级消息使用者,打算以类似的方式操作 尽可能向官方java客户端发送。全力支持协调 消费者团体需要使用支持团体api的kafka代理:kafka v0.9+。
请参见<;https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html>; 有关API和配置的详细信息。
consumer迭代器返回consumerrecords,这是简单的namedtuples 显示基本消息属性:主题、分区、偏移量、键和值:
>>> from kafka import KafkaConsumer >>> consumer = KafkaConsumer('my_favorite_topic') >>> for msg in consumer: ... print (msg)
>>> # join a consumer group for dynamic partition assignment and offset commits >>> from kafka import KafkaConsumer >>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group') >>> for msg in consumer: ... print (msg)
>>> # manually assign the partition list for the consumer >>> from kafka import TopicPartition >>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234') >>> consumer.assign([TopicPartition('foobar', 2)]) >>> msg = next(consumer)
>>> # Deserialize msgpack-encoded values >>> consumer = KafkaConsumer(value_deserializer=msgpack.loads) >>> consumer.subscribe(['msgpackfoo']) >>> for msg in consumer: ... assert isinstance(msg.value, dict)
>>> # Access record headers. The returned value is a list of tuples >>> # with str, bytes for key and value >>> for msg in consumer: ... print (msg.headers)
>>> # Get consumer metrics >>> metrics = consumer.metrics()
卡夫卡制作人
kafkaproducer是一个高级的异步消息生成器。班级是 旨在尽可能类似于官方java客户机的操作。 请参见<;https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html>; 更多细节。
>>> from kafka import KafkaProducer >>> producer = KafkaProducer(bootstrap_servers='localhost:1234') >>> for _ in range(100): ... producer.send('foobar', b'some_message_bytes')
>>> # Block until a single message is sent (or timeout) >>> future = producer.send('foobar', b'another_message') >>> result = future.get(timeout=60)
>>> # Block until all pending messages are at least put on the network >>> # NOTE: This does not guarantee delivery or success! It is really >>> # only useful if you configure internal batching using linger_ms >>> producer.flush()
>>> # Use a key for hashed-partitioning >>> producer.send('foobar', key=b'foo', value=b'bar')
>>> # Serialize json messages >>> import json >>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8')) >>> producer.send('fizzbuzz', {'foo': 'bar'})
>>> # Serialize string keys >>> producer = KafkaProducer(key_serializer=str.encode) >>> producer.send('flipflap', key='ping', value=b'1234')
>>> # Compress messages >>> producer = KafkaProducer(compression_type='gzip') >>> for i in range(1000): ... producer.send('foobar', b'msg %d' % i)
>>> # Include record headers. The format is list of tuples with string key >>> # and bytes value. >>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
>>> # Get producer performance metrics >>> metrics = producer.metrics()
螺纹安全性
与 不能的卡夫卡。
虽然可以以线程本地方式使用kafkaconsumer, 建议进行多处理。
压缩
kafka python本机支持gzip压缩/解压。生产或消费lz4 压缩消息,您应该安装python-lz4(pip install lz4)。 要启用snappy压缩/解压缩,请安装python snappy(还需要snappy库)。 请参见<;https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install>; 更多信息。
协议
kafka python的第二个目标是提供一个易于使用的协议层 通过python repl与kafka代理交互。这对 测试、探测和一般实验。协议支持是 用于启用kafkaclient。请检查 探测一个kafka代理并试图确定它运行的是哪个版本 (0.8.0至1.1+)。
低水平
对低级消费者和生产者类保持传统支持, simpleconsumer和simpleproducer。见 <;https://kafka-python.readthedocs.io/en/master/simple.html?highlight=SimpleProducer>;获取API详细信息。