Python concurrent.futures.ProcessPoolExecutor: performance of submit() vs. map()

Published 2024-05-15 13:13:59


I am using concurrent.futures.ProcessPoolExecutor to find the occurrences of a number within a number range. The intent is to investigate the amount of speed-up gained from concurrency. To benchmark the performance I have a control, a serial code that performs the said task (shown below). I have written two concurrent codes, one using concurrent.futures.ProcessPoolExecutor.submit() and the other using concurrent.futures.ProcessPoolExecutor.map(), to perform the same task. They are shown below. Advice on drafting the former and the latter can be found here and here, respectively.

The task issued to all three codes is to find the number of occurrences of the number 5 within the number range 0 to 1E8. Both .submit() and .map() were assigned 6 workers, and .map() additionally used a chunksize of 10,000. The manner of discretizing the workload is identical in the two concurrent codes. However, the function used to find matches differs between them, because the manner in which arguments are passed to the function called by .submit() and by .map() is different.
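To illustrate that difference in calling conventions, here is a minimal sketch of my own (the helper names find_in_range and find_in_one are hypothetical, not taken from the codes below): .submit() takes one callable plus its positional arguments, while .map() takes one iterable per parameter and pairs them element-wise.

import concurrent.futures as cf
import itertools

def find_in_range(nmin, nmax, number):
    '''Scan a whole sub-range in a single worker call (.submit() style).'''
    return [n for n in range(nmin, nmax) if number in str(n)]

def find_in_one(n, number):
    '''Test one value; .map() applies this to every item it is fed.'''
    return n if number in str(n) else None

if __name__ == '__main__':
    with cf.ProcessPoolExecutor(max_workers=2) as executor:
        # .submit(): the callable and its arguments are passed directly.
        future = executor.submit(find_in_range, 0, 1000, '5')
        print(len(future.result()))                      # 271
        # .map(): each parameter needs its own iterable; repeat() keeps
        # the constant argument paired with every item of range(1000).
        results = executor.map(find_in_one, range(1000),
                               itertools.repeat('5'), chunksize=100)
        print(sum(1 for r in results if r is not None))  # 271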

All 3 codes reported the same count, i.e. 56,953,279 occurrences. However, the time taken to complete the task differed considerably: .submit() performed 2 times faster than the control, while .map() took twice as long as the control to complete its task.

Questions:

  1. I would like to know if the slow performance of .map() is an artifact of my coding, or if it is inherently slow? If the former, how can I improve it? I am just surprised that it performed slower than the control, as there would not be much motivation to use it otherwise.
  2. I would like to know if there is any way to make the .submit() code perform even faster. A condition I have is that the function _concurrent_submit() must return an iterable with the numbers/occurrences containing the number 5.

Benchmark results

[Image: benchmark results]

concurrent.futures.ProcessPoolExecutor.submit()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from traceback import print_exc

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_submit(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
        # 2.2. Instruct workers to process results as they come, when all are
        #      completed or .....
        cf.as_completed(futures) # faster than cf.wait()
        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            for f in future.result():
                try:
                    found.append(f)
                except:
                    print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent_submit():')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_submit(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

concurrent.futures.ProcessPoolExecutor.map()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc

def _findmatch(listnumber, number):    
    '''Function to find the occurrence of number in another number and return
       a string value.'''
    #print('def _findmatch(listnumber, number):')
    #print('listnumber = {0} and ref = {1}'.format(listnumber, number))
    if number in str(listnumber):
        x = listnumber
        #print('x = {0}'.format(x))
        return x 

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            numberlist = range(cstart, cstop)
            futures.append(executor.map(_findmatch, numberlist,
                                        itertools.repeat(number),
                                        chunksize=10000))
        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            for f in future:
                if f:
                    try:
                        found.append(f)
                    except:
                        print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_map(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

The serial code:

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

from time import time

def _serial(nmax, number):    
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):
            match.append(n)
    end=time()-start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.

    start = time()
    a = _serial(nmax, number)
    end = time() - start
    print('\n main')
    print("found {0} in {1:.4f}sec".format(len(a),end))

Update 13th Feb 2017:

In addition to @niemmi's answer below, I also provide an answer based on some personal research into:

  1. how to gain further speed-up from @niemmi's .map() and .submit() solutions, and
  2. when ProcessPoolExecutor.map() can yield more speed-up than ProcessPoolExecutor.submit().

2 Answers

You're comparing apples to oranges. When using map you produce all 1E8 numbers and transfer them to the worker processes. This takes a lot of time compared to the actual execution. When using submit you just create 6 sets of parameters that get transferred.
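As a rough illustration of that transfer overhead (the snippet and its numbers are mine, not the answerer's), compare how much data must be pickled for the workers in each case:

import pickle

# .map() over range(1E8) has to serialise every single number for
# transfer to the worker processes (chunksize only batches them);
# even a small slice is already sizeable:
batch = list(range(10000))
print(len(pickle.dumps(batch)))  # roughly 30 KB for just 10k ints

# .submit() with range bounds serialises only three small arguments:
args = (0, int(1E8), '5')
print(len(pickle.dumps(args)))   # a few dozen bytes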

If you change map to operate on the same principle you'll get numbers that are close to each other:

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunk * i for i in range(workers))
        cstop = (chunk * i if i != workers else nmax for i in range(1, workers + 1))
        futures = executor.map(_findmatch, cstart, cstop, itertools.repeat(number))

        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            for f in future:
                try:
                    found.append(f)
                except:
                    print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

You could improve the performance of submit by using cf.as_completed correctly. For a given iterable of futures, it returns an iterator that yields the futures in the order they complete.
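A minimal, self-contained sketch of what "in the order they complete" means (the staggered sleeps are artificial, purely for demonstration):

import concurrent.futures as cf
import time

def slow_echo(delay):
    '''Sleep for `delay` seconds, then return it.'''
    time.sleep(delay)
    return delay

if __name__ == '__main__':
    with cf.ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(slow_echo, d) for d in (0.3, 0.1, 0.2)]
        # Iterating `futures` directly yields submission order (0.3, 0.1,
        # 0.2) and would block on the slowest first. cf.as_completed
        # yields each future as soon as it finishes: 0.1, 0.2, 0.3.
        for future in cf.as_completed(futures):
            print(future.result())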

You can also skip copying the data into another array, and instead use itertools.chain.from_iterable to combine the results from the futures into a single iterable:

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc
from itertools import chain

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))

    return chain.from_iterable(f.result() for f in cf.as_completed(futures))

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_map(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(sum(1 for x in a),end))

Overview:

My answer has two parts:

  • Part 1 shows how to gain more speed-up from @niemmi's ProcessPoolExecutor.map() solution.
  • Part 2 shows when the ProcessPoolExecutor methods .submit() and .map() yield non-equivalent compute times.

================================================================================================================================

PART 1: How to gain more speed-up from ProcessPoolExecutor.map()

Background: This section builds on @niemmi's .map() solution, which by itself is excellent. While researching his discretization scheme to better understand how it interacts with the .map() chunksize argument, I found this interesting solution.

I regard @niemmi's definition of chunk = nmax // workers to be a definition of chunksize, i.e. a smaller size of the actual number range (the given task) to be tackled by each worker in the worker pool. Now, this definition is premised on the assumption that if a computer has x workers, dividing the task equally among them will result in optimum use of each worker, and so the total task will be completed fastest. Therefore, the number of chunks that a given task is broken into should always equal the number of pool workers. However, is this assumption correct?

Proposition: Here, I propose that the above assumption does not always lead to the fastest compute time when used with ProcessPoolExecutor.map(). Rather, discretizing a task into an amount greater than the number of pool workers can lead to speed-up, i.e. faster completion of the given task.
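To make the discretization concrete, here is a toy-sized sketch (the numbers are mine) of the generator-based chunk boundaries used in the code further below; note how the last chunk is stretched to nmax so integer division loses nothing:

nmax = 103
num_of_chunks = 4  # deliberately more chunks than workers

chunksize = nmax // num_of_chunks
cstart = (chunksize * i for i in range(num_of_chunks))
cstop = (chunksize * i if i != num_of_chunks else nmax
         for i in range(1, num_of_chunks + 1))
print(list(zip(cstart, cstop)))
# [(0, 25), (25, 50), (50, 75), (75, 103)]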

Experiment: I have modified @niemmi's code to allow the number of discretized tasks to exceed the number of pool workers. This code is given below, and was used to find the number of times the number 5 appears in the number range of 0 to 1E8. I executed this code using 1, 2, 4 and 6 pool workers, and for various ratios of the number of discretized tasks to the number of pool workers. For each scenario, 3 runs were made and the compute times tabulated. "Speed-up" is defined here as the average compute time using an equal number of chunks and pool workers over the average compute time when the number of discretized tasks is greater than the number of pool workers.
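The post does not show the benchmarking harness itself; a minimal sketch of one way to average 3 runs per scenario (the sweep values are illustrative, and _concurrent_map is the function defined further below) might look like:

from time import time

def bench(fn, runs=3, **kwargs):
    '''Return the average wall-clock time of `runs` calls to fn(**kwargs).'''
    times = []
    for _ in range(runs):
        start = time()
        fn(**kwargs)
        times.append(time() - start)
    return sum(times) / runs

# Hypothetical sweep over workers and chunk/worker ratios:
# for workers in (1, 2, 4, 6):
#     for ratio in (1, 7, 14, 28):
#         avg = bench(_concurrent_map, nmax=int(1E8), number='5',
#                     workers=workers, num_of_chunks=ratio * workers)
#         print(workers, ratio, avg)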

Findings:

[Figure: nchunk over nworkers]

  1. The figure on the left shows the compute times for all the scenarios mentioned in the Experiment section. It shows that the compute time for number of chunks / number of workers = 1 is always greater than the compute time for number of chunks > number of workers, i.e. the former case is always less efficient than the latter.

  2. The figure on the right shows that a speed-up of 1.2 times or more is gained when the number of chunks / number of workers reaches a threshold of 14 or more. Interestingly, the speed-up trend also occurred when ProcessPoolExecutor.map() was executed with a single worker.

Conclusion: When customising the number of discrete tasks that ProcessPoolExecutor.map() should use to solve a given task, it is prudent to ensure that this number is greater than the number of pool workers, as this practice shortens compute time.

concurrent.futures.ProcessPoolExecutor.map() code (revised parts only):

def _concurrent_map(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop,
                               itertools.repeat(number))
        # 2.2. Consolidate result as a list and return this list.
        for future in futures:
            #print('type(future)=',type(future))
            for f in future:
                if f:
                    try:
                        found.append(f)
                    except:
                        print_exc()
        foundsize = len(found)
        end = time() - start
        print('\n within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 4     # Pool of workers
    chunks_vs_workers = 14 # A factor of >=14 can provide optimum performance
    num_of_chunks = chunks_vs_workers * workers

    start = time()
    a = _concurrent_map(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

================================================================================================================================

PART 2: Total compute times from using the ProcessPoolExecutor methods .submit() and .map() can differ when returning a sorted/ordered result list.

Background: I have amended both the .submit() and .map() codes to allow an "apple-to-apple" comparison of their compute times, and the ability to visualize the compute time of the main code, the compute time of the _concurrent method called by the main code to perform the concurrent operations, and the compute time of each discretized task/worker called by the _concurrent method. Furthermore, the _concurrent method in these codes was structured to return an unordered and an ordered list of the results, directly from the future objects of .submit() and the iterator of .map(). The source code is provided below (hopefully it helps you).

Experiments: These two newly improved codes were used to perform the same experiment described in Part 1, save that only 6 pool workers were considered, and the Python built-in list and sorted methods were used to return an unordered and an ordered list of results, respectively, to the main section of the code.

Findings:

[Figure: .submit vs .map plus list vs sorted]

  1. From the _concurrent method's results, we can see that the compute times for creating all the future objects of ProcessPoolExecutor.submit() and for creating the iterator of ProcessPoolExecutor.map(), as a function of the number of discretized tasks over the number of pool workers, are equivalent. This result simply means that the ProcessPoolExecutor methods .submit() and .map() are equally efficient/fast.
  2. Comparing the compute times of main and of its _concurrent method, we can see that main ran longer than its _concurrent method. This is to be expected, as their time difference reflects the compute times of the list and sorted methods (and of the other methods encased within them). Clearly, the list method took less compute time to return a result list than the sorted method. The average compute times of the list method for both the .submit() and .map() codes were similar, at about 0.47 sec. The average compute times of the sorted method for the .submit() and .map() codes were 1.23 sec and 1.01 sec, respectively. In other words, the list method performed 2.62 and 2.15 times faster than the sorted method for the .submit() and .map() codes, respectively. (A standalone sketch of this list-vs-sorted comparison follows this list.)
  3. It is not clear why the sorted method of the .map() code was faster than that of the .submit() code as the number of discretized tasks increased beyond the number of pool workers, save for when the number of discretized tasks equalled the number of pool workers. That said, these findings show that the decision to use the equally fast .submit() or .map() methods can be encumbered by the sorted method. For example, if the intent is to generate an ordered list in the shortest time possible, ProcessPoolExecutor.map() should be preferred over ProcessPoolExecutor.submit(), as .map() allows the shortest total compute time.
  4. The discretization scheme mentioned in Part 1 of my answer is shown here to also speed up the performance of both the .submit() and .map() methods. The speed-up can be as much as 20% over the case where the number of discretized tasks equals the number of pool workers.
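As promised in finding 2, here is a standalone sketch of the list-vs-sorted comparison (toy data of my own, not the benchmark itself); it only illustrates that consolidating unordered chunks with sorted() costs noticeably more than keeping them unordered:

from time import time
from random import shuffle

# Fifty contiguous result chunks, shuffled to mimic completion order.
chunks = [list(range(i * 10**5, (i + 1) * 10**5)) for i in range(50)]
shuffle(chunks)

start = time()
unordered = [n for chunk in chunks for n in chunk]
print('list-style consolidation: {0:.4f}sec'.format(time() - start))

start = time()
ordered = sorted(n for chunk in chunks for n in chunk)
print('sorted consolidation:     {0:.4f}sec'.format(time() - start))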

The improved .map() code

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from itertools import repeat, chain 


def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
    #      format(nmin, nmax, number, len(match),end))
    return match

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop, repeat(number))
    end = time() - start
    print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(futures)) #Return an unordered result list
    #return sorted(chain.from_iterable(futures)) #Return an ordered result list

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers
    chunks_vs_workers = 30 # A factor of >=14 can provide optimum performance
    num_of_chunks = chunks_vs_workers * workers

    start = time()
    found = _concurrent(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    #print('found = ', found)
    print("found {0} in {1:.4f}sec".format(len(found),end))    

The improved .submit() code.
This code is the same as the .map() code, except that you replace the _concurrent method with the following:

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(num_of_chunks):
            cstart = chunksize * i
            cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
    end = time() - start
    print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(f.result() for f in cf.as_completed(
        futures))) #Return an unordered list
    #return sorted(chain.from_iterable(f.result() for f in cf.as_completed(
    #    futures))) #Return an ordered list

================================================================================================================================
