我有python脚本,需要使用kafka1代理集群检索从kafka主题读取的一组消费者的当前消费者组偏移量。这些是本地的kafka消费者,它们将偏移量存储在kafka集群中,而不是存储在zookeeper中。在
脚本本身不需要消耗任何消息,只需为其他使用者读取当前偏移量。我知道使用kafka-consumer-groups.sh
可以做到这一点,但理想情况下,我希望避免依赖shell命令。在
我已经可以使用dpkp/kafka-python
客户机来完成这项工作,但只能通过创建一个使用者并将其分配给组,然后通过取消分配一些分区来影响使用该组的现有使用者。我需要脚本是完全被动的,不执行任何会打断其他消费者的操作。在
linkedin/kafka-tools
有一个函数get_offsets_for_group()
,用于获取组偏移量。它可以传递一个组名和主题名,或者只传递一个组名来检索该组的所有主题的已提交偏移量。在使用
dpkp/kafka-python
,可以通过发送OffsetFetchRequest
来检索特定组的已提交偏移量。如果使用OffsetFetchRequest_v3
,则可以为topics参数传递None
,以获取组已存储偏移量的所有主题/分区的偏移量。在例如:
如果
^{pr2}$mygroup
已提交topic
和topic2
的偏移量,它将打印如下内容:相关问题 更多 >
编程相关推荐