如何使用Django通道多线程AsyncConsumer

2024-05-13 13:55:55 发布

您现在位置:Python中文网/ 问答频道 /正文

{Django>我已经和一些小虫子一起工作了一周。在

例如,我有一个MQTT客户机,它在接收到消息basic时在通道中发布。在

async def treat_message(msg):
    channel_layer = get_channel_layer()
    payload = json.loads(msg.payload, encoding="utf-8")

    await channel_layer.send("mqtt", {
        "type": "value.change",
        "message": payload
    })

这很好。我可以发送多少我想要的,它会被发送到redis队列。到频道mqtt。在

然后运行worker,它将重定向mqtt队列中的消息:

^{pr2}$

这就是问题的开始。以下是异步使用者读取数据的内容:

class MQTTConsumer(AsyncConsumer):
    async def value_change(self, event):
        await asyncio.sleep(5)
        print("I received changes : {}".format(event["message"]))

我睡了一觉,以便模拟任务的执行情况。这就是我要说的:异步使用者不是多线程的!当我向通道发送两条消息时,消费者需要10秒来处理第二条消息,而不是多线程的5秒。如下所示。在

2018-09-12 16:45:25,271 - INFO - runworker - Running worker for channels ['mqtt']
2018-09-12 16:45:32,559 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:37,561 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:42,563 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:47,565 - INFO - mqtt - I received changes : {'oui': 'non'}

任何关于这一主题的信息都将是一个很大的帮助,提前谢谢!在

编辑:我发现管理它的唯一方法是创建一个executor,它将包含异步执行它的worker。但我不确定它在部署方面的效率

def handle_mqtt(event):
    time.sleep(3)
    logger.info("I received changes : {}".format(event["message"]))


class MQTTConsumer(AsyncConsumer):
    def __init__(self, scope):
        super().__init__(scope)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    async def value_change(self, event):
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(self.executor, handle_mqtt, event)

Tags: selfinfoeventlayer消息messageasyncdef
1条回答
网友
1楼 · 发布于 2024-05-13 13:55:55

这是目前设计的

Yes, that is the intended design, as it's the safest way (it prevents race conditions if you're not aware of it). If you are happy to run messages in parallel, just spin off your own coroutines whenever you need them (using asyncio.create_task), making sure that you clean them up and wait for them on shutdown. It's quite a lot of overhead, so hopefully we'll ship an opt-in mode for that in the consumer in future, but for now all we ship with is the safe option.

https://github.com/django/channels/issues/1203

相关问题 更多 >