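multiprocessing.Pool spawning new children after terminate() on Linux/Python 2.7?
I have an executable that I need to run very often, each time with different parameters. For this I wrote a small Python (2.7) wrapper using the multiprocessing module, following the pattern given here.
My code looks roughly like this: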
try:
    logging.info("starting pool runs")
    pool.map(run_nlin, params)
    pool.close()
except KeyboardInterrupt:
    logging.info("^C pressed")
    pool.terminate()
except Exception as e:
    # Pass the exception as a format argument so that it is actually logged.
    logging.info("exception caught: %s", e)
    pool.terminate()
finally:
    time.sleep(5)
    pool.join()
    logging.info("done")
My worker function is here:
class KeyboardInterruptError(Exception): pass

def run_nlin((path_config, path_log, path_nlin, update_method)):
    try:
        with open(path_log, "w") as log_:
            cmdline = [path_nlin, path_config]
            if update_method:
                cmdline += [update_method, ]
            sp.call(cmdline, stdout=log_, stderr=log_)
    except KeyboardInterrupt:
        time.sleep(5)
        raise KeyboardInterruptError()
    except:
        raise
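path_config is the path to a configuration file for the binary; it contains, for example, the date for which to run the program.
When I start the wrapper, everything looks fine. However, when I press ^C, the wrapper script seems to launch an additional numproc processes from the pool before terminating. For example, when I start the script for days 1 to 10, I can see in the ps aux output that two instances of the binary are running (usually for days 1 and 3). When I then press ^C, the wrapper script exits and the binaries for days 1 and 3 are gone, but new binaries are running for days 5 and 7.
So it seems to me that the Pool launches another numproc processes before finally dying.
Does anyone know what is going on here, and what I can do about it?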
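1 Answer
On this page, Jesse Noller, author of the multiprocessing module, shows that the correct way to handle KeyboardInterrupt is to have the subprocesses return rather than re-raise the exception. This allows the main process to terminate the pool.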
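To make "return rather than re-raise" concrete, here is a minimal sketch. The names INTERRUPTED, do_work and call_guarded are hypothetical and chosen only for illustration; they do not come from the linked page.

```python
INTERRUPTED = "interrupted"  # hypothetical sentinel value, not from the linked page


def do_work(x):
    """Hypothetical stand-in for the real long-running job."""
    return x * x


def call_guarded(func, x):
    """Call func(x); on Ctrl-C return a sentinel instead of re-raising."""
    try:
        return func(x)
    except KeyboardInterrupt:
        # Returning (not raising) sends an ordinary result back to the parent.
        return INTERRUPTED


def worker(x):
    """Top-level function, so it can be handed to pool.map."""
    return call_guarded(do_work, x)
```

The parent can then look for the sentinel in the list returned by pool.map and decide how to shut the pool down.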
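However, as the code below shows, the main process does not reach the except KeyboardInterrupt block until all the tasks generated by pool.map have been run. This is why (I believe) you are seeing extra calls to your worker function, run_nlin, after Ctrl-C has been pressed.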
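For comparison, a workaround often cited for Python 2 is to wait on pool.map_async(...).get(timeout) instead of pool.map(...), because a wait with a timeout is reported to be interruptible by Ctrl-C in the main process. This is not taken from the linked page. The helper name run_all and the default timeout are assumptions made for this sketch, and it has not been tried against the binary in the question.

```python
import multiprocessing as mp


def run_all(pool, func, params, timeout=3600):
    """Map func over params, waiting with a timeout (hypothetical helper).

    Returns the list of results, or None if the wait was interrupted by
    Ctrl-C or timed out.
    """
    results = None
    try:
        # get() with a timeout, rather than a bare pool.map() call.
        results = pool.map_async(func, params).get(timeout)
        pool.close()
    except (KeyboardInterrupt, mp.TimeoutError):
        # Stop the workers right away; tasks still queued are dropped.
        pool.terminate()
    finally:
        pool.join()
    return results
```

The timeout only needs to exceed the longest expected run; its exact value is arbitrary.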
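One possible workaround is to have all the worker functions test whether a multiprocessing.Event has been set. If the event is set, the worker bails out early; otherwise it goes ahead with the long calculation.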
import logging
import multiprocessing as mp
import time

logger = mp.log_to_stderr(logging.WARNING)

def worker(x):
    try:
        if not terminating.is_set():
            logger.warn("Running worker({x!r})".format(x = x))
            time.sleep(3)
        else:
            logger.warn("got the message... we're terminating!")
    except KeyboardInterrupt:
        logger.warn("terminating is set")
        terminating.set()
    return x

def initializer(terminating_):
    # This places terminating in the global namespace of the worker subprocesses.
    # This allows the worker function to access `terminating` even though it is
    # not passed as an argument to the function.
    global terminating
    terminating = terminating_

def main():
    terminating = mp.Event()
    result = []
    pool = mp.Pool(initializer=initializer, initargs=(terminating, ))
    params = range(12)
    try:
        logger.warn("starting pool runs")
        result = pool.map(worker, params)
        pool.close()
    except KeyboardInterrupt:
        logger.warn("^C pressed")
        pool.terminate()
    finally:
        pool.join()
        logger.warn('done: {r}'.format(r = result))

if __name__ == '__main__':
    main()
Running this script yields:
% test.py
[WARNING/MainProcess] starting pool runs
[WARNING/PoolWorker-1] Running worker(0)
[WARNING/PoolWorker-2] Running worker(1)
[WARNING/PoolWorker-3] Running worker(2)
[WARNING/PoolWorker-4] Running worker(3)
Here Ctrl-C was pressed; each worker sets the terminating event. Only one process really needs to set it, but this works despite the slight inefficiency.
C-c C-c[WARNING/PoolWorker-4] terminating is set
[WARNING/PoolWorker-2] terminating is set
[WARNING/PoolWorker-3] terminating is set
[WARNING/PoolWorker-1] terminating is set
Now all the other tasks queued by pool.map are run:
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-4] got the message... we're terminating!
[WARNING/PoolWorker-2] got the message... we're terminating!
[WARNING/PoolWorker-1] got the message... we're terminating!
[WARNING/PoolWorker-3] got the message... we're terminating!
Finally, the main process reaches the except KeyboardInterrupt block.
[WARNING/MainProcess] ^C pressed
[WARNING/MainProcess] done: []
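Applied to the question's run_nlin, the same idea might look like the sketch below. It is an adaptation under assumptions, not code from the question or from the linked page: the argument tuple is unpacked inside the function so that the sketch also runs on Python 3, and the return value (the exit code, or None when skipped) is a choice made here.

```python
import subprocess as sp

terminating = None  # set in each worker process by initializer()


def initializer(terminating_):
    # Store the shared event in the worker's global namespace.
    global terminating
    terminating = terminating_


def run_nlin(args):
    """Run the binary once, unless shutdown has already been requested."""
    path_config, path_log, path_nlin, update_method = args
    if terminating.is_set():
        # Another worker saw Ctrl-C: skip without starting a new binary.
        return None
    cmdline = [path_nlin, path_config]
    if update_method:
        cmdline.append(update_method)
    try:
        with open(path_log, "w") as log_:
            return sp.call(cmdline, stdout=log_, stderr=log_)
    except KeyboardInterrupt:
        # Tell the other workers to stop, then return instead of re-raising.
        terminating.set()
        return None
```

As in main() above, the event would be created in the parent and handed to the pool through initializer and initargs, so that tasks still queued after Ctrl-C return immediately instead of starting new binaries.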