擅长:python、mysql、java
<p>使用<code>dpkp/kafka-python</code>,可以通过发送<code>OffsetFetchRequest</code>来检索特定组的已提交偏移量。如果使用<code>OffsetFetchRequest_v3</code>,则可以为topics参数传递<code>None</code>,以获取组已存储偏移量的所有主题/分区的偏移量。在</p>
<p>例如:</p>
<pre class="lang-python prettyprint-override"><code>from kafka import BrokerConnection
from kafka.protocol.commit import *
import socket
group = 'mygroup'
bc = BrokerConnection('localhost', 9092, socket.AF_INET)
bc.connect_blocking()
fetch_offset_request = OffsetFetchRequest_v3(group, None)
future = bc.send(fetch_offset_request)
while not future.is_done:
for resp, f in bc.recv():
f.success(resp)
for topic in future.value.topics:
print('offsets for {0}'.format(topic[0]))
for partition in topic[1]:
print('- partition {0}, offset: {1}'.format(partition[0], partition[1]))
</code></pre>
<p>如果<code>mygroup</code>已提交<code>topic</code>和<code>topic2</code>的偏移量,它将打印如下内容:</p>
^{pr2}$