我试图通过每天0***使用合流的kafka Python包进行批处理etl。我知道我的流中有4个分区,但它可以更改,所以有没有办法检查特定主题中的分区总数? 我的消费者喜欢这样
from confluent_kafka import Consumer, KafkaError
messages = list()
partition_counter = 0
tnof_partition = 4
while True:
msg = self.consumer.poll(0.1)
if msg is None:
continue
elif not msg.error():
event = json.loads(msg.value().decode('utf-8'))
elif msg.error().code() == KafkaError._PARTITION_EOF:
print("End of partition reached {0}/{1}"
.format(msg.topic(), msg.partition()))
partition_counter += 1
if(partition_counter == tnof_partition):
self.consumer.commit()
self.consumer.close()
break
如果您能展示实现批量消费的其他方法,我也将不胜感激。谢谢
消费者的
list_topics()
方法可以提供由TopicMetadata
组成的Topics
的映射,该映射最终包含partitions
参考:https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.list_topics
相关问题 更多 >
编程相关推荐