使用python库检索kafka中的用户组偏移量

2024-04-16 05:58:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我有python脚本,需要使用kafka1代理集群检索从kafka主题读取的一组消费者的当前消费者组偏移量。这些是本地的kafka消费者,它们将偏移量存储在kafka集群中,而不是存储在zookeeper中。在

脚本本身不需要消耗任何消息,只需为其他使用者读取当前偏移量。我知道使用kafka-consumer-groups.sh可以做到这一点,但理想情况下,我希望避免依赖shell命令。在

我已经可以使用dpkp/kafka-python客户机来完成这项工作,但只能通过创建一个使用者并将其分配给组,然后通过取消分配一些分区来影响使用该组的现有使用者。我需要脚本是完全被动的,不执行任何会打断其他消费者的操作。在


Tags: kafka脚本消息代理主题consumersh使用者
2条回答

linkedin/kafka-tools有一个函数get_offsets_for_group(),用于获取组偏移量。它可以传递一个组名和主题名,或者只传递一个组名来检索该组的所有主题的已提交偏移量。在

from kafka.tools.client import Client

group='mygroup'

client=Client(broker_list='localhost:9029')
client.connect()

offsets=client.get_offsets_for_group(group)

for topic in offsets:
  for partition_offset in offsets[topic].partitions:
    print("group: {0} - topic: {1} - partition: {2}".format(group,topic,partition_offset))

使用dpkp/kafka-python,可以通过发送OffsetFetchRequest来检索特定组的已提交偏移量。如果使用OffsetFetchRequest_v3,则可以为topics参数传递None,以获取组已存储偏移量的所有主题/分区的偏移量。在

例如:

from kafka import BrokerConnection
from kafka.protocol.commit import *
import socket

group = 'mygroup'

bc = BrokerConnection('localhost', 9092, socket.AF_INET)
bc.connect_blocking()

fetch_offset_request = OffsetFetchRequest_v3(group, None)

future = bc.send(fetch_offset_request)
while not future.is_done:
    for resp, f in bc.recv():
        f.success(resp)

for topic in future.value.topics:
    print('offsets for {0}'.format(topic[0]))
    for partition in topic[1]:
        print('- partition {0}, offset: {1}'.format(partition[0], partition[1]))

如果mygroup已提交topictopic2的偏移量,它将打印如下内容:

^{pr2}$

相关问题 更多 >