用Futu包装队列

2024-05-16 18:06:24 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在用python3.7编写一个Tornado web服务器,以显示multiprocessing库运行的进程的状态。你知道吗

下面的代码可以工作,但我希望能够使用Tornado的内置库而不是在线程库中进行黑客攻击。我还没有弄清楚如何在queue.get期间不阻止龙卷风。我认为正确的解决方案是将get调用打包到将来的某个地方。我试了好几个小时了,但还没想好怎么做。你知道吗

在我的多处理脚本中:

class ProcessToMonitor(multiprocessing.Process)

def __init__(self):
    multiprocessing.Process.__init__(self)
    self.queue = multiprocessing.Queue()

def run():
    while True:
        # do stuff
        self.queue.put(value)

然后,在我的龙卷风剧本里

class MyWebSocket(tornado.websocket.WebSocketHandler):
    connections = set()

    def open(self):
        self.connections.add(self)

    def close(self):
        self.connections.remove(self)

    @classmethod
    def emit(self, message):
        [client.write_message(message) for client in self.connections]

def worker():
    ptm = ProcessToMonitor()
    ptm.start()
    while True:
        message = ptm.queue.get()
        MyWebSocket.emit(message)

if __name__ == '__main__':
    app = tornado.web.Application([
        (r'/', MainHandler), # Not shown
        (r'/websocket', MyWebSocket)
    ])
    app.listen(8888)

    threading.Thread(target=worker)

    ioloop = tornado.ioloop.IOLoop.current()
    ioloop.start()

Tags: selfwebmessagegetqueuedefconnectionsmultiprocessing
1条回答
网友
1楼 · 发布于 2024-05-16 18:06:24

queue.get不是阻塞函数,它只是等待队列中出现一个项目,以防队列为空。我可以从您的代码中看出,queue.get非常适合while循环中的用例。你知道吗

我想你可能用错了。必须使worker函数成为协同程序(async/await语法):

async def worker():
    ...
    while True:
        message = await queue.get()
        ...

但是,如果您不想等待某个项并希望立即继续,那么它的替代方法是^{}。你知道吗

这里需要注意的一点是,如果队列为空,queue.get_nowait将引发名为^{}的异常。所以,你需要处理这个异常。你知道吗

示例:

while True:
    try:
        message = queue.get_nowait()
    except QueueEmpty:
        # wait for some time before
        # next iteration
        # otherwise this loop will
        # keep running for no reason

    MyWebSocket.emit(message)

如您所见,如果队列为空,您将不得不使用pause-while循环一段时间,以防止它淹没系统。你知道吗

那么为什么不首先使用queue.get?你知道吗

相关问题 更多 >