在Python多进程中使用列表
有没有人能帮我解决在多个Python进程之间共享一个列表的问题?我需要让self.ID_List和self.mps_in_process在下面的代码中正常工作。
import time, random
from multiprocessing import Process #, Manager, Array, Queue
class MP_Stuff():
def __init__(self, parent, id):
time.sleep(1 + random.random()*10) # simulate data processing
parent.killMP(id)
class ParamHandler():
def doFirstMP(self, IDs):
self.mps_in_process = []
self.ID_List = IDs
id = self.ID_List.pop(0)
p = Process(target=MP_Stuff, args=(self, id))
self.mps_in_process.append(id)
p.start()
def doMP(self):
for tmp in range(3): # nr of concurrent processes
if len(self.ID_List) > 0:
id = self.ID_List.pop(0)
p = Process(target=MP_Stuff, args=(self, id))
self.mps_in_process.append(id)
p.start()
def killMP(self, kill_id):
self.mps_in_process.remove(kill_id)
self.doMP()
if __name__ == '__main__':
ID_List = [1,2,3,4,5,6]
paramSet = ParamHandler()
paramSet.doFirstMP(ID_List)
简单来说,这段代码的作用是根据self.ID_List中的数据ID处理一些数据(这里是MP_Stuff中的随机时间)。为了知道有多少数据ID正在处理,使用了self.mps_in_process(这里的进程数量是固定的,但实际上是动态的)。
问题在于如何在多个进程之间共享mps_in_process和ID_List。目前的代码进入了一个几乎无尽的循环。出错的原因在多进程库中有很好的描述:
“如果在子进程中运行的代码试图访问一个全局变量,那么它看到的值(如果有的话)可能与调用Process.start()时父进程中的值不同。”
然而,我无法弄明白如何让mps_in_process和ID_List正常工作。我不能使用Queue,因为从mps_in_process中取出元素的方式是随机的。我也不能使用Array,因为.pop(0)不管用。我不能使用Manager().list(),因为那样的话.remove()和len(ID_List)都无法正常工作。使用线程而不是多进程也不是解决办法,因为后面必须使用freeze_support()。
所以,任何关于如何在进程之间共享列表的帮助都非常欢迎!
2 个回答
很遗憾,你已经设置了你的选项。
Array()
和 Manager().list()
都可以做到这一点,不过你可能需要多花点功夫。
- 你可以通过把数量存储在一个
Value()
中,然后进行加一或减一的操作,来模拟len(ID_List)
的功能。 - 要模拟
remove()
,你可以用一个循环来实现,然后在循环后面删除它(当然这样会慢一些)。
这个管理器工作得很好(包括len()函数)。你代码的问题在于,在主进程中,你没有等到处理结束,所以主进程就结束了,管理器就无法再访问了。另外,我不太清楚ListProxy的pop操作是否是原子性的,所以可能需要加个锁。
解决办法是用 p.join()
。
不过,我有点困惑,为什么在 doFirstMP
的最后调用 p.join
就足够了。我希望有人能解释一下,为什么第一个p的join是在所有计算完成后返回,而不是在第一个doMP返回后就结束。
我的代码:
import time, random
from multiprocessing import Process, Manager
class MP_Stuff():
def __init__(self, parent, id):
time.sleep(1 + random.random()*5) # simulate data processing
print id , "done"
parent.killMP(id)
class ParamHandler():
def doFirstMP(self, IDs):
self.mps_in_process = []
self.ID_List = Manager().list(IDs)
id = self.ID_List.pop(0)
p = Process(target=MP_Stuff, args=(self, id))
self.mps_in_process.append(id)
p.start()
p.join()
print "joined"
def doMP(self):
for tmp in range(3): # nr of concurrent processes
print self.ID_List
if len(self.ID_List) > 0:
id = self.ID_List.pop(0)
p = Process(target=MP_Stuff, args=(self, id))
self.mps_in_process.append(id)
p.start()
def killMP(self, kill_id):
print "kill", kill_id
self.mps_in_process.remove(kill_id)
self.doMP()
if __name__ == '__main__':
ID_List = [1,2,3,4,5,6]
paramSet = ParamHandler()
paramSet.doFirstMP(ID_List)