Python进程池非守护进程?

147 投票
10 回答
80525 浏览
提问于 2025-04-16 23:04

有没有可能创建一个非守护进程的Python进程池?我想要一个池子,可以调用里面还有另一个池子的函数。

我想这样做是因为守护进程不能创建新的进程。具体来说,这会导致出现错误:

AssertionError: daemonic processes are not allowed to have children

举个例子,假设有一个场景,function_a 里面有一个池子,它运行 function_b,而 function_b 里面又有一个池子,它运行 function_c。这个函数链会失败,因为 function_b 是在一个守护进程中运行的,而守护进程是不能创建进程的。

10 个回答

38

从Python 3.8开始,concurrent.futures.ProcessPoolExecutor这个工具就没有之前的限制了。它可以轻松地使用嵌套的进程池,完全没有问题:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

上面的示例代码是在Python 3.8下测试的。

不过,ProcessPoolExecutor有一个限制,就是它没有maxtasksperchild这个选项。如果你需要这个功能,可以看看Massimiliano的回答

感谢: jfs的回答

58

我需要在Python 3.7中使用一个非守护进程的池,最后我调整了一个在接受的回答中发布的代码。下面是创建这个非守护进程池的代码片段:

import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)

由于当前的 multiprocessing 实现已经进行了大量重构,基于上下文来工作,我们需要提供一个 NoDaemonContext 类,这个类里面有我们的 NoDaemonProcess 作为属性。然后,NestablePool 将使用这个上下文,而不是默认的上下文。

不过,我得提醒你,这种方法有至少两个需要注意的地方:

  1. 它仍然依赖于 multiprocessing 包的实现细节,因此随时可能会出问题。
  2. 有很多合理的原因让 multiprocessing 不容易使用非守护进程,很多原因可以在这里找到。就我个人而言,最有说服力的理由是:

允许子线程再创建自己的子进程(即孙子进程)会有风险,如果父线程或子线程在子进程完成并返回之前就结束了,就可能会出现一大堆“僵尸”孙子进程。

151

multiprocessing.pool.Pool这个类会在它的初始化方法里创建工作进程,并把这些进程设置为守护进程(也就是后台进程),然后启动它们。在这些进程启动之前,你不能把它们的daemon属性改成False,而且一旦启动后就更不能改了。不过,你可以自己创建一个multiprocessing.pool.Pool的子类(multiprocessing.Pool其实只是一个包装函数),然后用你自己定义的multiprocessing.Process子类来替代,这样你创建的进程就不会是守护进程了。

下面是一个完整的例子,展示了怎么做。重要的部分是最上面的两个类NoDaemonProcessMyPool,还有在最后要对你的MyPool实例调用pool.close()pool.join()

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()

撰写回答