Python多处理:理解chunksize背后的逻辑`

2024-03-29 08:20:47 发布

您现在位置:Python中文网/ 问答频道 /正文

什么因素决定了像multiprocessing.Pool.map()这样的方法的最佳chunksize参数?.map()方法似乎对其默认块大小使用了一种任意的启发式方法(如下所述);是什么促使了这种选择,是否有基于某些特定情况/设置的更周到的方法?

比如说我是:

  • 把一个iterable传递给.map(),这个iterable有大约1500万个元素
  • 在24核的机器上工作,并在multiprocessing.Pool()内使用默认的^{}

我天真的想法是给24个工人每人一个同样大小的块,即15_000_000 / 24或625000块。大块应减少周转/管理费用,同时充分利用所有工人。但这似乎忽略了给每个工人大量生产的一些潜在不利因素。这是一张不完整的照片吗?我遗漏了什么?


我的部分问题源于ifchunksize=None的默认逻辑:同时.map().starmap()调用^{},如下所示:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize `iterable` to list if it's an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0

背后的逻辑是什么?这意味着块大小将更接近15_000_000 / (24 * 4) == 156_250。把len(self._pool)乘以4的目的是什么?

这使得生成的块大小比我上面的“原始逻辑”小4个因子,这包括将iterable的长度除以pool._pool中的工作者数量。

最后,还有一个来自Python文档的snippet进一步激发了我的好奇心:

The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.


有帮助但有点太高的相关答案:Python multiprocessing: why are large chunksizes slower?


Tags: the方法selfnonemaplenifcallback
3条回答

我认为你缺少的部分原因是你天真的估计假设每个工作单元花费的时间相同,在这种情况下,你的策略是最好的。但是,如果某些作业比其他作业完成得更快,则某些核心可能会变得空闲,等待缓慢的作业完成。

因此,通过将块分解成4倍多的块,如果一个块提前完成,那么核心可以开始下一个块(而其他核心继续处理速度较慢的块)。

我不知道他们为什么选择因子4,但这将是在最小化映射代码的开销(这需要尽可能大的块)和平衡块占用不同时间(这需要尽可能小的块)之间的权衡。

About this answer

This answer is Part II of the accepted answer above.


7号。Naive与Pool的Chunksize算法

在详细讨论之前,请考虑下面的两个gif。对于不同长度的iterable范围,它们显示了两个比较的算法如何对传递的iterable进行分块(到那时它将是一个序列)以及生成的任务如何分布。工作线程的顺序是随机的,实际上每个工作线程的分布式任务数可能与此图像不同,这些图像适用于较宽场景中的轻型任务和/或任务。如前所述,这里也不包括间接费用。然而,对于在传输数据量可忽略的密集场景中的足够重的任务,实际计算得出的结果非常相似。

cs_4_50

cs_200_250

如第5章所示。Pool's Chunksize Algorithm“”,使用Pool's Chunksize Algorithm,对于足够大的iterable,chunks的数量将稳定在n_chunks == n_workers * 4处,同时它使用朴素的方法在n_chunks == n_workersn_chunks == n_workers + 1之间切换。对于应用的naive算法:因为n_chunks % n_workers == 1True对于n_chunks == n_workers + 1,将创建一个新的节,其中只使用一个worker。

Naive Chunksize-Algorithm:

You might think you created tasks in the same number of workers, but this will only be true for cases where there is no remainder for len_iterable / n_workers. If there is a remainder, there will be a new section with only one task for a single worker. At that point your computation will not be parallel anymore.

下面的图与第5章中的图相似,但显示的是节数而不是块数。对于Pool的完整chunksize算法(n_pool2),n_sections将稳定在臭名昭著的硬编码因子4。对于naive算法,n_sections将在1和2之间交替。

figure10

对于Pool的chunksize算法,通过前面提到的额外处理,在n_chunks = n_workers * 4处保持稳定,防止在此处创建新的节,并将空闲共享限制为一个工作线程足够长的时间。不仅如此,该算法还将不断缩小空闲份额的相对大小,从而使RDE值收敛到100%。

例如,n_workers=4的“足够长”就是len_iterable=210。对于等于或大于该值的iterable,空闲共享将限制为一个worker,这是最初由于chunksize算法中的4乘法而丢失的特性。

figure11

naive chunksize算法也收敛到100%,但收敛速度较慢。会聚效应完全取决于这样一个事实:在有两个部分的情况下,尾部的相对部分会收缩。只有一个雇员的尾部被限制在x轴长度n_workers - 1,这是len_iterable / n_workers可能的最大余数。

How do actual RDE values differ for the naive and Pool's chunksize-algorithm?

下面是两张热图,显示了小于等于5000的所有iterable长度的RDE值,以及小于等于2小于等于100的所有工人的RDE值。 色阶从0.5到1(50%-100%)。你会注意到更多的黑暗区域(较低的RDE值)为天真的算法在左边的热图。相反,右边Pool的chunksize算法绘制了一幅更加阳光明媚的图片。

figure12

左下角暗角与右上角亮角的对角线梯度,再次显示出对工人数量的依赖,即所谓的“长iterable”。

How bad can it get with each algorithm?

在Pool的chunksize算法中,81.25%的RDE

figure13

使用朴素的chunksize算法,情况会变得更糟。计算得出的最低RDE为50.72%。在这种情况下,几乎一半的计算时间只有一个工人在运行!所以,小心,骄傲的Knights Landing主人。;)

figure14


8个。真实性检查

在前几章中,我们考虑了纯数学分布问题的一个简化模型,从使多处理首先成为一个棘手话题的细节中剥离出来。为了更好地理解分布模型(DM)单独有助于解释实际中观察到的工人利用率,我们现在来看看由real计算绘制的并行计划。

设置

下面的图都处理了一个简单的、cpu绑定的伪函数的并行执行,这个伪函数用不同的参数调用,这样我们就可以观察绘制的并行调度如何随输入值的变化而变化。此函数中的“工作”仅包含范围对象上的迭代。这已经足够让核心保持忙碌,因为我们传递了大量的数字。可选地,该函数接受一些taskel惟一的额外data,该额外^{。由于每个taskel包含的工作量都是完全相同的,因此我们仍然在处理一个密集的场景。

函数的修饰是一个包装器,它采用ns分辨率的时间戳(Python 3.7+)。时间戳用于计算taskel的时间跨度,因此可以绘制经验并行调度。

@stamp_taskel
def busy_foo(i, it, data=None):
    """Dummy function for CPU-bound work."""
    for _ in range(int(it)):
        pass
    return i, data


def stamp_taskel(func):
    """Decorator for taking timestamps on start and end of decorated
    function execution.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time_ns()
        result = func(*args, **kwargs)
        end_time = time_ns()
        return (current_process().name, (start_time, end_time)), result
    return wrapper

Pool的starmap方法也被修饰为只有starmap调用本身是计时的。”此调用的“开始”和“结束”确定生成的并行计划的x轴上的最小值和最大值。

我们将观察一台机器上四个工作进程上40个任务的计算,这些任务的规格如下: Python 3.7.1、Ubuntu 18.04.2、英特尔酷睿™ i7-2600K CPU@3.40GHz×8

将改变的输入值是for循环中的迭代次数 (30k、30M、600M)和额外的发送数据大小(每个taskel、numpy ndarray:0 MiB、50 MiB)。

...
N_WORKERS = 4
LEN_ITERABLE = 40
ITERATIONS = 30e3  # 30e6, 600e6
DATA_MiB = 0  # 50

iterable = [
    # extra created data per taskel
    (i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8)))  # taskel args
    for i in range(LEN_ITERABLE)
]


with Pool(N_WORKERS) as pool:
    results = pool.starmap(busy_foo, iterable)

下面所示的运行是经过精心挑选的,它们具有相同的块顺序,因此与分布模型中的并行调度相比,您可以更好地发现差异,但不要忘记工作人员获得任务的顺序是不确定的。

DM预测

重申一下,分布模型“预测”了一个平行的时间表,就像我们在第6.2章中已经看到的那样:

figure15

第一次运行:每个taskel 3万次迭代和0 MiB数据

figure16

我们在这里的第一次运行非常短,任务非常“轻”。整个pool.starmap()调用总共只花了14.5毫秒。 您会注意到,与DM相反,空闲不限于尾部部分,而是在任务之间甚至在任务之间发生。这是因为我们真正的日程安排自然包括各种开销。这里的空闲意味着taskel的所有外部。可能真实空闲期间,taskel没有捕获之前已经提到的内容。

你还可以看到,并不是所有的工人都能同时完成他们的任务。这是因为所有的工作线程都通过共享的inqueue进行传输,并且一次只能读取一个工作线程。这同样适用于outqueue。这可能会导致更大的不安,只要你传输非边际大小的数据,我们将看到稍后。

此外,您可以看到,尽管每个taskel包含相同的工作量,但实际测量的taskel时间跨度变化很大。分发给worker-3和worker-4的任务比前两个worker处理的任务需要更多的时间。对于这次运行,我怀疑这是由于turbo boost此时worker-3/4的核心不再可用,所以他们以较低的时钟速率处理任务。

整个计算过程非常轻巧,硬件或操作系统引入的混沌因子会使PS急剧倾斜。计算是“风中之叶”,即使对于理论上合适的情况,DM预测也没有什么意义。

第二次运行:每个任务3000万次迭代和0 MiB数据

figure17

将for循环中的迭代次数从30000次增加到3000万次,可以得到一个接近完美ma的真正并行调度与由DM提供的数据预测的结果一致,hurray!每个taskel的计算量现在已经足够大,可以在开始和中间忽略空闲部分,只让DM预测的大空闲部分可见。

第三次运行:每任务3000万次迭代和50 MiB数据

figure18

保持30M的迭代,但是每个taskel来回发送50mib会再次扭曲图片。在这里排队效应是显而易见的。Worker-4需要比Worker-1等待第二个任务更长的时间。现在想象一下这个有70名工人的时间表吧!

如果任务在计算上非常轻巧,但是可以提供大量数据作为有效负载,那么单个共享队列的瓶颈可能会阻止向池中添加更多工作线程的任何额外好处,即使它们由物理核心支持。在这种情况下,工人-1可以完成它的第一个任务,并等待一个新的工作,甚至在工人-40已经得到它的第一个任务。

现在应该很明显了,为什么一个Pool中的计算时间并不总是随着工作人员的数量而减少。沿着发送相对大量的数据可能会导致出现这样的情况:大部分时间都花在等待数据复制到一个工作者的地址空间,而一次只能给一个工作者供电。

第4次运行:每个taskel有6亿次迭代和50 MiB数据

figure19

在这里,我们再次发送50mib,但将迭代次数从30M增加到600M,从而使总计算时间从10s增加到152s。再次绘制的并行调度,与预测的调度接近完美匹配,通过数据复制产生的开销被边缘化。


9号。结论

讨论的4乘法增加了调度灵活性,但也利用了taskel分布中的不均匀性。如果没有这种乘法运算,空闲共享将仅限于一个工人,即使是短期的iterable(对于密集场景下的DM)。Pool的chunksize算法需要输入的iterables具有一定的大小才能恢复这种特性。

正如这个答案所希望的那样,Pool的chunksize算法与naive方法相比,平均而言,至少在平均情况下,并且在不考虑长开销的情况下,可以获得更好的核心利用率。这里的naive算法的分布效率(DE)可以低至~51%,而Pool的chunksize算法的分布效率低至~81%。DE但是不包括IPC这样的并行化开销(PO)。第8章表明,对于具有边际化开销的稠密场景,DE仍然具有很强的预测能力。

尽管Pool的chunksize算法比naive方法获得了更高的DEDE,但它并没有为每个输入星座提供最优的taskel分布。而一个简单的静态chunking算法不能优化(包括开销)并行效率(PE),它不可能总是提供100%的相对分布效率(RDE),也就是说,与chunksize=1相同。一个简单的chunksize算法只包含基本的数学,并且可以以任何方式“切蛋糕”。

与Pool实现的“等大小分块”算法不同,“均匀大小分块”算法将为每个len_iterable/n_workers组合提供100%的RDE。均匀大小的分块算法在Pool的源代码中实现起来稍微复杂一些,但是可以在现有算法的基础上进行调整,只需将任务打包到外部即可(我将从这里链接,以防我在如何实现这一点上落下一个Q/A)。

简短的回答

Pool的chunksize算法是一种启发式算法。它为您试图填充到Pool方法中的所有可想象的问题场景提供了一个简单的解决方案。因此,它不能针对任何特定的场景进行优化。

该算法任意地将iterable划分为大约4倍于naive方法的块。更多的块意味着更多的开销,但是增加了调度的灵活性。这个答案将如何显示,这将导致平均更高的工人利用率,但是没有保证每种情况下更短的总体计算时间。

“很高兴知道”你可能会想,“但是知道这对我解决具体的多处理问题有什么帮助呢?”好吧,不是的。更诚实的简短回答是,“没有简短的答案”,“多处理是复杂的”和“取决于”。观察到的症状可能有不同的根源,即使是在类似的情况下。

这个答案试图为您提供一些基本概念,帮助您更清楚地了解Pool的调度黑盒。它还试图给你一些基本的工具来识别和避免潜在的悬崖,因为它们与块大小有关。

Table of Contents

Part I

  1. Definitions
  2. Parallelization Goals
  3. Parallelization Scenarios
  4. Risks of Chunksize > 1
  5. Pool's Chunksize-Algorithm
  6. Quantifying Algorithm Efficiency

    6.1 Models

    6.2 Parallel Schedule

    6.3 Efficiencies

    6.3.1 Absolute Distribution Efficiency (ADE)

    6.3.2 Relative Distribution Efficiency (RDE)

Part II

  1. Naive vs. Pool's Chunksize-Algorithm
  2. Reality Check
  3. Conclusion

有必要先澄清一些重要的术语。


一。定义


这里的块是池方法调用中指定的iterable参数的共享。如何计算chunksize以及它会产生什么样的影响,是这个答案的主题。


任务

工作进程中任务的物理表示形式(以数据表示)如下图所示。

figure0

此图显示了对pool.map()的调用示例,该调用沿着代码行显示,取自multiprocessing.pool.worker函数,在该函数中,从inqueue读取的任务将被解包。worker是池工作进程的MainThread中的基本主函数。pool方法中指定的func-参数将只与apply_async等单调用方法的worker-函数中的func-变量和imapchunksize=1匹配。对于其他带有chunksize-参数的池方法,处理函数func将是映射器函数(mapstarstarmapstar)。此函数将用户指定的func-参数映射到iterable(-->;“map tasks”)传输块的每个元素上。所需时间将任务定义为工作单元。


Taskel

虽然一个块的整个处理的单词“task”的用法与multiprocessing.pool中的代码相匹配,但没有迹象表明对用户指定的func使用一个 应将块的元素作为参数引用。为了避免命名冲突带来的混淆(考虑池的maxtasksperchild方法的__init__参数),这个答案将引用 任务中的单个工作单元称为taskel。

A taskel (from task + element) is the smallest unit of work within a task. It is the single execution of the function specified with the func-parameter of a Pool-method, called with arguments obtained from a single element of the transmitted chunk. A task consists of chunksizetaskels.


并行化开销(PO)

PO由Python内部开销和进程间通信(IPC)开销组成。Python中的每个任务开销都附带打包和解包任务及其结果所需的代码。IPC开销包括必要的线程同步和不同地址空间之间的数据复制(需要两个复制步骤:父级->;队列->;子级)。IPC开销的大小取决于操作系统、硬件和数据大小,这使得对影响的概括变得困难。


2。并行化目标

当使用多处理时,我们的总体目标(显然)是最小化所有任务的总处理时间。为了实现这一总体目标,我们的技术目标需要优化硬件资源的利用。

实现技术目标的一些重要子目标是:

  • 最小化并行化开销(最著名的,但不是单独的:IPC
  • 所有cpu核心的高利用率
  • 限制内存使用以防止操作系统过度分页(trashing

首先,这些任务需要足够的计算量(密集),以获得并行化所需的回报。PO的相关性随着每任务绝对计算时间的增加而降低。或者,换句话说,对于您的问题,每个任务的绝对计算时间越大,减少PO的必要性就越小。如果每个taskel的计算将花费数小时,则IPC开销相比之下可以忽略不计。这里的主要关注点是防止在分配完所有任务后使工作进程空闲。保持所有核心加载意味着,我们尽可能地并行化。


三。并行化场景

What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()

问题的主要因素是,在我们的单个任务中,有多少计算时间可以变化。要命名它,最佳块大小的选择取决于。。。

Coefficient of Variation (CV) for computation times per taskel.

从这种变化的程度来看,在一个尺度上有两种极端情况:

  1. 所有任务都需要完全相同的计算时间。
  2. 任务可能需要几秒钟或几天才能完成。

为了更好地记忆,我将这些场景称为:

  1. 密集场景
  2. 广泛的场景


密集场景

在密集的场景中,最好一次分发所有任务,将必需的IPC和上下文切换保持在最低限度。这意味着我们只想创建尽可能多的块,尽可能多的工作进程。如上所述,PO的权重随着每个taskel计算时间的缩短而增加。

为了获得最大的吞吐量,我们还希望所有工作进程都处于忙碌状态,直到处理完所有任务(没有空闲的工作进程)。为了实现这个目标,分布的块应该大小相等或接近。


广阔的前景

对于一个广泛的场景来说,最好的例子是一个优化问题,在这个问题中,结果要么快速收敛,要么计算可能需要数小时,甚至数天。通常,在这种情况下,无法预测任务将包含“轻任务”和“重任务”的混合,因此,在一个任务批中同时分发太多任务是不可取的。一次分配的任务比可能的要少,这意味着增加了调度的灵活性。这是需要在这里达到我们的子目标,所有核心的高利用率。

默认情况下,如果Pool方法完全针对密集场景进行优化,那么它们将越来越多地为靠近广泛场景的每个问题创建次优的计时。


四。块大小风险1

考虑一下这个简化的伪代码示例,它是一个广泛的场景-iterable,我们希望将其传递到pool方法中:

good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]

与实际值不同的是,我们假设所需的计算时间以秒为单位,为了简单起见,只有1分钟或1天。 我们假设池有四个工作进程(在四个核心上),并且chunksize设置为2。由于订单将被保留,发送给工人的数据块将是:

[(60, 60), (86400, 60), (86400, 60), (60, 84600)]

因为我们有足够多的工人,并且计算时间足够长,我们可以说,每个工人进程首先都会得到一个块来工作。(快速完成任务不一定要这样)。此外,我们可以说,整个处理过程大约需要86400+60秒,因为这是这个人工场景中一个块的最高总计算时间,我们只分配一次块。

现在考虑这个iterable,它只有一个元素与以前的iterable相比,切换其位置:

bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]

…以及相应的块:

[(60, 60), (86400, 86400), (60, 60), (60, 84600)]

只可惜我们的iterable的排序几乎翻了一番(86400+86400)我们的总处理时间!得到恶意(8640086400)块的工人正在阻止任务中的第二个重任务分配给已经完成(60,60)块的空闲工人之一。显然,如果我们设置chunksize=1,我们就不会冒这样不愉快的后果的风险。

这是大块的风险。对于较大的块大小,我们用调度灵活性换取较少的开销,在上面这样的情况下,这是一个糟糕的交易。

我们将在第6章中看到什么。量化算法的效率,较大的块大小也可能导致对密集场景的次优结果


5个。池的块大小算法

下面您将在源代码中找到算法的稍加修改的版本。如您所见,我切断了下半部分并将其包装成一个函数,用于外部计算chunksize参数。我还用factor参数替换了4,并外包了len()调用。

# mp_utils.py

def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.

    Resembles source-code within `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize

为了确保我们都在同一个页面上,下面是divmod要做的:

divmod(x, y)是一个内置函数,返回(x//y, x%y)x // y是底除法,从x / y返回向下舍入的商,而 x % y是从x / y返回余数的模运算。 因此,例如divmod(10, 3)返回(3, 1)

现在,当你看chunksize, extra = divmod(len_iterable, n_workers * 4)时,你会注意到n_workers这里是x / y中的除数y,并乘以4,而不需要以后通过if extra: chunksize +=1进行进一步的调整,会导致初始块大小至少比原来小4倍(对于len_iterable >= n_workers * 4)。

要查看乘以4对中间块大小结果的影响,请考虑以下函数:

def compare_chunksizes(len_iterable, n_workers=4):
    """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
    for Pool's complete algorithm. Return chunksizes and the real factors by
    which naive chunksizes are bigger.
    """
    cs_naive = len_iterable // n_workers or 1  # naive approach
    cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
    cs_pool2 = calc_chunksize(n_workers, len_iterable)

    real_factor_pool1 = cs_naive / cs_pool1
    real_factor_pool2 = cs_naive / cs_pool2

    return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2

上面的函数计算原始块大小(cs_naive)和池的块大小算法的第一步块大小(cs_pool1),以及完整池算法的块大小(cs_pool2)。此外,它还计算实际因子rf_pool1 = cs_naive / cs_pool1rf_pool2 = cs_naive / cs_pool2,这些因子告诉我们原始计算的块大小比池的内部版本大多少倍。

下面是用这个函数的输出创建的两个图。左边的图只显示了n_workers=4的块大小,直到可输入的长度500。右图显示rf_pool1的值。对于iterable length16,实际因子变为>=4(对于len_iterable >= n_workers * 4),对于iterable length28-31,最大值为7。这与原来的因子有很大的偏差,算法收敛到更长的迭代次数这里的“Longer”是相对的,取决于指定的工人数。

figure1

记住,chunksizecs_pool1仍然缺少extra调整,而完整算法中cs_pool2包含的divmod中的剩余部分。

算法继续:

if extra:
    chunksize += 1

现在,如果有的余数(divmod操作的一个extra),那么将chunksize增加1显然不能解决每个任务。毕竟,如果是这样的话,就不会有余数了。

从下图中可以看出,“额外处理”的效果是,rf_pool2实因子现在从4下面的4收敛,并且偏差稍微平滑一些。n_workers=4len_iterable=500的标准差从0.5233rf_pool1下降到0.4115rf_pool2

figure2

最后,将chunksize增加1会产生这样的效果,即发送的最后一个任务的大小只有len_iterable % chunksize or chunksize

更有趣的是d我们以后将如何看到,更重要的是,额外治疗的效果,但是可以观察到生成块的数量(n_chunks)。 对于足够长的iterable,Pool完成的chunksize算法(下图中的n_pool2)将把chunks的数量稳定在n_chunks == n_workers * 4。 相反,随着iterable长度的增长,naive算法(在初始burp之后)在n_chunks == n_workersn_chunks == n_workers + 1之间保持交替。

figure3

下面您将看到Pool和naive chunksize算法的两个增强信息函数。下一章将需要这些函数的输出。

# mp_utils.py

from collections import namedtuple


Chunkinfo = namedtuple(
    'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                  'chunksize', 'last_chunk']
)

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    """Calculate chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    # `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
    n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
    # exploit `0 == False`
    last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

不要被calc_naive_chunksize_info可能出乎意料的外观所迷惑。来自divmodextra不用于计算块大小。

def calc_naive_chunksize_info(n_workers, len_iterable):
    """Calculate naive chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers)
    if chunksize == 0:
        chunksize = 1
        n_chunks = extra
        last_chunk = chunksize
    else:
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

6。量化算法效率

现在,在我们看到Pool的chunksize算法的输出与naive算法的输出看起来有什么不同之后。。。

  • 如何判断Pool的方法是否真的改善了某些东西?
  • 这究竟是什么东西?

如前一章所示,对于更长的iterable(任务数更多),Pool的chunksize算法大约将iterable划分为比naive方法多4倍的块。较小的数据块意味着更多的任务,而更多的任务意味着更多的并行化开销(PO),这一成本必须与增加的调度灵活性相权衡(回忆一下“数据块大小风险”1“)。

由于相当明显的原因,Pool的基本chunksize算法不能将调度灵活性与我们的PO相比较。IPC开销取决于操作系统、硬件和数据大小。算法不知道我们在什么硬件上运行代码,也不知道taskel需要多长时间才能完成。它是一个启发式的,为所有可能的场景提供基本功能。这意味着它不能针对任何特定场景进行优化。如前所述,PO也变得越来越不关心每个taskel的计算时间(负相关)。

当您回忆起第2章中的并行化目标时,一个要点是:

  • 所有cpu核心的高利用率

前面提到的一些,Pool的chunksize算法可以尝试改进的是最小化空闲的工作进程,分别是cpu核的利用率。

关于multiprocessing.Pool的一个重复的问题是,在您希望所有工作进程都很忙的情况下,人们会对未使用的核心/空闲的工作进程产生疑问。虽然这可能有许多原因,但在计算结束时空闲的工作进程是我们经常可以观察到的,即使在工作进程数不是块数的除数的情况下(等于每个任务的计算时间),也可以观察到这种情况。

现在的问题是:

How can we practically translate our understanding of chunksizes into something which enables us to explain observed worker-utilization, or even compare the efficiency of different algorithms in that regard?


6.1模型

为了在这里获得更深入的见解,我们需要一种并行计算的抽象形式,它将过于复杂的现实简化到可管理的复杂程度,同时在定义的边界内保持重要性。这种抽象称为模型。这种“并行化模型”(PM)的实现会生成工作映射元数据(时间戳),如果要收集数据,则实际计算也会这样。模型生成的元数据允许在特定约束下预测并行计算的度量。

figure4

这里定义的PM中的两个子模型之一是分布模型删除(DM)。DM解释了当不考虑除相应的chunksize算法、工人数量、输入iterable(任务数量)及其计算持续时间以外的其他因素时,原子工作单元(任务)如何分布在并行工人和时间上。这意味着任何形式的开销都不包括在内。

为了获得一个完整的PMDM被扩展为一个开销模型(OM),表示各种形式的并行开销(PO)。这样的模型需要为每个节点分别进行校准(硬件和操作系统依赖)。在aOM中表示多少形式的开销是开放的,因此可以存在具有不同复杂度的多个OMs。实现的OM所需的精度级别由特定计算的PO的总权重确定。较短的任务会导致较高的PO权重,如果我们试图预测并行化效率(PE),则反过来需要更精确的OM


6.2平行进度表(PS)

并行调度是并行计算的二维表示,其中x轴表示时间,y轴表示并行工作者池。工人的数量和总的计算时间标志着矩形的延伸,在矩形中绘制较小的矩形。这些较小的矩形表示原子工作单元(taskel)。

在下面,您可以看到用密集场景的池块大小算法的DM中的数据绘制的PS的可视化效果。

figure5

  • x轴被分割成相等的时间单位,其中每个单位代表taskel所需的计算时间。
  • y轴分为池使用的工作进程数。
  • 此处的taskel显示为最小的青色矩形,放入匿名工作进程的时间线(时间表)中。
  • 任务是工作时间线中的一个或多个任务,它们以相同的色调连续亮显。
  • 空转时间单位用红色的瓷砖表示。
  • 并行调度被分成几个部分。最后一段是尾部。

组成部分的名称如下图所示。

figure6

在一个完整的PM中,包括一个OM,空闲共享不限于尾部,还包括任务之间甚至任务之间的空间。


6.3效率

Note:

Since earlier versions of this answer, "Parallelization Efficiency (PE)" has been renamed to "Distribution Efficiency (DE)". PE now refers to overhead-including efficiency.

上面介绍的模型可以量化工人利用率。我们可以区分:

  • 分配效率(DE)-借助于DM计算(或针对密集场景的简化方法)。
  • 并行化效率(PE)-通过校准的PM(预测)计算,或根据实际计算的元数据计算。

需要注意的是,对于给定的并行化问题,计算效率不会自动与更快的总体计算相关。在此上下文中,工作线程利用率仅区分具有已启动但尚未完成的任务的工作线程和不具有此类“打开”任务的工作线程。这就是说,任务的时间跨度在期间可能出现的空闲而不是注册的。

所有上述效率基本上都是通过计算除法的商来获得的。DE体育伴随着忙碌的分享 占用整个并行调度的一小部分,用于开销扩展PM

这个答案将进一步讨论一个简单的方法来计算密集场景的DE。这足以比较不同的块大小算法,因为。。。

  1. 。。。DMPM的一部分,随着所使用的块大小算法的不同而变化。
  2. 。。。每个taskel的计算持续时间相等的密集场景描述了一种“稳定状态”,这些时间跨度从方程中消失。任何其他场景都会导致随机结果,因为任务的顺序很重要。

6.3.1绝对分配效率(ADE)

这种基本效率通常可以通过将繁忙份额除以并行调度的全部潜力来计算:

Absolute Distribution Efficiency (ADE) = Busy Share / Parallel Schedule

对于密集场景,简化的计算代码如下:

# mp_utils.py

def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Absolute Distribution Efficiency (ADE).

    `len_iterable` is not used, but contained to keep a consistent signature
    with `calc_rde`.
    """
    if n_workers == 1:
        return 1

    potential = (
        ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
        + (n_chunks % n_workers == 1) * last_chunk
    ) * n_workers

    n_full_chunks = n_chunks - (chunksize > last_chunk)
    taskels_in_regular_chunks = n_full_chunks * chunksize
    real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
    ade = real / potential

    return ade

如果没有空闲共享,则繁忙共享将等于并行调度,因此我们得到100%的ADE。在我们的简化模型中,这是一个场景,在处理所有任务所需的整个时间内,所有可用的进程都将处于忙碌状态。换句话说,整个工作可以有效地并行化到100%。

但是,为什么我在这里一直把PE称为绝对PE

为了理解这一点,我们必须考虑一个可能的chunksize(cs)情况,以确保最大的调度灵活性(同时,也可以有高地人数)。巧合?)以下内容:

___________________________________~ ONE ~___________________________________

例如,如果我们有四个工作进程和37个任务,那么即使使用chunksize=1也会有空闲的工作进程,因为n_workers=4不是37的除数。除以37/4的余数是1。剩下的这一个任务将由一个单独的工人处理,而剩下的三个则处于空闲状态。

同样,还有一个空闲的工人,有39个任务,你可以看到下面的图片。

figure7

当您比较chunksize=1的上部并行计划chunksize=3的下部版本时,您会注意到上部并行计划较小,x轴上的时间线较短。现在应该很明显,块大小意外地变大也会导致整体计算时间的增加,即使对于密集场景也是如此。

But why not just use the length of the x-axis for efficiency calculations?

因为此模型中不包含开销。这两种块大小都不同,因此x轴并不是真正直接可比的。开销仍然会导致更长的总计算时间,如下图中案例2所示。

figure8


6.3.2相对分配效率(RDE)

如果chunksize设置为1时,taskels的更好的分布是可能的,则ADE值不包含该信息。更好的仍然意味着空闲份额更小。

为了得到一个为可能的最大值而调整的DE值,我们必须将考虑的ADE除以为chunksize=1而得到的ADE

Relative Distribution Efficiency (RDE) = ADE_cs_x / ADE_cs_1

下面是代码中的外观:

# mp_utils.py

def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Relative Distribution Efficiency (RDE)."""
    ade_cs1 = calc_ade(
        n_workers, len_iterable, n_chunks=len_iterable,
        chunksize=1, last_chunk=1
    )
    ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
    rde = ade / ade_cs1

    return rde

RDE,这里如何定义,本质上是一个关于并行调度尾部的故事。RDE受尾部包含的最大有效块大小的影响。(此尾部可以是x轴长度chunksizelast_chunk。) 其结果是,RDE自然收敛到100%(甚至)对于各种“尾部外观”,如下图。

figure9

A low RDE ...

  • is a strong hint for optimization potential.
  • naturally gets less likely for longer iterables, because the relative tail-portion of the overall Parallel Schedule shrinks.

找到这个答案的第二部分。

相关问题 更多 >