在Python多进程中使用列表

8 投票
2 回答
12802 浏览
提问于 2025-04-16 07:53

有没有人能帮我解决在多个Python进程之间共享一个列表的问题?我需要让self.ID_List和self.mps_in_process在下面的代码中正常工作。

import time, random
from multiprocessing import Process #, Manager, Array, Queue

class MP_Stuff():
    def __init__(self, parent, id):
        time.sleep(1 + random.random()*10) # simulate data processing
        parent.killMP(id)

class ParamHandler():
    def doFirstMP(self, IDs):
        self.mps_in_process = []
        self.ID_List = IDs
        id = self.ID_List.pop(0)
        p = Process(target=MP_Stuff, args=(self, id))
        self.mps_in_process.append(id)
        p.start()

    def doMP(self):
        for tmp in range(3): # nr of concurrent processes
            if len(self.ID_List) > 0:
                id = self.ID_List.pop(0)
                p = Process(target=MP_Stuff, args=(self, id))
                self.mps_in_process.append(id)
                p.start()

    def killMP(self, kill_id):
        self.mps_in_process.remove(kill_id)
        self.doMP()

if __name__ == '__main__':
    ID_List = [1,2,3,4,5,6]
    paramSet = ParamHandler()
    paramSet.doFirstMP(ID_List)

简单来说,这段代码的作用是根据self.ID_List中的数据ID处理一些数据(这里是MP_Stuff中的随机时间)。为了知道有多少数据ID正在处理,使用了self.mps_in_process(这里的进程数量是固定的,但实际上是动态的)。

问题在于如何在多个进程之间共享mps_in_process和ID_List。目前的代码进入了一个几乎无尽的循环。出错的原因在多进程库中有很好的描述:

“如果在子进程中运行的代码试图访问一个全局变量,那么它看到的值(如果有的话)可能与调用Process.start()时父进程中的值不同。”

然而,我无法弄明白如何让mps_in_process和ID_List正常工作。我不能使用Queue,因为从mps_in_process中取出元素的方式是随机的。我也不能使用Array,因为.pop(0)不管用。我不能使用Manager().list(),因为那样的话.remove()和len(ID_List)都无法正常工作。使用线程而不是多进程也不是解决办法,因为后面必须使用freeze_support()。

所以,任何关于如何在进程之间共享列表的帮助都非常欢迎!

2 个回答

0

很遗憾,你已经设置了你的选项。

Array()Manager().list() 都可以做到这一点,不过你可能需要多花点功夫。

  • 你可以通过把数量存储在一个 Value() 中,然后进行加一或减一的操作,来模拟 len(ID_List) 的功能。
  • 要模拟 remove(),你可以用一个循环来实现,然后在循环后面删除它(当然这样会慢一些)。
3

这个管理器工作得很好(包括len()函数)。你代码的问题在于,在主进程中,你没有等到处理结束,所以主进程就结束了,管理器就无法再访问了。另外,我不太清楚ListProxy的pop操作是否是原子性的,所以可能需要加个锁。

解决办法是用 p.join()

不过,我有点困惑,为什么在 doFirstMP 的最后调用 p.join 就足够了。我希望有人能解释一下,为什么第一个p的join是在所有计算完成后返回,而不是在第一个doMP返回后就结束。

我的代码:

import time, random
from multiprocessing import Process, Manager

class MP_Stuff():
    def __init__(self, parent, id):
        time.sleep(1 + random.random()*5) # simulate data processing
        print id , "done"
        parent.killMP(id)

class ParamHandler():    
    def doFirstMP(self, IDs):
        self.mps_in_process = []
        self.ID_List = Manager().list(IDs)
        id = self.ID_List.pop(0)
        p = Process(target=MP_Stuff, args=(self, id))
        self.mps_in_process.append(id)
        p.start()
        p.join()
        print "joined"

    def doMP(self):
        for tmp in range(3): # nr of concurrent processes
            print self.ID_List
            if len(self.ID_List) > 0:
                id = self.ID_List.pop(0)
                p = Process(target=MP_Stuff, args=(self, id))
                self.mps_in_process.append(id)
                p.start()

    def killMP(self, kill_id):
        print "kill", kill_id
        self.mps_in_process.remove(kill_id)
        self.doMP()

if __name__ == '__main__':
    ID_List = [1,2,3,4,5,6]
    paramSet = ParamHandler()
    paramSet.doFirstMP(ID_List)

撰写回答