如何在Python多进程池中运行清理代码?

8 投票
2 回答
8685 浏览
提问于 2025-04-16 18:02

我有一些在Windows上运行的Python代码,它使用了多进程模块来启动一组工作进程。每个工作进程在执行完map_async方法后,需要进行一些清理工作。

有没有人知道该怎么做呢?

2 个回答

2

你唯一真正的选择是在你使用 map_async 的函数结束时进行清理。

如果这个清理是为了在程序结束时执行的,你就不能使用池的概念。它们是两个不同的东西。池不会决定进程的生命周期,除非你使用 maxtasksperchild,这个功能是在 Python 2.7 中新增的。即使如此,你也无法在进程结束时运行代码。不过,maxtasksperchild 可能对你有用,因为当进程被终止时,它打开的任何资源都会被释放。

话虽如此,如果你有很多函数需要进行清理,你可以设计一个装饰器来避免重复工作。下面是我所说的一个例子:

import functools
import multiprocessing

def cleanup(f):
    """Decorator for shared cleanup mechanism"""
    @functools.wraps(f)
    def wrapped(arg):
        result = f(arg)
        print("Cleaning up after f({0})".format(arg))
        return result
    return wrapped

@cleanup
def task1(arg):
    print("Hello from task1({0})".format(arg))
    return arg * 2

@cleanup
def task2(arg):
    print("Bonjour from task2({0})".format(arg))
    return arg ** 2

def main():
    p = multiprocessing.Pool(processes=3)
    print(p.map(task1, [1, 2, 3]))
    print(p.map(task2, [1, 2, 3]))

if __name__ == "__main__":
    main()

当你执行这个代码时(假设 stdout 没有因为我没有加锁而混乱),你应该能看到输出的顺序表明你的清理任务是在每个任务结束时运行的:

Hello from task1(1)
Cleaning up after f(1)
Hello from task1(2)
Cleaning up after f(2)
Hello from task1(3)
Cleaning up after f(3)
[2, 4, 6]

Bonjour from task2(1)
Cleaning up after f(1)
Bonjour from task2(2)
Cleaning up after f(2)
Bonjour from task2(3)
Cleaning up after f(3)
[1, 4, 9]
4

你真的想要在每个工作进程结束时运行一次清理函数,而不是在每个由 map_async 创建的任务结束时运行一次吗?

multiprocess.pool.Pool 会创建一个工作进程池,比如说有8个工作进程。然后 map_async 可能会提交40个任务,让这40个任务在8个工作进程之间分配。我能理解你为什么想在每个任务结束时运行清理代码,但我很难理解你为什么只想在每个8个工作进程结束前运行清理代码。

不过,如果你真的想这么做,你可以通过修改 multiprocessing.pool.worker 来实现:

import multiprocessing as mp
import multiprocessing.pool as mpool
from multiprocessing.util import debug

def cleanup():
    print('{n} CLEANUP'.format(n=mp.current_process().name))

# This code comes from /usr/lib/python2.6/multiprocessing/pool.py,
# except for the single line at the end which calls cleanup().
def myworker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        put((job, i, result))
    cleanup()

# Here we monkeypatch mpool.worker
mpool.worker=myworker

def foo(i):
    return i*i

def main():
    pool = mp.Pool(8)
    results = pool.map_async(foo, range(40)).get()
    print(results)

if __name__=='__main__':
    main()

结果是:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
PoolWorker-8 CLEANUP
PoolWorker-3 CLEANUP
PoolWorker-7 CLEANUP
PoolWorker-1 CLEANUP
PoolWorker-6 CLEANUP
PoolWorker-2 CLEANUP
PoolWorker-4 CLEANUP
PoolWorker-5 CLEANUP

撰写回答