我试图以经典的推特流媒体为例来学习卡夫卡。我正在尝试使用我的制作人将基于2个过滤器的twitter数据流到同一主题的不同分区。例如,将带有track='Google'的twitter数据发送到一个分区,并将track='Apple'发送到另一个分区
class Producer(StreamListener):
def __init__(self, producer):
self.producer = producer
def on_data(self, data):
self.producer.send(topic_name, value=data)
return True
def on_error(self, error):
print(error)
twitter_stream = Stream(auth, Producer(producer))
twitter_stream.filter(track=["Google"])
如何添加另一个曲目并将该数据流到另一个分区
同样,如何让我的消费者从特定分区消费
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
auto_commit_interval_ms = 5000,
max_poll_records = 100,
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
经过一些研究,我能够解决这个问题:
在生产者端,指定分区:
在消费者方面,
Kafka在消息的键上对数据进行分区。在给定的代码中,您只将
value
传递给生产者消息,因此密钥将为null,因此将在所有分区之间进行循环请参阅卡夫卡库的文档,了解如何为每条消息提供密钥
相关问题 更多 >
编程相关推荐