Python3 多进程共享对象

1 投票
1 回答
2459 浏览
提问于 2025-04-18 15:02

我在使用Python 3.2.3的multiprocessing模块时,遇到了一个关于共享对象的同步问题(我是在Debian 7.5上)。为了说明这个问题,我做了一个简单的例子,它的功能类似于multiprocessing.Pool.map(这是我能想到的最简单的方式)。我使用了multiprocessing.Manager,因为我原来的代码是用它的(在网络上进行同步)。不过,如果我用简单的multiprocessing.Value来做计数器变量,行为也是一样的。

import os as os
import sys as sys
import multiprocessing as mp

def mp_map(function, obj_list, num_workers):
    """ 
    """
    mang = mp.Manager()
    jobq = mang.Queue()
    resq = mang.Queue()
    counter = mp.Value('i', num_workers, lock=True)
    finished = mang.Event()
    processes = []
    try:
        for i in range(num_workers):
            p = mp.Process(target=_parallel_execute, kwargs={'execfun':function, 'jobq':jobq, 'resq':resq, 'counter':counter, 'finished':finished})
            p.start()
            p.join(0)
            processes.append(p)
        for item in obj_list:
            jobq.put(item)
        for i in range(len(processes)):
            jobq.put('SENTINEL')
        finished.wait()
        for p in processes:
            if p.is_alive():
                p.join(1)
                p.terminate()
    except Exception as e:
        for p in processes:
            p.terminate()
        raise e
    results = []
    for item in iter(resq.get, 'DONE'):
        results.append(item)
    return results

def _parallel_execute(execfun, jobq, resq, counter, finished):
    """
    """
    for item in iter(jobq.get, 'SENTINEL'):
        item = execfun(item)
        resq.put(item)
    counter.value -= 1
    print('C: {}'.format(counter.value))
    if counter.value <= 0:
        resq.put('DONE')
        finished.set()
    return


if __name__ == '__main__':
    l = list(range(50))
    l = mp_map(id, l, 2)
    print('done')
    sys.exit(0)

运行上面的代码几次后,结果如下:

wks:~$ python3 mpmap.py 
C: 1
C: 0
done
wks:~$ python3 mpmap.py 
C: 1
C: 0
done
wks:~$ python3 mpmap.py 
C: 1
C: 1
Traceback (most recent call last):
  File "mpmap.py", line 55, in <module>
    l = mp_map(id, l, 2)
  File "mpmap.py", line 25, in mp_map
    finished.wait()
  File "/usr/lib/python3.2/multiprocessing/managers.py", line 1013, in wait
    return self._callmethod('wait', (timeout,))
  File "/usr/lib/python3.2/multiprocessing/managers.py", line 762, in _callmethod
    kind, result = conn.recv()
KeyboardInterrupt

根据multiprocessing模块的文档,我不明白为什么counter不是进程安全的,因为对它的访问是通过Manager来管理的,而且它明显是用lock=True初始化的。由于死锁只偶尔发生,我也不太确定该如何理解这种行为。任何有用的见解都非常感谢,谢谢。

编辑: 我刚好在网上搜索了一下,找到了一个解释;如果有人感兴趣,我会在这里分享:根据下面链接的这篇博客文章1,Python中的锁(即在multiprocessing.[Manager].Value中使用lock=True)并不会像例子中那样对共享值进行原子操作。解决方案是使用另一个在进程之间共享的锁,用来控制对共享对象的访问。

[http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/]

1 个回答

0

根据Ross的建议,我在这里重复一下答案:简单来说,lock=Truemultiprocessing.Valuemultiprocessing.Manager.Value 中,并不能让对值的增加(或减少)操作变成一个原子操作,也就是说,这个操作并不是一个不可分割的整体;你需要一个单独的锁来把整个操作包裹起来。想要看代码示例,可以参考这个答案 https://stackoverflow.com/a/1233363/3826372 或者前面提到的博客文章 http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing

撰写回答