每当Kafka中创建新主题时,我都尝试启动动态消费者,但动态启动的消费者总是缺少启动/第一条消息,而是从那里开始消费消息。我正在使用kafka python模块,并使用更新的KafkaConsumer和KafkaProducer。在
生产商代码为
producer = KafkaProducer(bootstrap_servers='localhost:9092')
record_metadata = producer.send(topic, data)
消费者的代码是
^{pr2}$请建议解决这个问题的方法,或者我必须在producer和consumer实例中包含的任何配置。在
你能把自动偏移重设为最早吗。在
创建新的使用者流时,它从最新偏移量(这是auto_offset_reset的默认值)开始,并且您将错过在使用者未启动时发送的消息。在
你可以在kafka python doc中阅读。相关部分如下
相关问题 更多 >
编程相关推荐