Python 多进程管理器与组合模式共享

1 投票
3 回答
1586 浏览
提问于 2025-04-15 14:35

我正在尝试通过一个多进程管理器来共享一个复合结构,但在使用复合类的方法时遇到了一个问题,出现了RuntimeError: 最大递归深度超出的错误。

这个类是从code.activestate上找到的,我在加入管理器之前已经测试过了。

当我在一个进程中调用这个类的addChild()方法时,依然出现了RunTimeError,而在进程外部调用时却没有问题。

这个复合类是从一个叫做SpecialDict的类继承而来的,后者实现了一个** ____getattr()____ **的方法。

可能是因为在调用addChild()时,Python解释器在寻找一个不同的** ____getattr()____ **方法,因为正确的方法没有被管理器代理吗?

如果是这样的话,我不太清楚如何正确地为这个类/方法创建一个代理。

以下代码正好重现了这个问题:

1) 这是manager.py:

from multiprocessing.managers import BaseManager
from CompositeDict import *

class PlantPurchaser():

    def __init__(self):
        self.comp  = CompositeDict('Comp')

    def get_cp(self):
        return self.comp

class Manager():

    def __init__(self):

        self.comp  = QueuePurchaser().get_cp()

        BaseManager.register('get_comp', callable=lambda:self.comp)

        self.m = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        self.s = self.m.get_server()

        self.s.serve_forever()

2) 我想在这个consumer.py中使用复合结构:

from multiprocessing.managers import BaseManager

class Consumer():

    def __init__(self):

        BaseManager.register('get_comp')

        self.m = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        self.m.connect()

        self.comp = self.m.get_comp()
        ret = self.comp.addChild('consumer')

3) 通过controller.py来启动所有内容:

from multiprocessing import Process

class Controller():
    def __init__(self):
        for child in _run_children():
            child.join()

def _run_children():

    from manager import Manager
    from consumer import Consumer as Consumer

procs = (
         Process(target=Manager,  name='Manager' ),
         Process(target=Consumer, name='Consumer'),
        )

for proc in procs:
    proc.daemon = 1
    proc.start()
return procs

c = Controller()

可以看看这个相关问题,了解如何为CompositeDict()类创建代理,正如AlberT所建议的那样。

tgray提供的解决方案有效,但无法避免竞争条件。

3 个回答

0

Python 默认的最大递归深度是 1000(或者 999,我记不太清了...)。不过你可以这样来改变这个默认设置:

import sys
sys.setrecursionlimit(n)

这里的 n 是你希望允许的递归次数。

补充:

上面的回答并没有解决这个问题的根本原因(评论中有人提到过)。只有在你确实需要递归超过 1000 次的时候,才需要使用这个设置。如果你陷入了无限循环(就像这个问题一样),无论你设置的限制是多少,最终都会达到这个限制。

为了处理你实际的问题,我从头开始重写了你的代码,尽量简化,然后逐步构建到我认为你想要的样子:

import sys
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from CompositDict import *

class Shared():
    def __init__(self):
        self.comp = CompositeDict('Comp')

    def get_comp(self):
        return self.comp

    def set_comp(self, c):
        self.comp = c

class Manager():
    def __init__(self):
        shared = Shared()
        BaseManager.register('get_shared', callable=lambda:shared)
        mgr = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        srv = mgr.get_server()
        srv.serve_forever()

class Consumer():
    def __init__(self, child_name):
        BaseManager.register('get_shared')
        mgr = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
        mgr.connect()

        shared = mgr.get_shared()
        comp = shared.get_comp()
        child = comp.addChild(child_name)
        shared.set_comp(comp)
        print comp

class Controller():
    def __init__(self):
        pass

    def main(self):
        m = Process(target=Manager, name='Manager')
        m.daemon = True
        m.start()

        consumers = []
        for i in xrange(3):
            p = Process(target=Consumer, name='Consumer', args=('Consumer_' + str(i),))
            p.daemon = True
            consumers.append(p)

        for c in consumers:
            c.start()
        for c in consumers:
            c.join()
        return 0


if __name__ == '__main__':
    con = Controller()
    sys.exit(con.main())

我把这些都放在一个文件里,但你应该可以轻松地把它拆分开。

我为你的消费者添加了一个 child_name 参数,这样我就可以检查 CompositDict 是否被更新了。

注意,你的 CompositDict 对象有一个获取器和一个设置器。当我只有获取器的时候,每个消费者在添加子项时都会覆盖 CompositDict

这也是我把你注册的方法改成 get_shared 而不是 get_comp 的原因,因为你希望在消费者类中同时访问设置器和获取器。

另外,我觉得你不应该尝试去连接你的管理进程,因为它会“永远服务”。如果你查看 BaseManager 的源代码(./Lib/multiprocessing/managers.py:Line 144),你会发现 serve_forever() 函数会让你进入一个无限循环,只有通过 KeyboardInterruptSystemExit 才能打断。

总之,这段代码在我看来是没有任何递归循环的,但如果你仍然遇到错误,请告诉我。

0

有没有可能在这些类之间存在循环引用?举个例子,外部类引用了一个组合类,而这个组合类又引用回外部类。

多进程管理器运行得不错,但当你有很大、很复杂的类结构时,可能会遇到一个错误,就是某种类型或引用无法正确地被序列化。还有一个问题是,多进程管理器的错误信息非常难懂,这让调试失败的情况变得更加困难。

0

我觉得问题在于,你需要告诉管理者怎么管理你的对象,而这个对象不是标准的 Python 类型。

换句话说,你需要为你的 CompositeDict 创建一个代理

你可以看看这个文档,里面有个例子:http://ruffus.googlecode.com/svn/trunk/doc/html/sharing_data_across_jobs_example.html

撰写回答