我有以下设置:
import multiprocessing
def heavy_work(args):
queue, data, idx = args
state = 0
for elem in queue:
# decide how to process current data point, store collected information in 'state'
else:
queue.put(modify(data[idx], state))
def mp_heavy_work(data):
queue = multiprocessing.Manager().Queue(len(data))
pool = multiprocessing.Pool(processes=4)
pool.map(heavy_work, ((queue, data, i) for i in range(len(data))))
问题是,排队是不可容忍的,所以5号线不起作用。我需要知道以前的数据点是如何修改的以便决定新的数据点,所以需要读取写访问共享容器(当前是queue
)。我想依赖于“原始”进程安全类型而不是锁,因为主要工作是在循环内完成的——因此每次进程进入时锁定它都会使多处理变得多余。在
有办法吗?在
为什么你需要它是可接受的?您可以使用普通的
queue.get()
。在如果确实需要iterable,可以包装队列对象:
^{pr2}$这会打电话的队列.get在每次迭代中,直到它碰到一个“sentinal”对象,然后它将
StopIteration
。这样可以手动关闭队列。在如果要在队列为空时执行
else
块(不手动结束迭代),则必须依赖队列。空例外情况:你甚至可以用你自己的定制包装来包装:
相关问题 更多 >
编程相关推荐