multiprocessing.Pool: 何时使用apply、apply_async或map?
我没有看到关于 Pool.apply、Pool.apply_async 和 Pool.map 的清晰示例和使用场景。我主要在使用 Pool.map
;那其他的有什么优点呢?
3 个回答
关于 apply
和 map
的区别:
pool.apply(f, args)
:这里的 f
只会在池中的一个工作进程里执行。所以池中的一个进程会运行 f(args)
。
pool.map(f, iterable)
:这个方法会把可迭代对象分成几个小块,然后把这些小块作为独立的任务提交给进程池。这样你就能利用池中的所有进程。
下面是一个表格,简单明了地展示了 Pool.apply
、Pool.apply_async
、Pool.map
和 Pool.map_async
之间的区别。在选择使用哪个时,你需要考虑多个参数、并发性、阻塞和顺序等因素:
| Multi-args Concurrence Blocking Ordered-results
---------------------------------------------------------------------
Pool.map | no yes yes yes
Pool.map_async | no yes no yes
Pool.apply | yes no yes no
Pool.apply_async | yes yes no no
Pool.starmap | yes yes yes yes
Pool.starmap_async| yes yes no no
注意事项:
Pool.imap
和Pool.imap_async
是map
和map_async
的懒惰版本。Pool.starmap
方法和map
方法非常相似,但它可以接收多个参数。Async
方法会一次性提交所有的任务,等所有任务完成后再获取结果。使用get
方法来获取结果。Pool.map
(或Pool.apply
)方法和 Python 内置的map
(或apply
)非常相似。它们会阻塞主进程,直到所有任务完成并返回结果。
示例:
map
一次性处理一系列任务
results = pool.map(func, [1, 2, 3])
apply
只能处理一个任务
for x, y in [[1, 1], [2, 2]]:
results.append(pool.apply(func, (x, y)))
def collect_result(result):
results.append(result)
map_async
一次性处理一系列任务
pool.map_async(func, jobs, callback=collect_result)
apply_async
只能处理一个任务,并且在后台并行执行这个任务
for x, y in [[1, 1], [2, 2]]:
pool.apply_async(worker, (x, y), callback=collect_result)
starmap
是 pool.map
的一种变体,支持多个参数
pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
starmap_async
是 starmap()
和 map_async()
的结合,遍历可迭代对象的可迭代对象,并将参数解包后调用函数。返回一个结果对象。
pool.starmap_async(calculate_worker, [(1, 1), (2, 1), (3, 1)], callback=collect_result)
参考资料:
完整文档请查看这里: https://docs.python.org/3/library/multiprocessing.html
在早期的Python版本中,如果你想用任意参数来调用一个函数,你会使用 apply
。
apply(f,args,kwargs)
虽然 apply
在Python2.7中仍然存在,但在Python3中已经没有了,而且现在一般也不再使用它。现在大家更喜欢用
f(*args,**kwargs)
。multiprocessing.Pool
模块试图提供一个类似的接口。
Pool.apply
就像Python的 apply
,不过它是在一个单独的进程中执行函数调用。Pool.apply
会一直等到函数执行完毕。
Pool.apply_async
也和Python内置的 apply
类似,但它会立即返回,而不是等结果出来。它会返回一个 AsyncResult
对象。你可以调用这个对象的 get()
方法来获取函数调用的结果。get()
方法会一直等到函数执行完毕。因此,pool.apply(func, args, kwargs)
和 pool.apply_async(func, args, kwargs).get()
是等价的。
与 Pool.apply
不同,Pool.apply_async
方法还有一个回调功能,如果提供了这个回调函数,当原函数执行完成时会被调用。这样你就可以不必调用 get()
。
举个例子:
import multiprocessing as mp
import time
def foo_pool(x):
time.sleep(2)
return x*x
result_list = []
def log_result(result):
# This is called whenever foo_pool(i) returns a result.
# result_list is modified only by the main process, not the pool workers.
result_list.append(result)
def apply_async_with_callback():
pool = mp.Pool()
for i in range(10):
pool.apply_async(foo_pool, args = (i, ), callback = log_result)
pool.close()
pool.join()
print(result_list)
if __name__ == '__main__':
apply_async_with_callback()
可能会得到这样的结果
[1, 0, 4, 9, 25, 16, 49, 36, 81, 64]
注意,与 pool.map
不同,结果的顺序可能和你调用 pool.apply_async
的顺序不一致。
所以,如果你需要在一个单独的进程中运行一个函数,但又想让当前进程等待这个函数返回结果,就用 Pool.apply
。像 Pool.apply
一样,Pool.map
也会等到完整的结果返回。
如果你想让工作进程池异步地执行多个函数调用,就用 Pool.apply_async
。结果的顺序不一定和你调用 Pool.apply_async
的顺序一致。
另外,你还可以用 Pool.apply_async
调用多个不同的函数(并不是所有的调用都需要使用同一个函数)。
而 Pool.map
是对多个参数应用同一个函数。不过,与 Pool.apply_async
不同,Pool.map
返回的结果顺序是和参数的顺序一致的。