使用python的Kafka消费者投票消息

2024-04-29 20:01:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我在消费者群中轮询来自卡夫卡的消息时遇到问题。 我的使用者对象分配给给定的分区

self.ps = TopicPartition(topic, partition )

之后,使用者分配给该分区:

self.consumer.assign([self.ps])

之后,我可以用

self.consumer.seek_to_beginning(self.ps)
pos = self.consumer.position(self.ps)

以及self.consumer.seek_to_end(self.ps) .....

在我的主题中有超过30000条信息。 问题是我只收到一条信息。

消费者配置: max_poll_records= 200AUTO_OFFSET_RESET是最早的

这是我的功能,我试图得到信息:

 def poll_messages(self):


    data = []

    messages = self.consumer.poll(timeout_ms=6000)


    for partition, msgs in six.iteritems(messages):

        for msg in msgs:

            data.append(msg)

    return data

即使在开始轮询消息之前转到第一个可用的偏移量 我只收到一条信息。

self.consumer.seek(self.ps, self.get_first_offset())

我希望有人能解释我做错了什么。 提前谢谢。

最良好的祝愿 约恩


Tags: toself信息消息fordataconsumer使用者
1条回答
网友
1楼 · 发布于 2024-04-29 20:01:52

我相信你误解了麦克斯民意测验记录-这并不意味着你每次民意测验能得到200分,只是你可能得到的最多的一个限制。您需要多次调用poll。我将向您介绍文档中的一些简单示例:http://kafka-python.readthedocs.io/en/master/usage.html

我认为一个更标准的实施方式是:

for message in self.consumer:
  # do stuff like:
  print(msg)

相关问题 更多 >