当您map
一个multiprocessing.Pool
的iterable时,迭代是在开始时为池中的每个进程划分为一个队列,还是有一个公共队列,当进程空闲时从该队列执行任务?
def generate_stuff():
for foo in range(100):
yield foo
def process(moo):
print moo
pool = multiprocessing.Pool()
pool.map(func=process, iterable=generate_stuff())
pool.close()
因此,考虑到这个未经测试的建议代码;如果池中有4个进程,则每个进程是否分配25个要做的事情,或者是否每个进程逐个挑选100个要做的事情,以便每个进程可以执行不同数量的事情,例如30、26、24、20。
好吧,显而易见的答案是测试一下。
因此,测试可能不会告诉您太多,因为作业将尽快完成,而且即使池进程在准备就绪时抓取作业,结果也可能是均匀分布的。但有一个简单的方法可以解决这个问题:
如果这些数字是“参差不齐”的,那么您就知道,池进程必须在准备就绪的情况下获取新的作业。(我显式地将
chunksize
设置为1,以确保块不会太大,以至于每个块一开始只能得到一个块。)当我在8核机器上运行时:
因此,看起来这些流程正在快速获得新的工作。
因为你特别问了4个工人,我把
Pool()
改成了Pool(4)
,得到了这个:然而,有一种比测试更好的方法可以发现:readthe source。
如您所见,
map
只需调用map_async
,这将创建一堆批并将它们放在self._taskqueue
对象(一个Queue.Queue
实例)上。如果进一步阅读,则此队列不会直接与其他进程共享,但有一个池管理器线程,每当进程完成并返回结果时,该线程会从队列中弹出下一个作业并将其提交回进程。这也是找出
map
的默认块大小的方法。上面链接的2.7实现表明,它只是len(iterable) / (len(self._pool) * 4)
向上舍入(比避免分数运算稍微详细一些),或者换句话说,只够每个进程4个块。但您真的不应该依赖于此;文档含糊不清地间接地暗示它将使用某种启发式方法,但并不能保证这将是什么。因此,如果您真的需要“每个进程大约4个块”,请显式地计算它。更现实地说,如果您需要默认值之外的任何东西,那么您可能需要一个特定于域的值(通过计算、猜测或分析)。http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map
我假设一个进程在处理完前一个块时从队列中提取下一个块。
默认值
chunksize
取决于iterable
的长度,因此选择的块数大约是进程数的四倍。(source)要估计Python实现使用的
chunksize
而不查看其multiprocessing
模块源代码,请运行:它表明,
imap
,imap_unordered
默认使用chunksize=1
,而map
的max_chunksize
取决于nprocesses
,nitem
(每个进程的块数不固定)和max_chunksize
取决于python版本。如果指定了*map*
函数,则所有函数都会考虑chunksize
参数。使用量
要查看单个作业的分布方式,请指定
--verbose
参数。相关问题 更多 >
编程相关推荐