在python多处理中使用list

2024-06-16 12:27:35 发布

您现在位置:Python中文网/ 问答频道 /正文

有谁能帮我在多个python进程之间共享一个列表吗。问题是在下面的代码中获取self.ID_List和self.mps_in_进程。

import time, random
from multiprocessing import Process #, Manager, Array, Queue

class MP_Stuff():
    def __init__(self, parent, id):
        time.sleep(1 + random.random()*10) # simulate data processing
        parent.killMP(id)

class ParamHandler():
    def doFirstMP(self, IDs):
        self.mps_in_process = []
        self.ID_List = IDs
        id = self.ID_List.pop(0)
        p = Process(target=MP_Stuff, args=(self, id))
        self.mps_in_process.append(id)
        p.start()

    def doMP(self):
        for tmp in range(3): # nr of concurrent processes
            if len(self.ID_List) > 0:
                id = self.ID_List.pop(0)
                p = Process(target=MP_Stuff, args=(self, id))
                self.mps_in_process.append(id)
                p.start()

    def killMP(self, kill_id):
        self.mps_in_process.remove(kill_id)
        self.doMP()

if __name__ == '__main__':
    ID_List = [1,2,3,4,5,6]
    paramSet = ParamHandler()
    paramSet.doFirstMP(ID_List)

很快,代码所做的就是根据self.id_List中的数据id处理一些数据(这里是MP_Stuff中的随机时间)。为了知道进程中有多少数据id是self.mps_In_process(nr process s在这里是硬编码的,但实际上它是动态的)。

问题是在多个进程之间共享mps_in_进程和ID_列表。当前代码进入了几乎无止境的循环。出错的地方实际上在多处理库中有很好的描述:

"if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start() was called."

但是,我不知道如何让mps-in-u进程和ID-u列表工作。我不能使用Queue,因为从mps_in_进程中取出元素的方式是随机的。我无法使用数组,因为.pop(0)不起作用。我不能使用Manager().list(),因为.remove()和len(ID_list)那时不起作用。使用线程而不是多处理不是解决方案,因为必须使用稍后的freeze_support()。

因此,任何关于如何在进程之间共享列表的帮助都是非常受欢迎的!


Tags: theinselfid列表if进程def
2条回答

经理工作正常(包括len())。代码的问题在于,在主进程中,您不会等到处理结束,因此主进程结束,管理器不再可访问。我也不知道ListProxy的pop原子性,所以也许一个锁会很方便。

解决方案是p.join()

然而,我很困惑,为什么在doFirstMP结尾处做p.join就足够了。如果有人能解释为什么在完成所有计算之后,而不是在第一个doMP返回之后,第一个p返回join,我会很高兴。

我的代码:

import time, random
from multiprocessing import Process, Manager

class MP_Stuff():
    def __init__(self, parent, id):
        time.sleep(1 + random.random()*5) # simulate data processing
        print id , "done"
        parent.killMP(id)

class ParamHandler():    
    def doFirstMP(self, IDs):
        self.mps_in_process = []
        self.ID_List = Manager().list(IDs)
        id = self.ID_List.pop(0)
        p = Process(target=MP_Stuff, args=(self, id))
        self.mps_in_process.append(id)
        p.start()
        p.join()
        print "joined"

    def doMP(self):
        for tmp in range(3): # nr of concurrent processes
            print self.ID_List
            if len(self.ID_List) > 0:
                id = self.ID_List.pop(0)
                p = Process(target=MP_Stuff, args=(self, id))
                self.mps_in_process.append(id)
                p.start()

    def killMP(self, kill_id):
        print "kill", kill_id
        self.mps_in_process.remove(kill_id)
        self.doMP()

if __name__ == '__main__':
    ID_List = [1,2,3,4,5,6]
    paramSet = ParamHandler()
    paramSet.doFirstMP(ID_List)

很遗憾,您已经指定了选项。

Array()Manager().list()都应该能够做到,尽管您可能需要一些额外的工作。

  • 您可以模拟一个len(ID_List),方法是将数量存储在Value()中并递增/递减。
  • remove()可以很容易地通过一个循环和之后的一个delete来模拟(当然速度较慢)。

相关问题 更多 >