Python中文
首页
教程
问答
标签
搜索
登录
注册
Python生成不同的Kafka分区
回答此问题可获得
20
贡献值,回答如果被采纳可获得
50
分。
<p>我试图以经典的推特流媒体为例来学习卡夫卡。我正在尝试使用我的制作人将基于2个过滤器的twitter数据流到同一主题的不同分区。例如,将带有track='Google'的twitter数据发送到一个分区,并将track='Apple'发送到另一个分区</p> <pre><code>class Producer(StreamListener): def __init__(self, producer): self.producer = producer def on_data(self, data): self.producer.send(topic_name, value=data) return True def on_error(self, error): print(error) twitter_stream = Stream(auth, Producer(producer)) twitter_stream.filter(track=["Google"]) </code></pre> <p>如何添加另一个曲目并将该数据流到另一个分区</p> <p>同样,如何让我的消费者从特定分区消费</p> <pre><code>consumer = KafkaConsumer( topic_name, bootstrap_servers=['localhost:9092'], auto_offset_reset='latest', enable_auto_commit=True, auto_commit_interval_ms = 5000, max_poll_records = 100, value_deserializer=lambda x: json.loads(x.decode('utf-8'))) </code></pre>
0 条评论
分类:
Python问答
请先
登录
后评论
默认排序
时间排序
1 个回答
匿名
1天前
擅长:python、mysql、java
<p>Kafka在消息的键上对数据进行分区。在给定的代码中,您只将<code>value</code>传递给生产者消息,因此密钥将为null,因此将在所有分区之间进行循环</p> <p>请参阅卡夫卡库的文档,了解如何为每条消息提供密钥</p>
请先
登录
后评论
针对此问题:
更多的回答
关注
89
关注
收藏
1
收藏,
216
浏览
网友 提问于 2天前
相关Python问题
将Pandas数据帧转换为PyTorch张量?
5 回答
将Pandas数据帧转换为scipy稀疏矩阵
3 回答
将Pandas数据帧转换为Spark Datafram时出现问题
4 回答
将pandas数据帧转换为spark DataFram时出错
8 回答
将Pandas数据帧转换为spark datafram时收到错误
8 回答
将Pandas数据帧转换为Spark数据帧
7 回答
将Pandas数据帧转换为Tensorflow数据
8 回答
将Pandas数据帧转换为tkinter obj
6 回答
将pandas数据帧转换为XML
4 回答
将Pandas数据帧转换为值sql语句
8 回答
将pandas数据帧转换为元组
5 回答
将pandas数据帧转换为元组列表
3 回答
将pandas数据帧转换为元组列表并删除所有pandas数据类型
1 回答
将pandas数据帧转换为具有头和数据类型的numpy数组
10 回答
将pandas数据帧转换为内存中的拼花地板,并将其加载到Python中的Hadoop中
2 回答
将pandas数据帧转换为内存中类似文件的对象?
4 回答
将Pandas数据帧转换为内存功能(&F)
5 回答
将pandas数据帧转换为列表列表
10 回答
将pandas数据帧转换为列表列表以输入到RNN
4 回答
将Pandas数据帧转换为单行DataFram
2 回答