Implementing pause/resume for a multiprocessing pool with `event`

Asked 2025-04-18 08:59

I'm using the code below to implement pause/resume functionality for a multiprocessing pool.

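Could you explain why the event variable has to be passed as an argument to the setup() function? And why a global variable unpaused is declared inside setup() and set to the same object as event: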

def setup(event):
    global unpaused
    unpaused = event

I'd also like to understand the logic of this statement:

pool=mp.Pool(2, setup, (event,))

The first argument is the number of worker processes the pool should use. The second is the setup() function mentioned above.

Why can't everything be done like this instead:

global event
event=mp.Event()
pool = mp.Pool(processes=2)

Whenever we need to pause or resume a task, we would simply use:

To pause:

event.clear()

To resume:

event.set()

Why do we need a global variable unpaused at all? I don't get it. Any advice would be appreciated.


import time
import multiprocessing as mp

def myFunct(arg):
    proc = mp.current_process()
    print('starting:', proc.name, proc.pid, '...\n')
    for i in range(110):
        for n in range(500000):
            pass
    print('\t ...', proc.name, proc.pid, 'completed\n')

def setup(event):
    global unpaused
    unpaused = event

def pauseJob():
    event.clear()

def continueJob():
    event.set()


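if __name__ == '__main__':
    # Module-level name in the main process, so pauseJob()/continueJob() can use it.
    event = mp.Event()

    pool = mp.Pool(2, setup, (event,))
    pool.map_async(myFunct, [1, 2, 3])

    event.set()

    pool.close()
    pool.join()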

1 Answer


You're misunderstanding how Event works. First, let me explain what setup does.

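The setup function runs once in each worker process of the pool, when that process starts. So in every worker you create a global variable (called unpaused in your code, event in the example below) that refers to the multiprocessing.Event you created in the main process; each process holds its own handle to the same underlying OS-level synchronization primitive. This is what lets the main process send signals to the workers, just as you want. Look at this example: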

import multiprocessing

event = None
def my_setup(event_):
    # Runs once in each worker: store the Event in a module-level global.
    global event
    event = event_
    print("event is %s in child" % event)


if __name__ == "__main__":
    event = multiprocessing.Event()
    p = multiprocessing.Pool(2, my_setup, (event,))
    print("event is %s in parent" % event)
    p.close()
    p.join()

输出:

dan@dantop2:~$ ./mult.py 
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in child
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in child
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in parent

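As you can see, both children and the parent report the same Event object, just as you want. (The identical addresses come from the children being forked copies of the parent.)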

That said, passing event to setup isn't strictly necessary. You can simply inherit the event instance from the parent:

import multiprocessing

event = None

def my_worker(num):
    print("event is %s in child" % event)

if __name__ == "__main__":
    event = multiprocessing.Event()
    pool = multiprocessing.Pool(2)
    # Submit one task per worker (the pool does not guarantee each worker gets exactly one).
    pool.map_async(my_worker, range(2))

    pool.close()
    pool.join()
    print("event is %s in parent" % event)

输出:

dan@dantop2:~$ ./mult.py 
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in child
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in child
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in parent

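This is simpler, and it is the recommended way to signal between parent and child processes as long as the workers are created with the fork start method (the default on Linux before Python 3.14). With spawn (the default on Windows and macOS) or forkserver, each worker starts fresh and re-imports your module instead of copying the parent, so event would still be None there; passing it through the initializer, as your code does, works with every start method. In fact, if you try to pass event directly to the worker function, you'll get an error: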

RuntimeError: Semaphore objects should only be shared between processes through inheritance
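To see where that error comes from, here is a small Python 3 sketch (assuming a POSIX system with `fork`; `use_event` and `try_passing_event` are hypothetical names) that passes the Event as a task argument. The pool must pickle task arguments to send them to a worker, and multiprocessing refuses to pickle its synchronization primitives outside of process start-up, so the call fails in the parent. The class named in the message may differ between Python versions (Python 3 may report Condition rather than Semaphore), so the sketch only relies on the common wording.

```python
import multiprocessing as mp

def use_event(event):
    # Never actually runs: the argument cannot be pickled for the worker.
    return event.is_set()

def try_passing_event():
    ctx = mp.get_context("fork")  # assumption: POSIX system with fork
    event = ctx.Event()
    with ctx.Pool(2) as pool:
        try:
            # Pickling the task fails in the parent; the error is re-raised here.
            pool.apply(use_event, (event,))
        except RuntimeError as exc:
            return str(exc)
    return None

if __name__ == "__main__":
    print(try_passing_event())
```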

Now, back to your misunderstanding of how Event works. Event is meant to be used like this:

import time
import multiprocessing

def event_func(num):
    print('\t%r is waiting' % multiprocessing.current_process())
    event.wait()  # block until the main process calls event.set()
    print('\t%r has woken up' % multiprocessing.current_process())

if __name__ == "__main__":
    event = multiprocessing.Event()

    pool = multiprocessing.Pool()  # defaults to one worker per CPU
    # One task per worker.
    pool.map_async(event_func, range(multiprocessing.cpu_count()))

    print('main is sleeping')
    time.sleep(2)

    print('main is setting event')
    event.set()

    pool.close()
    pool.join()

输出:

main is sleeping
    <Process(PoolWorker-1, started daemon)> is waiting
    <Process(PoolWorker-2, started daemon)> is waiting
    <Process(PoolWorker-4, started daemon)> is waiting
    <Process(PoolWorker-3, started daemon)> is waiting
main is setting event
    <Process(PoolWorker-2, started daemon)> has woken up
    <Process(PoolWorker-1, started daemon)> has woken up
    <Process(PoolWorker-4, started daemon)> has woken up
    <Process(PoolWorker-3, started daemon)> has woken up

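As you can see, the workers have to call event.wait() explicitly for the event to pause them, and they wake up when the main process calls event.set(). Right now none of your workers ever call event.wait() (myFunct never touches unpaused), so they will never pause: clearing the event only affects processes that wait on it. I suggest reading the documentation for threading.Event, since multiprocessing.Event is a clone of it.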
