如何在多处理中访问processsafe容器

2024-05-08 04:53:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我有以下设置:

import multiprocessing
def heavy_work(args):
    queue, data, idx = args
    state = 0
    for elem in queue:
        #  decide how to process current data point, store collected information in 'state'
    else:
        queue.put(modify(data[idx], state))


def mp_heavy_work(data):
    queue = multiprocessing.Manager().Queue(len(data))
    pool = multiprocessing.Pool(processes=4)
    pool.map(heavy_work, ((queue, data, i) for i in range(len(data))))

问题是,排队是不可容忍的,所以5号线不起作用。我需要知道以前的数据点是如何修改的以便决定新的数据点,所以需要读取写访问共享容器(当前是queue)。我想依赖于“原始”进程安全类型而不是锁,因为主要工作是在循环内完成的——因此每次进程进入时锁定它都会使多处理变得多余。在

有办法吗?在


Tags: 数据infordatalenqueue进程def
1条回答
网友
1楼 · 发布于 2024-05-08 04:53:26

为什么你需要它是可接受的?您可以使用普通的queue.get()。在

try:
    while True:
        elem = queue.get(False)
        # do stuff with elem
except Queue.Empty:
    queue.put(modify(data[idx], state))

如果确实需要iterable,可以包装队列对象:

^{pr2}$

这会打电话的队列.get在每次迭代中,直到它碰到一个“sentinal”对象,然后它将StopIteration。这样可以手动关闭队列。在

如果要在队列为空时执行else块(不手动结束迭代),则必须依赖队列。空例外情况:

try:
    for elem in iter(lambda:queue.get(False), 'sentinal'):
        # do stuff        
except Queue.Empty:
    # do else stuff

你甚至可以用你自己的定制包装来包装:

def wrap(queue):
    try:
        for elem in iter(lambda:queue.get(False), 'sentinal'):
            yield elem
        else:
            # this is executed if the 'sentinal' is sent
            # may want to special handle this (throw custom exception?)
    except Queue.Empty:
        pass # just enter iteration

...

    for elem in wrap(queue):
        # do stuff
    else:
        # do more stuff

相关问题 更多 >