<p>We can first check the offset of the last message in the topic.
When we reach that offset, we stop the loop.</p>
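<pre><code>from kafka import KafkaConsumer, TopicPartition

client = "localhost:9092"
# the broker address must be passed as bootstrap_servers;
# positional arguments are interpreted as topic names
consumer = KafkaConsumer(bootstrap_servers=client)
topic = 'test'
tp = TopicPartition(topic, 0)
# assign partition 0 of the topic to this consumer
consumer.assign([tp])
# obtain the last offset value: after seek_to_end, position() is the offset
# of the next message to be written, so the last message is at lastOffset - 1
consumer.seek_to_end(tp)
lastOffset = consumer.position(tp)
consumer.seek_to_beginning(tp)
# note: if the partition is empty, this loop blocks waiting for a message
for message in consumer:
    print("Offset:", message.offset)
    print("Value:", message.value)
    if message.offset == lastOffset - 1:
        break
</code></pre>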
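<p>The same idea can be extended to several partitions and to empty ones. The following is only a minimal sketch, not part of the original answer: <code>read_until_end</code> and its <code>poll_timeout_ms</code> parameter are hypothetical names, and it assumes a kafka-python style consumer exposing <code>assign</code>, <code>end_offsets</code>, <code>seek_to_beginning</code>, <code>position</code> and <code>poll</code>. It compares the consumer's position with the end offset captured at start instead of waiting for one specific message offset, so it returns immediately when there is nothing to read.</p>

```python
def read_until_end(consumer, partitions, poll_timeout_ms=1000):
    """Yield records until every partition reaches the end offset seen at start.

    Hypothetical helper: `consumer` is assumed to behave like a
    kafka-python KafkaConsumer, `partitions` is a list of TopicPartition.
    """
    consumer.assign(partitions)
    # end_offsets maps each partition to the offset of the NEXT message
    # to be written, i.e. last existing offset + 1 (0 for an empty partition)
    end_offsets = consumer.end_offsets(partitions)
    consumer.seek_to_beginning(*partitions)

    # partitions that still have unread messages
    remaining = {tp for tp in partitions
                 if consumer.position(tp) < end_offsets[tp]}
    while remaining:
        batch = consumer.poll(timeout_ms=poll_timeout_ms)
        for tp, records in batch.items():
            for record in records:
                # ignore messages produced after the snapshot was taken
                if record.offset < end_offsets[tp]:
                    yield record
        remaining = {tp for tp in remaining
                     if consumer.position(tp) < end_offsets[tp]}
```

<p>Assuming a broker at <code>localhost:9092</code> as above, this could be used as <code>for message in read_until_end(KafkaConsumer(bootstrap_servers=client), [tp]): print(message.offset, message.value)</code>.</p>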