Kafka消费者:如何在Python中从最后一条消息开始消费

9 投票
5 回答
20950 浏览
提问于 2025-04-18 12:41

我正在使用Kafka 0.8.1和Kafka python-0.9.0。在我的设置中,有两个Kafka代理。当我运行我的Kafka消费者时,我可以看到它从队列中获取消息,并且能够跟踪两个代理的偏移量。一切都运行得很好!

我的问题是,当我重启消费者时,它会从头开始消费消息。我原本期待的是,重启后消费者能从它之前停止的地方继续消费消息。

我确实尝试过在Redis中跟踪消息的偏移量,然后在从队列读取消息之前调用consumer.seek,以确保我只获取那些之前没有见过的消息。虽然这样做有效,但在部署这个解决方案之前,我想先问问大家……也许我对Kafka或python-Kafka客户端有些误解。看起来消费者能够从停止的地方重新开始读取是一个相当基本的功能。

谢谢!

5 个回答

1

你只需要确保你的 Kafka 消费者从最新的位置开始读取数据(auto.offset.reset="latest")。同时,要定义一个消费者组,这样可以保存读取的位置,当消费者出现问题时,可以从上次保存的位置继续读取。


使用 confluent-kafka-python

from confluent_kafka import Consumer


c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'latest'
})

c.subscribe(['my_topic'])

使用 kafka-python

from kafka import KafkaConsumer


consumer = KafkaConsumer(
    'my_topic', 
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest', 
    enable_auto_commit=True,
    group_id='mygroup'
)
1

Kafka的消费者可以把处理进度(也就是偏移量)存储在Zookeeper里。在Java的API中,我们有两种选择:一种是高级消费者,它能帮我们管理状态,并且在重启后能从上次停止的地方继续消费;另一种是低级消费者,它没有这种能力。

根据我对Python消费者代码的理解,SimpleConsumer和MultiProcessConsumer都是有状态的,它们会在Zookeeper中记录当前的偏移量,所以你遇到的重复消费问题就有点奇怪了。

确保你在重启时使用相同的消费者组ID(可能你设置的是随机的?),并检查以下选项:

auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
                     before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
                     wait before commit

你可能消费的消息少于100条,或者消费时间少于5000毫秒?

2

首先,你需要设置一个 group_id,这个ID用来记录你消费消息的位置,这样下次就可以从这个位置继续读取消息。

如果你已经把这个组里的所有消息都看完了,但又想再看一遍这些消息,你可以使用 seek 这个方法来实现。

下面是一个例子:

def test_consume_from_offset(offset):
    topic = 'test'
    consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id='test')
    tp = TopicPartition(topic=topic, partition=0)
    consumer.assign([tp])
    consumer.seek(tp, offset)   # you can set the offset you want to resume from.
    for msg in consumer:
        # the msg begins with the offset you set
        print(msg)

test_consume_from_offset(10)
3

kafka-python这个库会把数据的偏移量存储在kafka服务器上,而不是单独使用zookeeper。可惜的是,支持提交和获取偏移量的kafka服务器接口在apache kafka 0.8.1.1之前并没有完全正常工作。如果你升级你的kafka服务器,应该就能正常使用了。我还建议你把kafka-python升级到0.9.4版本。

[kafka-python维护者]

6

使用kafka-python这个库时要小心,它有一些小问题。

如果你的消费者对速度要求不高,可以在每条消息中设置自动提交,这样应该可以正常工作。

SimpleConsumer提供了一个叫做seek的方法(点击这里查看),这个方法让你可以从你想要的任何位置开始消费消息。

最常用的调用方式有:

  • consumer.seek(0, 0):从队列的开头开始读取。
  • consumer.seek(0, 1):从当前的位置开始读取。
  • consumer.seek(0, 2):跳过所有未处理的消息,只读取新的消息。

第一个参数是一个偏移量,表示你想要的位置。比如,如果你调用consumer.seek(5, 0),那么你就会跳过队列中的前5条消息。

另外,别忘了,偏移量是为消费者组存储的。确保你一直使用同一个消费者组。

撰写回答