Python生成不同的Kafka分区

2024-03-29 21:36:14 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图以经典的推特流媒体为例来学习卡夫卡。我正在尝试使用我的制作人将基于2个过滤器的twitter数据流到同一主题的不同分区。例如,将带有track='Google'的twitter数据发送到一个分区,并将track='Apple'发送到另一个分区

class Producer(StreamListener):
    def __init__(self, producer):
        self.producer = producer

    def on_data(self, data):
        self.producer.send(topic_name, value=data)
        return True

    def on_error(self, error):
        print(error)


twitter_stream = Stream(auth, Producer(producer))
twitter_stream.filter(track=["Google"])

如何添加另一个曲目并将该数据流到另一个分区

同样,如何让我的消费者从特定分区消费

consumer = KafkaConsumer(
    topic_name,
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='latest',
     enable_auto_commit=True,
     auto_commit_interval_ms =  5000,
     max_poll_records = 100,
     value_deserializer=lambda x: json.loads(x.decode('utf-8')))

Tags: producernameselfautodatatopicondef
2条回答

经过一些研究,我能够解决这个问题:

在生产者端,指定分区:

self.producer.send(topic_name, value=data,partition=0)

在消费者方面,

consumer = KafkaConsumer(
       bootstrap_servers=['localhost:9092'],
     auto_offset_reset='latest',
     enable_auto_commit=True,
     auto_commit_interval_ms =  5000,
     max_poll_records = 100,
     value_deserializer=lambda x: json.loads(x.decode('utf-8')))
consumer.assign([TopicPartition('trial', 0)])

Kafka在消息的键上对数据进行分区。在给定的代码中,您只将value传递给生产者消息,因此密钥将为null,因此将在所有分区之间进行循环

请参阅卡夫卡库的文档,了解如何为每条消息提供密钥

相关问题 更多 >