如何检查卡夫卡中的分区数(合流卡夫卡)

2024-05-14 14:44:43 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图通过每天0***使用合流的kafka Python包进行批处理etl。我知道我的流中有4个分区,但它可以更改,所以有没有办法检查特定主题中的分区总数? 我的消费者喜欢这样

from confluent_kafka import Consumer, KafkaError

    messages = list()
    partition_counter = 0
    tnof_partition = 4

    while True:
        msg = self.consumer.poll(0.1)
        if msg is None:
            continue
        elif not msg.error():
            event = json.loads(msg.value().decode('utf-8'))

        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print("End of partition reached {0}/{1}"
                .format(msg.topic(), msg.partition()))
            
            partition_counter += 1
            if(partition_counter == tnof_partition):
                self.consumer.commit()
                self.consumer.close()
                break

如果您能展示实现批量消费的其他方法,我也将不胜感激。谢谢


Tags: kafkaselfifconsumercounteretlmsgerror

热门问题