启动Python队列时的不可预测行为

2024-04-19 09:01:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试基于输入生成队列对象的不同组合,输入保存在varibalemode中。当mode设置为'distributed'时,程序的行为是不可预测的。有时它会运行,有时它会给我一些奇怪的错误(TypeError('cannot unpack non-iterable NoneType object')),但大多数情况下它只是在实例化data_queue1data_queue2output_queue时挂起。(参见下面的代码。)

两个data_queue对象由类ModeQueue生成,该类使用Manager来保存队列。output_queue是在从BaseManager继承的类QueueManager的帮助下实例化的。必须从远程计算机访问此队列。你知道吗

import multiprocessing as mp
from multiprocessing.managers import BaseManager

from lib_a import ModeQueue


class QueueManager(BaseManager):
    pass


def serve_queue(queue):
    """Start a queue server"""
    QueueManager.register('get_queue', callable=lambda: queue)
    m = QueueManager(address=('localhost', 50000), authkey=b'key')
    s = m.get_server()
    s.serve_forever()


def distributed_queue():
    """Connect to distributed queue and return object"""
    QueueManager.register('get_queue')
    m = QueueManager(address=('localhost', 50000), authkey=b'key')
    m.connect()
    return m.get_queue()


def main():
    mode = 'distributed'
    if mode == 'distributed':
        q = mp.Queue(maxsize=10)
        q_server = mp.Process(target=serve_queue, args=(q,))
        q_server.start()

        data_queue1 = ModeQueue(mode, 20)
        data_queue2 = ModeQueue(mode, 10)
        output_queue = distributed_queue()
    else:
        pass
    # more code...


if __name__ == '__main__':
    main()
# lib_a
class ModeQueue:
    def __new__(cls, mode, maxsize):
        from multiprocessing import Manager
        q = Manager().Queue(maxsize=maxsize)
        setattr(q, 'mode', mode)
        setattr(q, 'maxsize', maxsize)
        return q

我真的不明白这种不可预知的行为从何而来。我试过用大量的调试日志记录查找错误,但每次程序冻结时,都不会将任何输出写入我的日志。你能指出一个错误或解释为什么程序不能按预期工作吗?你知道吗


Tags: import程序outputdatagetserver队列queue