多个子进程锁定互斥量
这个代码里的互斥锁(mutex)工作正常吗?也就是说,sys.stdout.write这个部分有被保护起来吗?
import sys
from multiprocessing import Pool, Lock
class ParentApp():
mutex=Lock()
def report(self,msg):
with ParentApp.mutex:
sys.stdout.write(msg)
class ChildApp1(ParentApp):
def print_report(self):
for i in xrange(100):
ParentApp.report(self, 'BLABLA')
class ChildApp2(ParentApp):
def print_report(self):
for i in xrange(100):
ParentApp.report(self, 'TESTTEST')
def runnable(app):
app.print_report()
def main():
app=[]
app.append(ChildApp1())
app.append(ChildApp2())
pool = Pool(len(apps))
pool.map(runnable, apps)
exit(0)
if __name__ == '__main__':
sys.exit(main())
补充说明:代码也在这里: http://pastebin.com/GyV3w45F
补充说明:我是在Linux主机上运行的
1 个回答
2
如果你在类似Posix的系统上,这个功能才正常工作。如果你在Windows上,每个进程都会有一个完全不同的锁的副本。
你可以通过添加一些额外的跟踪代码和一个sleep
语句来观察这个现象:
class ParentApp():
mutex=Lock()
def report(self,msg):
print("\nGETTING for {}".format(msg))
with self.mutex:
print("GOT for {}".format(msg))
sys.stdout.write(msg)
sys.stdout.flush()
time.sleep(5)
在Linux上:
GETTING for BLABLA
GOT for BLABLA
BLABLA
GETTING for TESTTEST
< 5 second delay here>
在Windows上:
GETTING for BLABLA
GOT for BLABLA
BLABLA
GETTING for TESTTEST
GOT for TESTTEST
TESTTEST
<5 second delay here>
这是因为Posix平台使用os.fork()
来创建新进程,这样在父进程中创建的Lock()
会自动被子进程继承。然而,Windows没有os.fork
,所以它需要先启动一个新进程,然后在子进程中重新导入你的模块。重新导入模块意味着ParentApp
会被重新导入和执行,Lock
这个类属性也会如此。因此,你的父进程和两个子进程各自会有自己独特的Lock
。
要解决这个问题,你需要在父进程中创建一个共享的Lock
,然后把它传递给子进程。实际上,这在你当前的架构下并不是一件简单的事——你把Lock
对象作为参数传递给pool.map
,这不会允许你传递Lock
对象。如果你尝试这样做,会出现异常:
RuntimeError: Lock objects should only be shared between processes through inheritance
你只能在实际启动一个Process
的时候将普通的Lock
对象传递给子进程。一旦它们被启动(比如你调用Pool
的方法时),就会出现那个异常:
l = Lock()
p = Process(target=func, args=(l,)) # ok
p.start()
pool = Pool()
pool.apply(func, args=(l,)) # not ok, raises an exception.
为了将Lock
传递给像map
这样的Pool
函数,你需要使用multiprocessing.Manager
来创建一个共享锁。以下是我建议的做法:
import sys
from multiprocessing import Pool, Lock, get_context, Manager
import time
class ParentApp():
def __init__(self, mutex):
self.mutex = mutex
def report(self,msg):
with self.mutex:
sys.stdout.write(msg)
class ChildApp1(ParentApp):
def print_report(self):
for i in range(100):
ParentApp.report(self, 'BLABLA')
class ChildApp2(ParentApp):
def print_report(self):
for i in range(100):
ParentApp.report(self, 'TESTTEST')
def runnable(app):
app.print_report()
def main():
apps=[]
m = Manager()
lock = m.Lock()
apps.append(ChildApp1(lock))
apps.append(ChildApp2(lock))
pool = Pool(len(apps))
pool.map(runnable, apps)
if __name__ == '__main__':
sys.exit(main())
为了确保Lock
是共享的,我们需要让ParentApp
实际接收锁对象作为参数。这虽然不如将其完全封装在类中那么好,但我认为这是在Windows的限制下我们能做到的最好方法。