在进程之间共享复杂对象?

77 投票
6 回答
129186 浏览
提问于 2025-04-16 03:52

我有一个比较复杂的Python对象,需要在多个进程之间共享。我是通过使用 multiprocessing.Process 来启动这些进程的。当我用 multiprocessing.Queuemultiprocessing.Pipe 来共享对象时,一切都很顺利。但是,当我尝试与其他不是multiprocessing模块的对象共享时,似乎Python会把这些对象复制一份。这是真的吗?

我试着使用 multiprocessing.Value,但我不确定应该用什么类型?我的对象类叫做MyClass。但是当我尝试 multiprocess.Value(MyClass, instance) 时,它失败了,报错信息是:

TypeError: this type has no size

有人知道这是怎么回事吗?

6 个回答

8

在Python 3.6的文档中提到:

在3.6版本中进行了更改:共享对象现在可以嵌套。例如,一个共享的容器对象,比如共享列表,可以包含其他共享对象,这些对象都会由SyncManager来管理和同步。

只要通过SyncManager创建实例,你就可以让这些对象互相引用。不过,在一种对象的方法中动态创建另一种对象可能还是不太可能或者会很复杂。

编辑:我碰到了这个问题 多进程管理器和自定义类,是在python 3.6.5和3.6.7中。需要检查一下python 3.7。

编辑2:由于其他一些问题,我目前无法用python 3.7进行测试。链接中提供的解决方法 https://stackoverflow.com/a/50878600/7541006 对我来说很好用。

78

经过大量的研究和测试,我发现“Manager”在处理简单对象时效果不错。

下面的代码展示了对象inst在不同进程之间是共享的,这意味着当子进程修改inst的属性var时,外部也会看到这个变化。

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class SimpleClass(object):
    def __init__(self):
        self.var = 0

    def set(self, value):
        self.var = value

    def get(self):
        return self.var
        

def change_obj_value(obj):
    obj.set(100)


if __name__ == '__main__':
    BaseManager.register('SimpleClass', SimpleClass)
    manager = BaseManager()
    manager.start()
    inst = manager.SimpleClass()

    p = Process(target=change_obj_value, args=[inst])
    p.start()
    p.join()

    print inst                    # <__main__.SimpleClass object at 0x10cf82350>
    print inst.get()              # 100

如果你只需要共享简单对象,上面的代码就足够了。

为什么不适合复杂对象呢?因为如果你的对象是嵌套的(也就是一个对象里面还有另一个对象),这可能会失败

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class GetSetter(object):
    def __init__(self):
        self.var = None

    def set(self, value):
        self.var = value

    def get(self):
        return self.var
        

class ChildClass(GetSetter):
    pass

class ParentClass(GetSetter):
    def __init__(self):
        self.child = ChildClass()
        GetSetter.__init__(self)

    def getChild(self):
        return self.child


def change_obj_value(obj):
    obj.set(100)
    obj.getChild().set(100)


if __name__ == '__main__':
    BaseManager.register('ParentClass', ParentClass)
    manager = BaseManager()
    manager.start()
    inst2 = manager.ParentClass()

    p2 = Process(target=change_obj_value, args=[inst2])
    p2.start()
    p2.join()

    print inst2                    # <__main__.ParentClass object at 0x10cf82350>
    print inst2.getChild()         # <__main__.ChildClass object at 0x10cf6dc50>
    print inst2.get()              # 100
    #good!

    print inst2.getChild().get()   # None
    #bad! you need to register child class too but there's almost no way to do it
    #even if you did register child class, you may get PicklingError :)

我认为这种情况发生的主要原因是因为Manager其实是建立在一些低级通信工具(比如管道/队列)之上的一种“糖果条”。

所以,这种方法在多进程的情况下不太推荐。如果可以的话,使用低级工具,比如锁/信号量/管道/队列,或者高层工具,比如Redis队列Redis发布/订阅,会更好,尤其是对于复杂的使用场景(这只是我的建议,哈哈)。

47

你可以使用Python的 multiprocessing 中的 "Manager" 类和你自己定义的代理类来实现这个功能。具体的内容可以参考Python文档中的 Manager 部分。

你需要做的是为你的自定义对象定义一个代理类,然后通过一个“远程管理器”来共享这个对象。可以查看文档中 "使用远程管理器" 这一节的例子,文档里展示了如何共享一个远程队列。你要做的事情类似,不过在调用 your_manager_instance.register() 时,你需要把你的自定义代理类作为参数传进去。

这样一来,你就建立了一个服务器,用来通过自定义代理共享自定义对象。你的客户端需要能够访问这个服务器(同样,建议查看文档中关于如何设置客户端/服务器访问远程队列的优秀示例,不过你要共享的是你特定类的访问权限,而不是 Queue)。

撰写回答