{Django>我已经和一些小虫子一起工作了一周。在
例如,我有一个MQTT客户机,它在接收到消息basic时在通道中发布。在
async def treat_message(msg):
channel_layer = get_channel_layer()
payload = json.loads(msg.payload, encoding="utf-8")
await channel_layer.send("mqtt", {
"type": "value.change",
"message": payload
})
这很好。我可以发送多少我想要的,它会被发送到redis队列。到频道mqtt
。在
然后运行worker,它将重定向mqtt
队列中的消息:
这就是问题的开始。以下是异步使用者读取数据的内容:
class MQTTConsumer(AsyncConsumer):
async def value_change(self, event):
await asyncio.sleep(5)
print("I received changes : {}".format(event["message"]))
我睡了一觉,以便模拟任务的执行情况。这就是我要说的:异步使用者不是多线程的!当我向通道发送两条消息时,消费者需要10秒来处理第二条消息,而不是多线程的5秒。如下所示。在
2018-09-12 16:45:25,271 - INFO - runworker - Running worker for channels ['mqtt']
2018-09-12 16:45:32,559 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:37,561 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:42,563 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:47,565 - INFO - mqtt - I received changes : {'oui': 'non'}
任何关于这一主题的信息都将是一个很大的帮助,提前谢谢!在
编辑:我发现管理它的唯一方法是创建一个executor,它将包含异步执行它的worker。但我不确定它在部署方面的效率
def handle_mqtt(event):
time.sleep(3)
logger.info("I received changes : {}".format(event["message"]))
class MQTTConsumer(AsyncConsumer):
def __init__(self, scope):
super().__init__(scope)
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
async def value_change(self, event):
loop = asyncio.get_event_loop()
future = loop.run_in_executor(self.executor, handle_mqtt, event)
这是目前设计的
https://github.com/django/channels/issues/1203
相关问题 更多 >
编程相关推荐