使用multiprocessing池的apply_async方法时,回调由谁运行?
我想了解一下在使用多进程池的apply_sync方法时,背后发生了什么。
谁来运行回调方法?是调用apply_async的主进程吗?
假设我发出了很多apply_async命令,并且都设置了回调,然后继续执行我的程序。当apply_async开始完成时,我的程序还在忙着做其他事情。那回调是怎么在“主进程”还在忙的时候被运行的呢?
这里有个例子。
import multiprocessing
import time
def callback(x):
print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)
def func(x):
print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)
return x
pool = multiprocessing.Pool()
args = range(20)
for a in args:
pool.apply_async(func, (a,), callback=callback)
print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)
t0 = time.time()
while time.time() - t0 < 60:
pass
print 'Finished with the script'
输出结果大概是这样的:
PoolWorker-1 正在运行 func,参数是 0
PoolWorker-2 正在运行 func,参数是 1
PoolWorker-3 正在运行 func,参数是 2
MainProcess 正在睡觉一分钟 <-- 主进程正在忙
PoolWorker-4 正在运行 func,参数是 3
PoolWorker-1 正在运行 func,参数是 4
PoolWorker-2 正在运行 func,参数是 5
PoolWorker-3 正在运行 func,参数是 6
PoolWorker-4 正在运行 func,参数是 7
MainProcess 正在运行回调,参数是 0 <-- 主进程在循环中运行回调!!
MainProcess 正在运行回调,参数是 1
MainProcess 正在运行回调,参数是 2
MainProcess 正在运行回调,参数是 3
MainProcess 正在运行回调,参数是 4
PoolWorker-1 正在运行 func,参数是 8
...
脚本执行完毕
主进程是怎么在这个循环中运行回调的呢?
文档中关于回调的说明似乎给了我一些提示,但我不太明白。
apply_async(func[, args[, kwds[, callback]]])
apply()方法的一种变体,返回一个结果对象。
如果指定了回调,那么它应该是一个可调用的函数,接受一个参数。当结果准备好时,会调用这个回调(除非调用失败)。回调应该立即完成,否则处理结果的线程会被阻塞。
1 个回答
文档里确实有个提示:
回调函数应该立即完成,因为否则处理结果的线程会被阻塞。
回调函数是在主进程中处理的,但它们是在自己独立的线程中运行。当你创建一个 Pool
时,它实际上会在内部创建几个 Thread
对象:
class Pool(object):
Process = Process
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None):
self._setup_queues()
self._taskqueue = Queue.Queue()
self._cache = {}
... # stuff we don't care about
self._worker_handler = threading.Thread(
target=Pool._handle_workers,
args=(self, )
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
self._worker_handler.start()
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue,
self._pool, self._cache)
)
self._task_handler.daemon = True
self._task_handler._state = RUN
self._task_handler.start()
self._result_handler = threading.Thread(
target=Pool._handle_results,
args=(self._outqueue, self._quick_get, self._cache)
)
self._result_handler.daemon = True
self._result_handler._state = RUN
self._result_handler.start()
对我们来说,最有意思的线程是 _result_handler
;稍后我们会解释原因。
稍微转个话题,当你运行 apply_async
时,它会在内部创建一个 ApplyResult
对象来管理从子进程获取结果:
def apply_async(self, func, args=(), kwds={}, callback=None):
assert self._state == RUN
result = ApplyResult(self._cache, callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
class ApplyResult(object):
def __init__(self, cache, callback):
self._cond = threading.Condition(threading.Lock())
self._job = job_counter.next()
self._cache = cache
self._ready = False
self._callback = callback
cache[self._job] = self
def _set(self, i, obj):
self._success, self._value = obj
if self._callback and self._success:
self._callback(self._value)
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
del self._cache[self._job]
如你所见,_set
方法实际上是执行传入的 callback
的,前提是任务成功完成。此外,注意它在 __init__
cache 字典中。
现在,回到 _result_handler
线程对象。这个对象会调用 _handle_results
函数,代码如下:
while 1:
try:
task = get()
except (IOError, EOFError):
debug('result handler got EOFError/IOError -- exiting')
return
if thread._state:
assert thread._state == TERMINATE
debug('result handler found thread._state=TERMINATE')
break
if task is None:
debug('result handler got sentinel')
break
job, i, obj = task
try:
cache[job]._set(i, obj) # Here is _set (and therefore our callback) being called!
except KeyError:
pass
# More stuff
这是一个循环,它从队列中提取子进程的结果,找到在 cache
中对应的条目,然后调用 _set
,执行我们的回调函数。即使你在一个循环中,它也能运行,因为它不是在主线程中运行的。