Python进程池非守护进程?
有没有可能创建一个非守护进程的Python进程池?我想要一个池子,可以调用里面还有另一个池子的函数。
我想这样做是因为守护进程不能创建新的进程。具体来说,这会导致出现错误:
AssertionError: daemonic processes are not allowed to have children
举个例子,假设有一个场景,function_a
里面有一个池子,它运行 function_b
,而 function_b
里面又有一个池子,它运行 function_c
。这个函数链会失败,因为 function_b
是在一个守护进程中运行的,而守护进程是不能创建进程的。
10 个回答
从Python 3.8开始,concurrent.futures.ProcessPoolExecutor
这个工具就没有之前的限制了。它可以轻松地使用嵌套的进程池,完全没有问题:
from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time
def pid():
return current_process().pid
def _square(i): # Runs in inner_pool
square = i ** 2
time.sleep(i / 10)
print(f'{pid()=} {i=} {square=}')
return square
def _sum_squares(i, j): # Runs in outer_pool
with Pool(max_workers=2) as inner_pool:
squares = inner_pool.map(_square, (i, j))
sum_squares = sum(squares)
time.sleep(sum_squares ** .5)
print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
return sum_squares
def main():
with Pool(max_workers=3) as outer_pool:
for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
print(f'{pid()=} {sum_squares=}')
if __name__ == "__main__":
main()
上面的示例代码是在Python 3.8下测试的。
不过,ProcessPoolExecutor
有一个限制,就是它没有maxtasksperchild
这个选项。如果你需要这个功能,可以看看Massimiliano的回答。
感谢: jfs的回答
我需要在Python 3.7中使用一个非守护进程的池,最后我调整了一个在接受的回答中发布的代码。下面是创建这个非守护进程池的代码片段:
import multiprocessing.pool
class NoDaemonProcess(multiprocessing.Process):
@property
def daemon(self):
return False
@daemon.setter
def daemon(self, value):
pass
class NoDaemonContext(type(multiprocessing.get_context())):
Process = NoDaemonProcess
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
def __init__(self, *args, **kwargs):
kwargs['context'] = NoDaemonContext()
super(NestablePool, self).__init__(*args, **kwargs)
由于当前的 multiprocessing
实现已经进行了大量重构,基于上下文来工作,我们需要提供一个 NoDaemonContext
类,这个类里面有我们的 NoDaemonProcess
作为属性。然后,NestablePool
将使用这个上下文,而不是默认的上下文。
不过,我得提醒你,这种方法有至少两个需要注意的地方:
- 它仍然依赖于
multiprocessing
包的实现细节,因此随时可能会出问题。 - 有很多合理的原因让
multiprocessing
不容易使用非守护进程,很多原因可以在这里找到。就我个人而言,最有说服力的理由是:
允许子线程再创建自己的子进程(即孙子进程)会有风险,如果父线程或子线程在子进程完成并返回之前就结束了,就可能会出现一大堆“僵尸”孙子进程。
multiprocessing.pool.Pool
这个类会在它的初始化方法里创建工作进程,并把这些进程设置为守护进程(也就是后台进程),然后启动它们。在这些进程启动之前,你不能把它们的daemon
属性改成False
,而且一旦启动后就更不能改了。不过,你可以自己创建一个multiprocessing.pool.Pool
的子类(multiprocessing.Pool
其实只是一个包装函数),然后用你自己定义的multiprocessing.Process
子类来替代,这样你创建的进程就不会是守护进程了。
下面是一个完整的例子,展示了怎么做。重要的部分是最上面的两个类NoDaemonProcess
和MyPool
,还有在最后要对你的MyPool
实例调用pool.close()
和pool.join()
。
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time
from random import randint
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
Process = NoDaemonProcess
def sleepawhile(t):
print("Sleeping %i seconds..." % t)
time.sleep(t)
return t
def work(num_procs):
print("Creating %i (daemon) workers and jobs in child." % num_procs)
pool = multiprocessing.Pool(num_procs)
result = pool.map(sleepawhile,
[randint(1, 5) for x in range(num_procs)])
# The following is not really needed, since the (daemon) workers of the
# child's pool are killed when the child is terminated, but it's good
# practice to cleanup after ourselves anyway.
pool.close()
pool.join()
return result
def test():
print("Creating 5 (non-daemon) workers and jobs in main process.")
pool = MyPool(5)
result = pool.map(work, [randint(1, 5) for x in range(5)])
pool.close()
pool.join()
print(result)
if __name__ == '__main__':
test()