多处理池是否为每个进程提供相同数量的任务,或者它们是否被分配为可用的任务?

2024-06-09 21:34:48 发布

您现在位置:Python中文网/ 问答频道 /正文

当您map一个multiprocessing.Pool的iterable时,迭代是在开始时为池中的每个进程划分为一个队列,还是有一个公共队列,当进程空闲时从该队列执行任务?

    def generate_stuff():
        for foo in range(100):
             yield foo

    def process(moo):
        print moo

    pool = multiprocessing.Pool()
    pool.map(func=process, iterable=generate_stuff())
    pool.close()

因此,考虑到这个未经测试的建议代码;如果池中有4个进程,则每个进程是否分配25个要做的事情,或者是否每个进程逐个挑选100个要做的事情,以便每个进程可以执行不同数量的事情,例如30、26、24、20。


Tags: mapfoo队列进程defiterable事情multiprocessing
3条回答

So given this untested suggestion code; if there are 4 processes in the pool does each process get allocated 25 stuffs to do, or do the 100 stuffs get picked off one by one by processes looking for stuff to do so that each process might do a different number of stuffs, eg 30, 26, 24, 20.

好吧,显而易见的答案是测试一下。

因此,测试可能不会告诉您太多,因为作业将尽快完成,而且即使池进程在准备就绪时抓取作业,结果也可能是均匀分布的。但有一个简单的方法可以解决这个问题:

import collections
import multiprocessing
import os
import random
import time

def generate_stuff():
    for foo in range(100):
        yield foo

def process(moo):
    #print moo
    time.sleep(random.randint(0, 50) / 10.)
    return os.getpid()

pool = multiprocessing.Pool()
pids = pool.map(func=process, iterable=generate_stuff(), chunksize=1)
pool.close()
print collections.Counter(pids)

如果这些数字是“参差不齐”的,那么您就知道,池进程必须在准备就绪的情况下获取新的作业。(我显式地将chunksize设置为1,以确保块不会太大,以至于每个块一开始只能得到一个块。)

当我在8核机器上运行时:

Counter({98935: 16, 98936: 16, 98939: 13, 98937: 12, 98942: 12, 98938: 11, 98940: 11, 98941: 9})

因此,看起来这些流程正在快速获得新的工作。

因为你特别问了4个工人,我把Pool()改成了Pool(4),得到了这个:

Counter({98965: 31, 98962: 24, 98964: 23, 98963: 22})

然而,有一种比测试更好的方法可以发现:readthe source

如您所见,map只需调用map_async,这将创建一堆批并将它们放在self._taskqueue对象(一个Queue.Queue实例)上。如果进一步阅读,则此队列不会直接与其他进程共享,但有一个池管理器线程,每当进程完成并返回结果时,该线程会从队列中弹出下一个作业并将其提交回进程。

这也是找出map的默认块大小的方法。上面链接的2.7实现表明,它只是len(iterable) / (len(self._pool) * 4)向上舍入(比避免分数运算稍微详细一些),或者换句话说,只够每个进程4个块。但您真的不应该依赖于此;文档含糊不清地间接地暗示它将使用某种启发式方法,但并不能保证这将是什么。因此,如果您真的需要“每个进程大约4个块”,请显式地计算它。更现实地说,如果您需要默认值之外的任何东西,那么您可能需要一个特定于域的值(通过计算、猜测或分析)。

http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.map

map(func, iterable[, chunksize])

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

我假设一个进程在处理完前一个块时从队列中提取下一个块。

默认值chunksize取决于iterable的长度,因此选择的块数大约是进程数的四倍。(source)

要估计Python实现使用的chunksize而不查看其multiprocessing模块源代码,请运行:

#!/usr/bin/env python
import multiprocessing as mp
from itertools import groupby

def work(index):
    mp.get_logger().info(index)
    return index, mp.current_process().name

if __name__ == "__main__":
    import logging
    import sys
    logger = mp.log_to_stderr()

    # process cmdline args
    try:
        sys.argv.remove('--verbose')
    except ValueError:
        pass  # not verbose
    else:
        logger.setLevel(logging.INFO)  # verbose
    nprocesses, nitems = int(sys.argv.pop(1)), int(sys.argv.pop(1))
    # choices: 'map', 'imap', 'imap_unordered'
    map_name = sys.argv[1] if len(sys.argv) > 1 else 'map'
    kwargs = dict(chunksize=int(sys.argv[2])) if len(sys.argv) > 2 else {}

    # estimate chunksize used
    max_chunksize = 0
    map_func = getattr(mp.Pool(nprocesses), map_name)
    for _, group in groupby(sorted(map_func(work, range(nitems), **kwargs),
                                   key=lambda x: x[0]),  # sort by index
                            key=lambda x: x[1]):  # group by process name
        max_chunksize = max(max_chunksize, len(list(group)))
    print("%s: max_chunksize %d" % (map_name, max_chunksize))

它表明,imapimap_unordered默认使用chunksize=1,而mapmax_chunksize取决于nprocessesnitem(每个进程的块数不固定)和max_chunksize取决于python版本。如果指定了*map*函数,则所有函数都会考虑chunksize参数。

使用量

$ ./estimate_chunksize.py nprocesses nitems [map_name [chunksize]] [--verbose]

要查看单个作业的分布方式,请指定--verbose参数。

相关问题 更多 >