我在不同的服务器上部署了Kafka和python使用者。我正在使用GCP VM实例进行部署
Python脚本在循环中运行,并与kafka一起检查它是否有作业。如果它找到了一个作业,它将启动操作,否则它将继续与卡夫卡进行检查
定期在卡夫卡中创建条目时,脚本工作正常,但在一定时间内没有条目添加到卡夫卡队列时,退出。因此,当我将某个内容推入kafka队列时,python脚本无法检测到它,因为它已经退出
下面是在python上运行的代码,用于不断检查kafka是否有新作业:
consumer = KafkaConsumer(bootstrap_servers=['ip-address'], group_id='grp1',
value_deserializer=lambda m: json.loads(m.decode('utf8')),
partition_assignment_strategy=[RoundRobinPartitionAssignor])
listener = Listener(consumer1)
consumer1.subscribe(['topic'], listener=listener)
while True:
messages = consumer1.poll(100, max_records=1)
for tp, msg1 in messages.items():
# Check partition(s)
print(tp)
print(msg1[0])
print("consumer1")
print(tp.partition)
print(msg1[0].value)
listener.add_offset(tp.topic, tp.partition, msg1[0].offset)
main() # does some work
print("done")
有人能提出问题是什么吗
目前没有回答
相关问题 更多 >
编程相关推荐