多个子进程锁定互斥量

0 投票
1 回答
1109 浏览
提问于 2025-04-19 16:21

这个代码里的互斥锁(mutex)工作正常吗?也就是说,sys.stdout.write这个部分有被保护起来吗?

import sys
from multiprocessing import Pool, Lock

class ParentApp():
    mutex=Lock()
    def report(self,msg):
        with ParentApp.mutex:
            sys.stdout.write(msg)

class ChildApp1(ParentApp):
    def print_report(self):
        for i in xrange(100):
            ParentApp.report(self, 'BLABLA')


class ChildApp2(ParentApp):
    def print_report(self):
        for i in xrange(100):
            ParentApp.report(self, 'TESTTEST')

def runnable(app):
    app.print_report()

def main():
    app=[]
    app.append(ChildApp1())
    app.append(ChildApp2())

    pool = Pool(len(apps))
    pool.map(runnable, apps)

    exit(0)

if __name__ == '__main__':
    sys.exit(main())

补充说明:代码也在这里: http://pastebin.com/GyV3w45F

补充说明:我是在Linux主机上运行的

1 个回答

2

如果你在类似Posix的系统上,这个功能才正常工作。如果你在Windows上,每个进程都会有一个完全不同的锁的副本。

你可以通过添加一些额外的跟踪代码和一个sleep语句来观察这个现象:

class ParentApp():
    mutex=Lock()
    def report(self,msg):
        print("\nGETTING for {}".format(msg))
        with self.mutex:
            print("GOT for {}".format(msg))
            sys.stdout.write(msg)
            sys.stdout.flush()
            time.sleep(5)

在Linux上:

GETTING for BLABLA
GOT for BLABLA
BLABLA
GETTING for TESTTEST
< 5 second delay here>

在Windows上:

GETTING for BLABLA
GOT for BLABLA
BLABLA
GETTING for TESTTEST
GOT for TESTTEST
TESTTEST
<5 second delay here>

这是因为Posix平台使用os.fork()来创建新进程,这样在父进程中创建的Lock()会自动被子进程继承。然而,Windows没有os.fork,所以它需要先启动一个新进程,然后在子进程中重新导入你的模块。重新导入模块意味着ParentApp会被重新导入和执行,Lock这个类属性也会如此。因此,你的父进程和两个子进程各自会有自己独特的Lock

要解决这个问题,你需要在父进程中创建一个共享的Lock,然后把它传递给子进程。实际上,这在你当前的架构下并不是一件简单的事——你把Lock对象作为参数传递给pool.map,这不会允许你传递Lock对象。如果你尝试这样做,会出现异常:

RuntimeError: Lock objects should only be shared between processes through inheritance

你只能在实际启动一个Process的时候将普通的Lock对象传递给子进程。一旦它们被启动(比如你调用Pool的方法时),就会出现那个异常:

l = Lock()
p = Process(target=func, args=(l,)) # ok
p.start()

pool = Pool()
pool.apply(func, args=(l,)) # not ok, raises an exception.

为了将Lock传递给像map这样的Pool函数,你需要使用multiprocessing.Manager来创建一个共享锁。以下是我建议的做法:

import sys 
from multiprocessing import Pool, Lock, get_context, Manager
import time

class ParentApp():
    def __init__(self, mutex):
        self.mutex = mutex

    def report(self,msg):
        with self.mutex:
            sys.stdout.write(msg)

class ChildApp1(ParentApp):
    def print_report(self):
        for i in range(100):
            ParentApp.report(self, 'BLABLA')


class ChildApp2(ParentApp):
    def print_report(self):
        for i in range(100):
            ParentApp.report(self, 'TESTTEST')

def runnable(app):
    app.print_report()

def main():
    apps=[]
    m = Manager()
    lock = m.Lock()
    apps.append(ChildApp1(lock))
    apps.append(ChildApp2(lock))

    pool = Pool(len(apps))
    pool.map(runnable, apps)

if __name__ == '__main__':
    sys.exit(main())

为了确保Lock是共享的,我们需要让ParentApp实际接收锁对象作为参数。这虽然不如将其完全封装在类中那么好,但我认为这是在Windows的限制下我们能做到的最好方法。

撰写回答