将multiprocessing.Queue导出为列表
我想把一个 multiprocessing.Queue
的内容放到一个列表里。为此,我写了下面这个函数:
import Queue
def dump_queue(queue):
"""
Empties all pending items in a queue and returns them in a list.
"""
result = []
# START DEBUG CODE
initial_size = queue.qsize()
print("Queue has %s items initially." % initial_size)
# END DEBUG CODE
while True:
try:
thing = queue.get(block=False)
result.append(thing)
except Queue.Empty:
# START DEBUG CODE
current_size = queue.qsize()
total_size = current_size + len(result)
print("Dumping complete:")
if current_size == initial_size:
print("No items were added to the queue.")
else:
print("%s items were added to the queue." % \
(total_size - initial_size))
print("Extracted %s items from the queue, queue has %s items \
left" % (len(result), current_size))
# END DEBUG CODE
return result
但是不知道为什么它不管用。
看看下面这个命令行的操作:
>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> for i in range(100):
... q.put([range(200) for j in range(100)])
...
>>> q.qsize()
100
>>> l=dump_queue(q)
Queue has 100 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 99 items left
>>> l=dump_queue(q)
Queue has 99 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 3 items from the queue, queue has 96 items left
>>> l=dump_queue(q)
Queue has 96 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 95 items left
>>>
这到底是怎么回事?为什么不是所有的项目都被放进去了?
2 个回答
# in theory:
def dump_queue(q):
q.put(None)
return list(iter(q.get, None))
# in practice this might be more resilient:
def dump_queue(q):
q.put(None)
return list(iter(lambda : q.get(timeout=0.00001), None))
# but neither case handles all the ways things can break
# for that you need 'managers' and 'futures' ... see Commentary
我更喜欢用 None
作为哨兵值,但我同意 jnoller 的看法,mp.queue 确实需要一个安全简单的哨兵。他提到的关于提早获取空值的风险也是有道理的,见下文。
评论:
这段内容虽然有点老,但 Python 已经改变了。不过,如果你在使用列表和队列时遇到问题,这个话题还是会被提到。所以,我们来深入了解一下:
首先,这不是一个错误,而是一个特性:https://bugs.python.org/issue20147。为了节省你阅读讨论和更多细节的时间,这里有一些重点(虽然有点哲学,但我觉得对刚开始接触 Python 的人可能有帮助):
- MP 队列是可以在不同线程、同一系统上不同进程之间进行通信的结构,实际上还可以在不同的(联网)计算机上使用。
- 一般来说,在并行或分布式系统中,严格同步是很耗资源的,所以每次使用 MP/MT 数据结构的 API 时,你都需要查看文档,看看它承诺做什么,或者不做什么。提示:如果一个函数没有包含“锁”、“信号量”或“屏障”等字眼,那么它的行为可能是“异步”和“尽力而为”(近似),或者你可以称之为“脆弱”。
- 具体到这个情况:Python 是一种解释型语言,只有一个著名的解释器线程,还有它著名的“全局解释器锁”(GIL)。如果你的整个程序是单进程、单线程的,那一切都很好。如果不是(在 MP 中这显然不是),你需要给解释器一些喘息的空间。time.sleep() 是你的好朋友。在这种情况下,就是设置超时。
在你的解决方案中,你只使用了脆弱的函数 - get() 和 qsize()。而且代码实际上比你想象的还要糟糕 - 如果你增加队列的大小和对象的大小,你可能会出问题:
现在,你可以使用脆弱的例程,但你需要给它们留出空间。在你的例子中,你只是不断地冲击那个队列。你只需要把这一行 thing = queue.get(block=False)
改成 thing = queue.get(block=True,timeout=0.00001)
,这样就可以了。
时间 0.00001 是经过仔细选择的(10^-5),这是你可以安全设置的最小值(这就是艺术与科学的结合)。
关于为什么需要超时的几点说明:这与 MP 队列的内部工作原理有关。当你把东西放入 MP 队列时,它实际上并没有立即放入队列,而是排队等待最终放入。这就是为什么 qsize()
能给你正确结果的原因 - 代码的那部分知道队列中有一堆东西“在等待”。你只需要明白,队列中的一个对象并不等于“我现在可以读取它”。可以把 MP 队列想象成用 USPS 或 FedEx 发送的信件 - 你可能有收据和追踪号码显示“它在邮寄中”,但收件人还不能打开它。更具体地说,在你的情况下,你立即得到的可访问项目是 '0'。这是因为你运行的单个解释器线程没有机会处理“排队”的东西,所以你的第一个循环只是把一堆东西排队,但你立即强迫你的单线程尝试执行 get(),而它甚至还没有机会为你排队一个对象。
有人可能会争辩说,设置这些超时会让代码变慢。其实并不会 - MP 队列是重量级的结构,你应该只在传递比较重的“东西”时使用它们,无论是大块数据,还是至少复杂的计算。添加 10^-5 秒的作用实际上是给解释器一个进行线程调度的机会 - 到那时它会看到你排队的 put()
操作。
注意事项
以上内容并不完全正确,这(可以说)是 get() 函数设计上的一个问题。设置超时为非零的语义是,get() 函数在返回 Empty 之前不会阻塞超过这个时间。但它可能实际上还没有 Empty(还没到)。所以如果你知道你的队列中有一堆东西可以获取,那么上面的第二个解决方案效果更好,或者使用更长的超时。就我个人而言,我认为他们应该保持 timeout=0 的行为,但应该有一些实际的容忍度,比如 1e-5,因为很多人会对 MP 结构的 get 和 put 之间可能发生的事情感到困惑。
在你的示例代码中,你实际上并没有启动并行进程。如果我们这样做,那么你会开始得到一些随机结果 - 有时只有部分队列对象会被移除,有时会挂起,有时会崩溃,有时会发生多种情况。在下面的例子中,一个进程崩溃,另一个挂起:
根本问题是,当你插入哨兵时,你需要知道队列已经完成。这应该作为队列逻辑的一部分来处理 - 如果你有一个经典的主-工作者设计,那么主进程在最后一个任务添加后需要推送一个哨兵(结束)。否则你会遇到竞争条件。
import multiprocessing
import concurrent.futures
def fill_queue(q):
for i in range(5000):
q.put([range(200) for j in range(100)])
def dump_queue(q):
q.put(None)
return list(iter(q.get, None))
with multiprocessing.Manager() as manager:
q = manager.Queue()
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.submit(fill_queue, q) # add stuff
executor.submit(fill_queue, q) # add more stuff
executor.submit(fill_queue, q) # ... and more
# 'step out' of the executor
l = dump_queue(q)
# 'step out' of the manager
print(f"Saw {len(l)} items")
让管理器处理你的 MP 结构(队列、字典等),在此基础上让未来处理你的进程(如果需要,还可以让另一个未来处理线程)。这样可以确保在你“解开”工作时,所有东西都能得到清理。
试试这个:
import Queue
import time
def dump_queue(queue):
"""
Empties all pending items in a queue and returns them in a list.
"""
result = []
for i in iter(queue.get, 'STOP'):
result.append(i)
time.sleep(.1)
return result
import multiprocessing
q = multiprocessing.Queue()
for i in range(100):
q.put([range(200) for j in range(100)])
q.put('STOP')
l=dump_queue(q)
print len(l)
多进程队列有一个内部缓冲区,这里有一个线程负责从缓冲区取出任务并把它们送到管道里。如果不是所有的对象都已经被送走,我可能会遇到提早出现“空”的情况。使用一个哨兵值来表示队列的结束是安全且可靠的。而且,使用 iter(get, sentinel) 这种写法比单纯依赖“空”要好。
我不喜欢因为发送的时机问题而导致出现“空”的情况(我加了 time.sleep(.1) 是为了让线程有机会切换到那个负责发送的线程,你可能不需要这个,它在没有这个的情况下也能工作——这是我释放全局解释器锁的习惯)。