Python 多进程管理器与组合模式共享
我正在尝试通过一个多进程管理器来共享一个复合结构,但在使用复合类的方法时遇到了一个问题,出现了RuntimeError: 最大递归深度超出的错误。
这个类是从code.activestate上找到的,我在加入管理器之前已经测试过了。
当我在一个进程中调用这个类的addChild()方法时,依然出现了RunTimeError,而在进程外部调用时却没有问题。
这个复合类是从一个叫做SpecialDict的类继承而来的,后者实现了一个** ____getattr()____ **的方法。
可能是因为在调用addChild()时,Python解释器在寻找一个不同的** ____getattr()____ **方法,因为正确的方法没有被管理器代理吗?
如果是这样的话,我不太清楚如何正确地为这个类/方法创建一个代理。
以下代码正好重现了这个问题:
1) 这是manager.py:
from multiprocessing.managers import BaseManager
from CompositeDict import *
class PlantPurchaser():
def __init__(self):
self.comp = CompositeDict('Comp')
def get_cp(self):
return self.comp
class Manager():
def __init__(self):
self.comp = QueuePurchaser().get_cp()
BaseManager.register('get_comp', callable=lambda:self.comp)
self.m = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
self.s = self.m.get_server()
self.s.serve_forever()
2) 我想在这个consumer.py中使用复合结构:
from multiprocessing.managers import BaseManager
class Consumer():
def __init__(self):
BaseManager.register('get_comp')
self.m = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
self.m.connect()
self.comp = self.m.get_comp()
ret = self.comp.addChild('consumer')
3) 通过controller.py来启动所有内容:
from multiprocessing import Process
class Controller():
def __init__(self):
for child in _run_children():
child.join()
def _run_children():
from manager import Manager
from consumer import Consumer as Consumer
procs = (
Process(target=Manager, name='Manager' ),
Process(target=Consumer, name='Consumer'),
)
for proc in procs:
proc.daemon = 1
proc.start()
return procs
c = Controller()
可以看看这个相关问题,了解如何为CompositeDict()类创建代理,正如AlberT所建议的那样。
由tgray提供的解决方案有效,但无法避免竞争条件。
3 个回答
Python 默认的最大递归深度是 1000(或者 999,我记不太清了...)。不过你可以这样来改变这个默认设置:
import sys
sys.setrecursionlimit(n)
这里的 n
是你希望允许的递归次数。
补充:
上面的回答并没有解决这个问题的根本原因(评论中有人提到过)。只有在你确实需要递归超过 1000 次的时候,才需要使用这个设置。如果你陷入了无限循环(就像这个问题一样),无论你设置的限制是多少,最终都会达到这个限制。
为了处理你实际的问题,我从头开始重写了你的代码,尽量简化,然后逐步构建到我认为你想要的样子:
import sys
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from CompositDict import *
class Shared():
def __init__(self):
self.comp = CompositeDict('Comp')
def get_comp(self):
return self.comp
def set_comp(self, c):
self.comp = c
class Manager():
def __init__(self):
shared = Shared()
BaseManager.register('get_shared', callable=lambda:shared)
mgr = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
srv = mgr.get_server()
srv.serve_forever()
class Consumer():
def __init__(self, child_name):
BaseManager.register('get_shared')
mgr = BaseManager(address=('127.0.0.1', 50000), authkey='abracadabra')
mgr.connect()
shared = mgr.get_shared()
comp = shared.get_comp()
child = comp.addChild(child_name)
shared.set_comp(comp)
print comp
class Controller():
def __init__(self):
pass
def main(self):
m = Process(target=Manager, name='Manager')
m.daemon = True
m.start()
consumers = []
for i in xrange(3):
p = Process(target=Consumer, name='Consumer', args=('Consumer_' + str(i),))
p.daemon = True
consumers.append(p)
for c in consumers:
c.start()
for c in consumers:
c.join()
return 0
if __name__ == '__main__':
con = Controller()
sys.exit(con.main())
我把这些都放在一个文件里,但你应该可以轻松地把它拆分开。
我为你的消费者添加了一个 child_name
参数,这样我就可以检查 CompositDict
是否被更新了。
注意,你的 CompositDict
对象有一个获取器和一个设置器。当我只有获取器的时候,每个消费者在添加子项时都会覆盖 CompositDict
。
这也是我把你注册的方法改成 get_shared
而不是 get_comp
的原因,因为你希望在消费者类中同时访问设置器和获取器。
另外,我觉得你不应该尝试去连接你的管理进程,因为它会“永远服务”。如果你查看 BaseManager 的源代码(./Lib/multiprocessing/managers.py:Line 144),你会发现 serve_forever()
函数会让你进入一个无限循环,只有通过 KeyboardInterrupt
或 SystemExit
才能打断。
总之,这段代码在我看来是没有任何递归循环的,但如果你仍然遇到错误,请告诉我。
有没有可能在这些类之间存在循环引用?举个例子,外部类引用了一个组合类,而这个组合类又引用回外部类。
多进程管理器运行得不错,但当你有很大、很复杂的类结构时,可能会遇到一个错误,就是某种类型或引用无法正确地被序列化。还有一个问题是,多进程管理器的错误信息非常难懂,这让调试失败的情况变得更加困难。
我觉得问题在于,你需要告诉管理者怎么管理你的对象,而这个对象不是标准的 Python 类型。
换句话说,你需要为你的 CompositeDict 创建一个代理。
你可以看看这个文档,里面有个例子:http://ruffus.googlecode.com/svn/trunk/doc/html/sharing_data_across_jobs_example.html