当kafka队列中超过一个晚上没有任何条目时,服务器上的Python使用者脚本将退出

2024-04-24 05:22:56 发布

您现在位置:Python中文网/ 问答频道 /正文

我在不同的服务器上部署了Kafka和python使用者。我正在使用GCP VM实例进行部署

Python脚本在循环中运行,并与kafka一起检查它是否有作业。如果它找到了一个作业,它将启动操作,否则它将继续与卡夫卡进行检查

定期在卡夫卡中创建条目时,脚本工作正常,但在一定时间内没有条目添加到卡夫卡队列时,退出。因此,当我将某个内容推入kafka队列时,python脚本无法检测到它,因为它已经退出

下面是在python上运行的代码,用于不断检查kafka是否有新作业:

    consumer = KafkaConsumer(bootstrap_servers=['ip-address'], group_id='grp1',
                              value_deserializer=lambda m: json.loads(m.decode('utf8')),
                              partition_assignment_strategy=[RoundRobinPartitionAssignor])
    listener = Listener(consumer1)
    consumer1.subscribe(['topic'], listener=listener)
    while True:
        messages = consumer1.poll(100, max_records=1)
        for tp, msg1 in messages.items():
            # Check partition(s)
            print(tp)
            print(msg1[0])
            print("consumer1")
            print(tp.partition)
            print(msg1[0].value)
            listener.add_offset(tp.topic, tp.partition, msg1[0].offset)
            main() # does some work
            print("done")

有人能提出问题是什么吗