Joblib并行多cpu比sing慢

2024-04-24 17:28:27 发布

您现在位置:Python中文网/ 问答频道 /正文

我刚刚开始使用joblib模块,并试图了解并行函数是如何工作的。下面是一个并行化导致更长运行时间的例子,但我不明白为什么。我在一个cpu上的运行时间是51秒,而在两个cpu上是217秒。

我的假设是并行运行循环会将列表a和b复制到每个处理器。然后将item_n分派到一个cpu,item_n+1分派到另一个cpu,执行该函数,然后将结果写回列表(按顺序)。然后抓住接下来的两个项目,以此类推。很明显我漏掉了一些东西。

这是一个糟糕的例子还是joblib的用法?我只是把代码结构错了吗?

下面是一个例子:

import numpy as np
from matplotlib.path import Path
from joblib import Parallel, delayed

## Create pairs of points for line segments
a = zip(np.random.rand(5000,2),np.random.rand(5000,2))

b = zip(np.random.rand(300,2),np.random.rand(300,2))

## Check if one line segment contains another. 
def check_paths(path, paths):
    for other_path in paths:
        res='no cross'
        chck = Path(other_path)
        if chck.contains_path(path)==1:
            res= 'cross'
            break
    return res

res = Parallel(n_jobs=2) (delayed(check_paths) (Path(points), a) for points in b)

Tags: path函数importfornp时间resrandom
2条回答

除了上面的答案,还有两个方面可以供以后参考,joblib最近的发展对这两个方面都有帮助。

并行池创建开销:这里的问题是创建并行池的成本很高。这里的代价尤其高昂,因为在创建并行对象时,每个作业中都会运行不受“main保护的代码。在最新的joblib(仍然是beta版)中,Parallel可以用作context manager来限制创建池的时间,从而限制此开销的影响。

调度开销: 重要的是要记住,分派for循环的一个项有一个开销(比在没有并行的情况下迭代for循环大得多)。因此,如果这些单独的计算项非常快,则此开销将支配计算。在最新的joblib中,joblib将跟踪每个作业的执行时间,如果执行速度非常快,则开始对它们进行分组。在大多数情况下,这强烈限制了调度开销的影响(请参阅PR中的bench和讨论)。


免责声明:我是joblib的原始作者(只是提醒大家注意我的答案中可能存在的利益冲突,尽管在这里我认为这无关紧要)。

简而言之:我无法再现你的问题。如果您在Windows上,则应该为主循环使用保护程序:documentation of ^{}。我看到的唯一的问题是大量的数据复制开销,但是你的数字似乎不太现实。

很长时间内,以下是我对您代码的计时:

在i7 3770k(4核,8线程)上,对于不同的n_jobs,我得到了以下结果:

For-loop: Finished in 33.8521318436 sec
n_jobs=1: Finished in 33.5527760983 sec
n_jobs=2: Finished in 18.9543449879 sec
n_jobs=3: Finished in 13.4856410027 sec
n_jobs=4: Finished in 15.0832719803 sec
n_jobs=5: Finished in 14.7227740288 sec
n_jobs=6: Finished in 15.6106669903 sec

因此,使用多个进程有好处。然而,虽然我有四个核心,增益已经饱和在三个进程。所以我想执行时间实际上是受内存访问而不是处理器时间的限制。

您应该注意到,每个循环条目的参数都被复制到执行它的进程中。这意味着您为b中的每个元素复制a。这是无效的。因此,请访问全局a。(Parallel将分叉进程,将所有全局变量复制到新生成的进程,因此可以访问a)。这给了我以下代码(使用定时和主循环保护作为joblib建议的文档):

import numpy as np
from matplotlib.path import Path
from joblib import Parallel, delayed
import time
import sys

## Check if one line segment contains another. 

def check_paths(path):
    for other_path in a:
        res='no cross'
        chck = Path(other_path)
        if chck.contains_path(path)==1:
            res= 'cross'
            break
    return res

if __name__ == '__main__':
    ## Create pairs of points for line segments
    a = zip(np.random.rand(5000,2),np.random.rand(5000,2))
    b = zip(np.random.rand(300,2),np.random.rand(300,2))

    now = time.time()
    if len(sys.argv) >= 2:
        res = Parallel(n_jobs=int(sys.argv[1])) (delayed(check_paths) (Path(points)) for points in b)
    else:
        res = [check_paths(Path(points)) for points in b]
    print "Finished in", time.time()-now , "sec"

计时结果:

 n_jobs=1: Finished in 34.2845709324 sec
 n_jobs=2: Finished in 16.6254048347 sec
 n_jobs=3: Finished in 11.219119072 sec
 n_jobs=4: Finished in 8.61683392525 sec
 n_jobs=5: Finished in 8.51907801628 sec
 n_jobs=6: Finished in 8.21842098236 sec
 n_jobs=7: Finished in 8.21816396713 sec
 n_jobs=8: Finished in 7.81841087341 sec

饱和度现在稍微移动到n_jobs=4,这是预期的值。

check_paths做了几个很容易消除的冗余计算。首先,对于other_paths=a中的所有元素,在每个调用中都执行Path(...)行。预先计算一下。其次,字符串res='no cross'是每次循环的循环,尽管它可能只更改一次(随后是中断和返回)。把线移到环的前面。代码如下所示:

import numpy as np
from matplotlib.path import Path
from joblib import Parallel, delayed
import time
import sys

## Check if one line segment contains another. 

def check_paths(path):
    #global a
    #print(path, a[:10])
    res='no cross'
    for other_path in a:
        if other_path.contains_path(path)==1:
            res= 'cross'
            break
    return res

if __name__ == '__main__':
    ## Create pairs of points for line segments
    a = zip(np.random.rand(5000,2),np.random.rand(5000,2))
    a = [Path(x) for x in a]

    b = zip(np.random.rand(300,2),np.random.rand(300,2))

    now = time.time()
    if len(sys.argv) >= 2:
        res = Parallel(n_jobs=int(sys.argv[1])) (delayed(check_paths) (Path(points)) for points in b)
    else:
        res = [check_paths(Path(points)) for points in b]
    print "Finished in", time.time()-now , "sec"

有时间安排:

n_jobs=1: Finished in 5.33742594719 sec
n_jobs=2: Finished in 2.70858597755 sec
n_jobs=3: Finished in 1.80810618401 sec
n_jobs=4: Finished in 1.40814709663 sec
n_jobs=5: Finished in 1.50854086876 sec
n_jobs=6: Finished in 1.50901818275 sec
n_jobs=7: Finished in 1.51030707359 sec
n_jobs=8: Finished in 1.51062297821 sec

代码上的一个侧节点,虽然我没有真正遵循它的目的,因为这与您的问题无关,contains_path将只返回Trueif this path completely contains the given path.(请参见documentation)。因此,给定随机输入,函数基本上总是返回no cross

相关问题 更多 >