我使用的是Kafka 0.8.1和Kafka python-0.9.0。在我的设置中,我有两个卡夫卡经纪人设置。当我运行kafka消费者时,我可以看到它从队列中检索消息并跟踪两个代理的偏移量。一切都太好了!
我的问题是,当我重新启动消费者时,它从一开始就开始消费消息。我所期望的是,在重启时,消费者会开始从它死前停止的地方消费消息。
我确实尝试跟踪Redis中的消息偏移量,然后在从队列中读取消息之前调用consumer.seek,以确保我只获取了以前从未见过的消息。在部署这个解决方案之前,我想和你们确认一下。。。也许我对卡夫卡或者python卡夫卡客户端有点误解。似乎消费者能够从中断的地方重新开始阅读是非常基本的功能。
谢谢!
注意kafka python库。它有一些小问题。
如果速度对消费者来说不是问题,那么可以在每条消息中设置自动提交。它应该有用。
SimpleConsumer提供了一个
seek
方法(https://github.com/mumrah/kafka-python/blob/master/kafka/consumer/simple.py#L174-L185),允许您在任何需要的地方开始使用消息。最常见的电话是:
consumer.seek(0, 0)
从队列的开头开始读取。consumer.seek(0, 1)
从当前偏移量开始读取。consumer.seek(0, 2)
跳过所有挂起的消息并开始只读取新消息。第一个参数是这些位置的偏移量。这样,如果您调用
consumer.seek(5, 0)
,您将跳过队列中的前5条消息。另外,不要忘记,偏移量是为消费者组存储的。一定要一直用同一个。
kafka python存储与kafka服务器的偏移量,而不是单独的zookeeper连接。不幸的是,直到apache kafka 0.8.1.1,支持提交/获取偏移量的kafka服务器api才完全起作用。如果您升级了kafka服务器,那么您的设置应该可以工作。我还建议将kafka python升级到0.9.4。
[卡夫卡python维护者]
卡夫卡消费者is able to store offsets in Zookeeper。在Java API中,我们有two options-高级使用者,它为我们管理状态,并在重新启动后开始使用它离开的位置,而无状态的低级使用者没有这个超级能力。
据我在Python的消费代码(https://github.com/mumrah/kafka-python/blob/master/kafka/consumer.py)中的理解,SimpleConsumer和MultiProcessConsumer都是有状态的,并跟踪Zookeeper中的当前偏移量,所以很奇怪您遇到了这个重新计算的问题。
确保重新启动时具有相同的使用者组ID(可能是随机设置的?)并选中以下选项:
您可能正在使用<;100条消息或<;5000毫秒?
相关问题 更多 >
编程相关推荐