我正在尝试基于输入生成队列对象的不同组合,输入保存在varibalemode
中。当mode
设置为'distributed'
时,程序的行为是不可预测的。有时它会运行,有时它会给我一些奇怪的错误(TypeError('cannot unpack non-iterable NoneType object')
),但大多数情况下它只是在实例化data_queue1
、data_queue2
和output_queue
时挂起。(参见下面的代码。)
两个data_queue
对象由类ModeQueue
生成,该类使用Manager
来保存队列。output_queue
是在从BaseManager
继承的类QueueManager
的帮助下实例化的。必须从远程计算机访问此队列。你知道吗
import multiprocessing as mp
from multiprocessing.managers import BaseManager
from lib_a import ModeQueue
class QueueManager(BaseManager):
pass
def serve_queue(queue):
"""Start a queue server"""
QueueManager.register('get_queue', callable=lambda: queue)
m = QueueManager(address=('localhost', 50000), authkey=b'key')
s = m.get_server()
s.serve_forever()
def distributed_queue():
"""Connect to distributed queue and return object"""
QueueManager.register('get_queue')
m = QueueManager(address=('localhost', 50000), authkey=b'key')
m.connect()
return m.get_queue()
def main():
mode = 'distributed'
if mode == 'distributed':
q = mp.Queue(maxsize=10)
q_server = mp.Process(target=serve_queue, args=(q,))
q_server.start()
data_queue1 = ModeQueue(mode, 20)
data_queue2 = ModeQueue(mode, 10)
output_queue = distributed_queue()
else:
pass
# more code...
if __name__ == '__main__':
main()
# lib_a
class ModeQueue:
def __new__(cls, mode, maxsize):
from multiprocessing import Manager
q = Manager().Queue(maxsize=maxsize)
setattr(q, 'mode', mode)
setattr(q, 'maxsize', maxsize)
return q
我真的不明白这种不可预知的行为从何而来。我试过用大量的调试日志记录查找错误,但每次程序冻结时,都不会将任何输出写入我的日志。你能指出一个错误或解释为什么程序不能按预期工作吗?你知道吗
目前没有回答
相关问题 更多 >
编程相关推荐