重新启动Kafka(python)使用者将再次使用队列中的所有消息

2024-05-16 23:19:07 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用的是Kafka 0.8.1和Kafka python-0.9.0。在我的设置中,我有两个卡夫卡经纪人设置。当我运行kafka消费者时,我可以看到它从队列中检索消息并跟踪两个代理的偏移量。一切都太好了!

我的问题是,当我重新启动消费者时,它从一开始就开始消费消息。我所期望的是,在重启时,消费者会开始从它死前停止的地方消费消息。

我确实尝试跟踪Redis中的消息偏移量,然后在从队列中读取消息之前调用consumer.seek,以确保我只获取了以前从未见过的消息。在部署这个解决方案之前,我想和你们确认一下。。。也许我对卡夫卡或者python卡夫卡客户端有点误解。似乎消费者能够从中断的地方重新开始阅读是非常基本的功能。

谢谢!


Tags: kafkaredis消息客户端代理队列consumer部署
3条回答

注意kafka python库。它有一些小问题。

如果速度对消费者来说不是问题,那么可以在每条消息中设置自动提交。它应该有用。

SimpleConsumer提供了一个seek方法(https://github.com/mumrah/kafka-python/blob/master/kafka/consumer/simple.py#L174-L185),允许您在任何需要的地方开始使用消息。

最常见的电话是:

  • consumer.seek(0, 0)从队列的开头开始读取。
  • consumer.seek(0, 1)从当前偏移量开始读取。
  • consumer.seek(0, 2)跳过所有挂起的消息并开始只读取新消息。

第一个参数是这些位置的偏移量。这样,如果您调用consumer.seek(5, 0),您将跳过队列中的前5条消息。

另外,不要忘记,偏移量是为消费者组存储的。一定要一直用同一个。

kafka python存储与kafka服务器的偏移量,而不是单独的zookeeper连接。不幸的是,直到apache kafka 0.8.1.1,支持提交/获取偏移量的kafka服务器api才完全起作用。如果您升级了kafka服务器,那么您的设置应该可以工作。我还建议将kafka python升级到0.9.4。

[卡夫卡python维护者]

卡夫卡消费者is able to store offsets in Zookeeper。在Java API中,我们有two options-高级使用者,它为我们管理状态,并在重新启动后开始使用它离开的位置,而无状态的低级使用者没有这个超级能力。

据我在Python的消费代码(https://github.com/mumrah/kafka-python/blob/master/kafka/consumer.py)中的理解,SimpleConsumerMultiProcessConsumer都是有状态的,并跟踪Zookeeper中的当前偏移量,所以很奇怪您遇到了这个重新计算的问题。

确保重新启动时具有相同的使用者组ID(可能是随机设置的?)并选中以下选项:

auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
                     before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
                     wait before commit

您可能正在使用<;100条消息或<;5000毫秒?

相关问题 更多 >