使用multiprocessing.Queue的分层数据加载有时会导致项目被错序消费
我正在写一个脚本,用来动画处理图像数据。我有一些大的图像立方体(可以理解为三维数组)。对于每一个立方体,我会逐帧处理,当快到最后一帧时,我就会加载下一个立方体,继续动画。由于每个立方体都很大,加载时间比较长(大约5秒)。我希望在立方体之间的动画过渡能够无缝衔接(同时也要节省内存),所以我在加载过程中采取了错开处理的方式。我在解决这个问题上已经有了一些进展,但仍然存在一些问题。
下面的代码会加载每个数据立方体,把它分成帧,然后放入一个 multiprocessing.Queue
中。一旦队列中的帧数低于某个阈值,就会触发下一个加载过程,加载另一个立方体并将其解包到队列中。
看看下面的代码:
import numpy as np
import multiprocessing as mp
import logging
logger = mp.log_to_stderr(logging.INFO)
import time
def data_loader(event, queue, **kw):
'''loads data from 3D image cube'''
event.wait() #wait for trigger before loading
logger.info( 'Loading data' )
time.sleep(3) #pretend to take long to load the data
n = 100
data = np.ones((n,20,20))*np.arange(n)[:,None,None] #imaginary 3D image cube (increasing numbers so that we can track the data ordering)
logger.info( 'Adding data to queue' )
for d in data:
queue.put(d)
logger.info( 'Done adding to queue!' )
def queue_monitor(queue, triggers, threshold=50, interval=5):
'''
Triggers the load events once the number of data in the queue falls below
threshold, then doesn't trigger again until the interval has passed.
Note: interval should be larger than data load time.
'''
while len(triggers):
if queue.qsize() < threshold:
logger.info( 'Triggering next load' )
triggers.pop(0).set()
time.sleep(interval)
if __name__ == '__main__':
logger.info( "Starting" )
out_queue = mp.Queue()
#Initialise the load processes
nprocs, procs = 3, []
triggers = [mp.Event() for _ in range(nprocs)]
triggers[0].set() #set the first process to trigger immediately
for i, trigger in enumerate(triggers):
p = mp.Process( name='data_loader %d'%i, target=data_loader,
args=(trigger, out_queue) )
procs.append( p )
for p in procs:
p.start()
#Monitoring process
qm = mp.Process( name='queue_monitor', target=queue_monitor,
args=(out_queue, triggers) )
qm.start()
#consume data
while out_queue.empty():
pass
else:
for d in iter( out_queue.get, None ):
time.sleep(0.2) #pretend to take some time to process/animate the data
logger.info( 'data: %i' %d[0,0] ) #just to keep track of data ordering
在某些情况下,这个方法效果很好,但有时候在触发新的加载过程后,数据的顺序会乱掉。我搞不懂为什么会这样——mp.Queue应该是先进先出(FIFO)吧?!例如,直接运行上面的代码,输出队列中的顺序就不对了,但如果把阈值改成一个更低的值,比如30,就能解决这个问题。*真让人困惑……
所以问题是:我该如何正确地在Python中实现这种错开加载的策略,使用 multiprocessing
呢?
1 个回答
这看起来像是一个缓冲问题。内部的 multiprocessing.Queue
使用一个缓冲区来暂时存储你放入队列的项目,最后会在后台线程中将这些项目发送到一个 Pipe
。只有在这些项目被发送之后,它们才会真正传送到其他进程。因为你在队列中放入了大对象,所以缓冲的过程会比较多。这导致加载的进程实际上是重叠的,尽管你的日志显示一个进程在另一个开始之前已经完成了。文档中实际上对此场景有警告:
当一个对象被放入队列时,这个对象会被序列化(也就是“打包”),然后一个后台线程会将这些序列化的数据发送到一个底层的管道。这会带来一些意想不到的后果,但通常不会造成实际的问题——如果这些问题真的让你困扰,你可以使用一个管理器来创建队列。
- 在将一个对象放入空队列后,可能会有一个极小的延迟,直到队列的
empty()
方法返回 False,并且get_nowait()
可以在不抛出Queue.Empty
的情况下返回。- 如果多个进程同时向队列中放入对象,接收端可能会收到顺序错乱的对象。不过,同一个进程放入的对象之间会保持正确的顺序。
我建议按照文档的说明,使用 multiprocessing.Manager
来创建你的队列:
m = mp.Manager()
out_queue = m.Queue()
这样可以让你完全避免这个问题。
另一种选择是只使用一个进程来完成所有的数据加载,并让它在一个循环中运行,循环的顶部调用 event.wait()
。