我在做Python-Kafka-consumer(尝试在http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html中使用Kafka.consumer.SimpleConsumer或Kafka.consumer.simple.SimpleConsumer)。当我运行下面的代码时,它将一直运行,即使所有消息都已消耗。如果消费者消费了所有信息,我希望他们会停止。怎么做?我也不知道如何使用stop()函数(在基类kafka.consumer.base.consumer中)。
更新
我使用信号处理程序调用consumer.stop()。一些错误信息被打印到屏幕上。但是这个程序仍然停留在for循环中。当新的信息出现时,消费者消费并打印出来。我还尝试了client.close()。但同样的结果。
我需要一些方法来优雅地停止for循环。
client = KafkaClient("localhost:9092")
consumer = SimpleConsumer(client, "test-group", "test")
consumer.seek(0, 2)# (0,2) and (0,0)
for message in consumer:
print "Offset:", message.offset
print "Value:", message.message.value
欢迎任何帮助。谢谢。
我们可以先检查主题中最后一条消息的偏移量。 当我们到达那个偏移量时,停止循环。
使用iter_timeout参数设置等待时间。如果设置为10,就像下面的代码一样,如果10秒内没有新消息出现,它将退出。默认值为None,这意味着即使没有新消息传入,使用者也将在此阻塞。
更新
以上不是一个好方法。当大量消息进入时,很难设置足够小的iter_超时来保证停止。所以,现在,我使用get_message()函数,它尝试使用一条消息并停止。没有新消息时不返回任何消息。
类似于Mohit的答案,但是使用了消费者的
end_offsets
函数。相关问题 更多 >
编程相关推荐