Implementing pause/resume for multiprocessing with `event`
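I'm using the code below to implement pause/resume functionality for a `multiprocessing` pool. I'd appreciate an explanation of why the `event` variable has to be passed as an argument to the `setup()` function, and why a global variable `unpaused` is declared inside `setup()` and set to the same value as `event`: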
def setup(event):
    global unpaused
    unpaused = event
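I'd also like to understand the logic behind the following statement:
pool=mp.Pool(2, setup, (event,))
The first argument is the number of CPU cores the pool will use. The second argument is the `setup()` function mentioned above.
Why couldn't it all be done like this: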
global event
event=mp.Event()
pool = mp.Pool(processes=2)
Then, whenever we need to pause or resume a job, we would just use:
To pause:
event.clear()
To resume:
event.set()
Why do we need the global variable `unpaused`? I don't get it! Please advise.
import time
import multiprocessing as mp

def myFunct(arg):
    proc = mp.current_process()
    print('starting: %s %s ...\n' % (proc.name, proc.pid))
    # Busy loop standing in for real work.
    for i in range(110):
        for n in range(500000):
            pass
    print('\t ... %s %s completed\n' % (proc.name, proc.pid))

def setup(event):
    global unpaused
    unpaused = event

def pauseJob():
    event.clear()

def continueJob():
    event.set()

event = mp.Event()
pool = mp.Pool(2, setup, (event,))
pool.map_async(myFunct, [1, 2, 3])
event.set()
pool.close()
pool.join()
1 Answer
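You have a slight misunderstanding of how `Event` works. But first, let me explain what `setup` does.
The `setup` function is executed in each child process of the pool as soon as it starts. So in each process you are setting a global variable (named `unpaused` in your code, `event` in my example below) to the same `multiprocessing.Event` object you created in the main process. As a result, every child process has a global variable that refers to that same `multiprocessing.Event` object. This lets you signal the child processes from the main process, just as you want. Consider this example: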
import multiprocessing

event = None

def my_setup(event_):
    # Runs once in each worker: store the shared Event in a global.
    global event
    event = event_
    print("event is %s in child" % event)

if __name__ == "__main__":
    event = multiprocessing.Event()
    p = multiprocessing.Pool(2, my_setup, (event,))
    print("event is %s in parent" % event)
    p.close()
    p.join()
Output:
dan@dantop2:~$ ./mult.py
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in child
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in child
event is <multiprocessing.synchronize.Event object at 0x7f93cd7a48d0> in parent
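As you can see, it's the same `event` in the two child processes and in the parent, just as you want.
However, passing `event` to `setup` isn't actually necessary. You can simply inherit the `event` instance from the parent process. Note that this relies on the children being forked from the parent, as on Linux here; under the spawn start method (the only one available on Windows) each child re-imports the module, `event` stays `None`, and the initializer approach is needed: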
import multiprocessing

event = None

def my_worker(num):
    print("event is %s in child" % event)

if __name__ == "__main__":
    event = multiprocessing.Event()
    pool = multiprocessing.Pool(2)
    pool.map_async(my_worker, [i for i in range(pool._processes)])  # Just call my_worker for every process in the pool.
    pool.close()
    pool.join()
    print("event is %s in parent" % event)
Output:
dan@dantop2:~$ ./mult.py
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in child
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in child
event is <multiprocessing.synchronize.Event object at 0x7fea3b1dc8d0> in parent
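This is much simpler, and is the preferred way to pass a semaphore between parent and child. In fact, if you try to pass `event` directly to the worker function, you get an error: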
RuntimeError: Semaphore objects should only be shared between processes through inheritance
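The error comes from pickling: arguments handed to a pool task are serialized before being sent to a worker, and `multiprocessing` synchronization objects refuse to be pickled outside of process creation. A minimal sketch that reproduces this without a pool follows; the helper name `try_pickle_event` is made up for illustration, and the class named in the message may differ between Python versions.

```python
import multiprocessing
import pickle


def try_pickle_event():
    """Return the error message raised when an Event is pickled directly.

    Hypothetical helper for illustration; it returns None if pickling
    unexpectedly succeeds.
    """
    event = multiprocessing.Event()
    try:
        # This is roughly what the pool does to task arguments.
        pickle.dumps(event)
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(try_pickle_event())
```

Passing the event through the pool's initializer arguments, or inheriting it, avoids this because the object is then handed over while the worker process is being created rather than as task data.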
Now, back to your misunderstanding of how `Event` works. `Event` is meant to be used like this:
import time
import multiprocessing

def event_func(num):
    print('\t%r is waiting' % multiprocessing.current_process())
    event.wait()  # Blocks until the parent calls event.set().
    print('\t%r has woken up' % multiprocessing.current_process())

if __name__ == "__main__":
    event = multiprocessing.Event()
    pool = multiprocessing.Pool()
    a = pool.map_async(event_func, [i for i in range(pool._processes)])
    print('main is sleeping')
    time.sleep(2)
    print('main is setting event')
    event.set()
    pool.close()
    pool.join()
Output:
main is sleeping
<Process(PoolWorker-1, started daemon)> is waiting
<Process(PoolWorker-2, started daemon)> is waiting
<Process(PoolWorker-4, started daemon)> is waiting
<Process(PoolWorker-3, started daemon)> is waiting
main is setting event
<Process(PoolWorker-2, started daemon)> has woken up
<Process(PoolWorker-1, started daemon)> has woken up
<Process(PoolWorker-4, started daemon)> has woken up
<Process(PoolWorker-3, started daemon)> has woken up
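As you can see, the child processes need to explicitly call `event.wait()` in order to be paused. They are woken up when the main process calls `event.set()`. Right now none of your workers call `event.wait()`, so none of them will ever be paused. I suggest you look at the documentation for `threading.Event`, which `multiprocessing.Event` is a clone of.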