from confluent_kafka import Consumer, KafkaError
from multiprocessing import Process
def do_stuff(msg):
    """Handle one message payload; used as the target of a worker Process.

    Args:
        msg: The message value, already decoded to ``str`` by the caller.

    Returns:
        None. The body is a placeholder for the real per-message work.
    """
    # Placeholder for the actual processing logic.
    my_stuff = 'is doing here as a non-blocking way'
# Guard the entry point: with the "spawn" start method every child process
# re-imports this module, and unguarded top-level code would run again there.
if __name__ == '__main__':
    c = Consumer({
        'bootstrap.servers': 'mybroker',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest'
    })
    c.subscribe(['mytopic'])

    try:
        while True:
            msg = c.poll(1.0)
            if msg is None:
                # Nothing was returned by this poll; try again.
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue

            payload = msg.value()
            if payload is None:
                # A message without a value cannot be decoded; skip it
                # instead of crashing the loop with AttributeError.
                continue

            # Hand the decoded payload to a separate process so the poll
            # loop is not blocked while the message is being handled.
            process = Process(target=do_stuff, args=(payload.decode('utf-8'), ))
            process.start()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop this otherwise endless loop.
        pass
    finally:
        # Previously unreachable after `while True`; now always executed.
        c.close()
您可以为每条消息创建一个子进程,并把消息内容传给它处理,如上面的代码所示。
相关问题 更多 >
编程相关推荐