使用multiprocessing池的apply_async方法时,回调由谁运行?

47 投票
1 回答
16449 浏览
提问于 2025-04-18 13:29

我想了解一下在使用多进程池的apply_sync方法时,背后发生了什么。

谁来运行回调方法?是调用apply_async的主进程吗?

假设我发出了很多apply_async命令,并且都设置了回调,然后继续执行我的程序。当apply_async开始完成时,我的程序还在忙着做其他事情。那回调是怎么在“主进程”还在忙的时候被运行的呢?

这里有个例子。

import multiprocessing
import time

def callback(x):
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)

def func(x):
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)
    return x

pool = multiprocessing.Pool()

args = range(20)

for a in args:
    pool.apply_async(func, (a,), callback=callback)

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)

t0 = time.time()
while time.time() - t0 < 60:
    pass

print 'Finished with the script'

输出结果大概是这样的:

PoolWorker-1 正在运行 func,参数是 0

PoolWorker-2 正在运行 func,参数是 1

PoolWorker-3 正在运行 func,参数是 2

MainProcess 正在睡觉一分钟 <-- 主进程正在忙

PoolWorker-4 正在运行 func,参数是 3

PoolWorker-1 正在运行 func,参数是 4

PoolWorker-2 正在运行 func,参数是 5

PoolWorker-3 正在运行 func,参数是 6

PoolWorker-4 正在运行 func,参数是 7

MainProcess 正在运行回调,参数是 0 <-- 主进程在循环中运行回调!!

MainProcess 正在运行回调,参数是 1

MainProcess 正在运行回调,参数是 2

MainProcess 正在运行回调,参数是 3

MainProcess 正在运行回调,参数是 4

PoolWorker-1 正在运行 func,参数是 8

...

脚本执行完毕

主进程是怎么在这个循环中运行回调的呢?

文档中关于回调的说明似乎给了我一些提示,但我不太明白。

apply_async(func[, args[, kwds[, callback]]])

apply()方法的一种变体,返回一个结果对象。

如果指定了回调,那么它应该是一个可调用的函数,接受一个参数。当结果准备好时,会调用这个回调(除非调用失败)。回调应该立即完成,否则处理结果的线程会被阻塞。

1 个回答

46

文档里确实有个提示:

回调函数应该立即完成,因为否则处理结果的线程会被阻塞。

回调函数是在主进程中处理的,但它们是在自己独立的线程中运行。当你创建一个 Pool 时,它实际上会在内部创建几个 Thread 对象:

class Pool(object):
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        ... # stuff we don't care about
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN 
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN 
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

对我们来说,最有意思的线程是 _result_handler;稍后我们会解释原因。

稍微转个话题,当你运行 apply_async 时,它会在内部创建一个 ApplyResult 对象来管理从子进程获取结果:

def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self


    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

如你所见,_set 方法实际上是执行传入的 callback 的,前提是任务成功完成。此外,注意它在 __init__cache 字典中。

现在,回到 _result_handler 线程对象。这个对象会调用 _handle_results 函数,代码如下:

    while 1:
        try:
            task = get()
        except (IOError, EOFError):
            debug('result handler got EOFError/IOError -- exiting')
            return

        if thread._state:
            assert thread._state == TERMINATE
            debug('result handler found thread._state=TERMINATE')
            break

        if task is None:
            debug('result handler got sentinel')
            break

        job, i, obj = task
        try:
            cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!
        except KeyError:
            pass

        # More stuff

这是一个循环,它从队列中提取子进程的结果,找到在 cache 中对应的条目,然后调用 _set,执行我们的回调函数。即使你在一个循环中,它也能运行,因为它不是在主线程中运行的。

撰写回答