如何对来自未绑定Queu的消息进行多处理

2024-04-19 21:11:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在写一个Python应用程序。它用一个消费者读出卡夫卡的主题。对于每条消息,它会做一些事情,这些事情可能需要一段时间才能完成,然后再对下一条消息做一些事情。你知道吗

大多数使用多处理库的应用程序都需要传递一些有限的iterable来映射\u async或应用\u async。我用这两个函数来解决这个问题的尝试似乎不起作用,我想是因为在这个例子中我们的iterable是kafka主题,它是一个未绑定的队列。在这种情况下,有没有办法以非阻塞的方式“做一些事情”?你知道吗


Tags: kafka函数应用程序消息主题async队列方式
1条回答
网友
1楼 · 发布于 2024-04-19 21:11:05

您可以创建一个子进程并将消息传递给它以处理某些内容:

from confluent_kafka import Consumer, KafkaError
from multiprocessing import Process


def do_stuff(msg):
    my_stuff = 'is doing here as a non-blocking way'

c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    process = Process(target=do_stuff, args=(msg.value().decode('utf-8'), ))
    process.start()

c.close()

相关问题 更多 >