<p><strong>Overview:</strong></p>
<p>My answer has two parts:<br/></p>
<ul>
<li>Part 1 shows how to gain additional speed-up from @niemmi's <code>ProcessPoolExecutor.map()</code> solution.</li>
<li>Part 2 shows that the <code>ProcessPoolExecutor</code> methods <code>.submit()</code> and <code>.map()</code> do not yield equivalent compute times.</li>
</ul>
<p><strong>================================================================================================================================</strong></p>
<p><strong>Part 1: Getting more speed-up from ProcessPoolExecutor.map()</strong></p>
<p><strong>Background:</strong>
This section builds on @niemmi's <code>.map()</code> solution, which is excellent on its own. While investigating his discretisation scheme to better understand how it interacts with the <code>.map()</code> <code>chunksize</code> argument, I found this interesting solution.</p>
<p>I regard @niemmi's definition of <code>chunk = nmax // workers</code> to be a definition of chunksize, i.e. the size of the sub-range of the actual number range (the given task) that each worker in the pool has to tackle. This definition is premised on the assumption that, if a computer has x workers, dividing the task equally among them makes optimal use of each worker, and so the total task will complete fastest. Therefore, the number of chunks that a given task is broken into should always equal the number of pool workers. But is this assumption correct?</p>
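<p>To make the two discretisation schemes concrete, here is a small illustration of the chunk boundaries each one produces. This is my own sketch with made-up numbers; <code>chunk_bounds</code> is a hypothetical helper and not part of either answer's code:</p>
<pre><code># Illustration only: chunk boundaries for a toy range of 0 to 100 with 4 workers.
def chunk_bounds(nmax, num_of_chunks):
    '''Return (cstart, cstop) pairs covering 0..nmax in num_of_chunks pieces.'''
    chunksize = nmax // num_of_chunks
    return [(chunksize * i,
             chunksize * (i + 1) if i != num_of_chunks - 1 else nmax)
            for i in range(num_of_chunks)]

nmax, workers = 100, 4
print(chunk_bounds(nmax, workers))        # chunks == workers: 4 large chunks
print(chunk_bounds(nmax, 3 * workers))    # chunks > workers: 12 smaller chunks
</code></pre>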
<p><strong>Proposition:</strong> Here, I propose that the above assumption does not always lead to the fastest compute time when used with <code>ProcessPoolExecutor.map()</code>. Rather, <strong>discretising a task into an amount greater than the number of pool workers can give a speed-up, i.e. complete the given task faster</strong>.</p>
<p><strong>Experiment:</strong> I have modified @niemmi's code to allow the number of discretised tasks to exceed the number of pool workers. This code is given below and was used to find the occurrences of the number 5 in the number range of 0 to 1E8. I executed this code using 1, 2, 4 and 6 pool workers, and for various ratios of the number of discretised tasks to the number of pool workers. For each scenario, 3 runs were made and the compute times were tabulated. "<em>Speed-up</em>" is defined here as the average compute time when using an equal number of chunks and pool workers divided by the average compute time when the number of discretised tasks is greater than the number of pool workers.</p>
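<p>For reference, a minimal sketch of how such a sweep could be scripted is shown below. This is my own illustration, not part of the benchmark code: <code>sweep</code>, <code>task</code> and <code>ratios</code> are hypothetical names, and the callable passed in is assumed to behave like the <code>_concurrent_map</code> function given further down.</p>
<pre><code>from statistics import mean
from time import time

def sweep(task, nmax, number, workers, ratios, repeats=3):
    '''Time task(nmax, number, workers, num_of_chunks) for several
    chunks-to-workers ratios and report the speed-up relative to ratio 1.
    ratios must include 1, which serves as the baseline.'''
    avg = {}
    for ratio in ratios:
        runs = []
        for _ in range(repeats):
            start = time()
            task(nmax, number, workers, ratio * workers)
            runs.append(time() - start)
        avg[ratio] = mean(runs)
    return {ratio: avg[1] / t for ratio, t in avg.items()}

# Example (illustrative):
#   speedups = sweep(_concurrent_map, int(1E8), str(5), 4, ratios=[1, 7, 14, 28])
</code></pre>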
<p><strong>Findings:</strong></p>
<p><a href="https://i.stack.imgur.com/hUR26.png" rel="noreferrer"><img src="https://i.stack.imgur.com/hUR26.png" alt="nchunk over nworkers"/></a></p>
<ol>
<li><p>The figure on the left shows the compute times taken for all the scenarios mentioned in the Experiment section. It shows that the compute time taken when <em>number of chunks / number of workers = 1</em> is always greater than the compute time taken when <em>number of chunks &gt; number of workers</em>, i.e. the former case is always less efficient than the latter.</p></li>
<li><p>The figure on the right shows that <strong>a speed-up of 1.2 times or more is gained when the <em>number of chunks / number of workers</em> reaches a threshold value of 14 or more</strong>. Interestingly, a speed-up trend was also observed when <code>ProcessPoolExecutor.map()</code> was executed with a single worker.</p></li>
</ol>
<p><strong>Conclusion:</strong> When customising the number of discrete tasks that <code>ProcessPoolExecutor.map()</code> should use to solve a given task, it is prudent to ensure that this number is greater than the number of pool workers, as this practice shortens the compute time.</p>
<p><strong>concurrent.futures.ProcessPoolExecutor.map() code (revised section only)</strong></p>
<pre><code>import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc

# Note: _findmatch is unchanged from @niemmi's answer (see the full .map() code in Part 2).

def _concurrent_map(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
    find the occurrences of a given number in a number range in a parallelised
    manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    found = []
    # 2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop,
                               itertools.repeat(number))
        # 2.2. Consolidate result as a list and return this list.
        for future in futures:
            #print('type(future)=', type(future))
            for f in future:
                if f:
                    try:
                        found.append(f)
                    except:
                        print_exc()
        foundsize = len(found)
        end = time() - start
        print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8)          # Number range maximum.
    number = str(5)          # Number to be found in number range.
    workers = 4              # Pool of workers
    chunks_vs_workers = 14   # A factor of >=14 can provide optimum performance
    num_of_chunks = chunks_vs_workers * workers
    start = time()
    a = _concurrent_map(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a), end))
</code></pre>
<p><strong>================================================================================================================================</strong></p>
<p><strong>Part 2: Total compute time from using the ProcessPoolExecutor methods .submit() and .map() can be dissimilar when returning a sorted/ordered result list.</strong></p>
<p><strong>Background:</strong> I have modified both the <code>.submit()</code> and <code>.map()</code> codes to allow an "apples-to-apples" comparison of their compute times and the ability to visualise the compute time of the main code, the compute time of the concurrent method called by the main code to perform the concurrent operations, and the compute time of each discretised task/worker called by the concurrent method. Furthermore, the concurrent method in these codes was structured to return an unordered or an ordered list of results directly from the Future objects of <code>.submit()</code> and the iterator of <code>.map()</code>. The source code is provided below (<em>hope it helps you</em>).</p>
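<p>As a quick orientation before the full listings, the sketch below shows the two consolidation styles being compared: an unordered list built with <code>list()</code> and an ordered list built with <code>sorted()</code>, taken from the <code>.map()</code> iterator and the <code>.submit()</code> Future objects respectively. This is a self-contained toy example of mine, not the answer's code; <code>_digits_with_5</code> is a hypothetical stand-in for <code>_findmatch</code>:</p>
<pre><code>import concurrent.futures as cf
from itertools import chain

def _digits_with_5(nmin, nmax):
    '''Toy worker: numbers in [nmin, nmax) whose decimal form contains a 5.'''
    return [n for n in range(nmin, nmax) if '5' in str(n)]

if __name__ == '__main__':
    with cf.ProcessPoolExecutor(max_workers=2) as executor:
        # .map() returns an iterator that yields per-chunk results in submission order.
        map_results = list(executor.map(_digits_with_5, [0, 50], [50, 100]))
        # .submit() returns one Future object per discretised task.
        futures = [executor.submit(_digits_with_5, a, b)
                   for a, b in ((0, 50), (50, 100))]

    unordered = list(chain.from_iterable(map_results))        # list() consolidation
    ordered = sorted(chain.from_iterable(f.result()
                     for f in cf.as_completed(futures)))       # sorted() consolidation
    print(unordered, ordered, sep='\n')
</code></pre>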
<p><strong>Experiment:</strong> These two newly improved codes were used to perform the same experiment described in Part 1, except that only 6 pool workers were considered, and the Python built-in <code>list</code> and <code>sorted</code> methods were used to return an unordered and an ordered list of results, respectively, to the main section of the code.</p>
<p><strong>Findings:</strong>
<a href="https://i.stack.imgur.com/Jvarr.png" rel="noreferrer"><img src="https://i.stack.imgur.com/Jvarr.png" alt=".submit vs .map plus list vs sorted"/></a></p>
<ol>
<li>From the concurrent method's results, we can see that the compute times taken to create all the Future objects of <code>ProcessPoolExecutor.submit()</code> and to create the iterator of <code>ProcessPoolExecutor.map()</code>, as a function of the number of discretised tasks over the number of pool workers, are equivalent. This result simply means that the <code>ProcessPoolExecutor</code> methods <code>.submit()</code> and <code>.map()</code> are equally efficient/fast.</li>
<li>Comparing the compute times of main and of its concurrent method, we can see that main took longer to run than its concurrent method. This is to be expected, as their time difference reflects the compute time of the <code>list</code> and <code>sorted</code> methods (and that of the other methods encased within them). Clearly, the <code>list</code> method took less compute time to return a result list than the <code>sorted</code> method. The average compute times of the <code>list</code> method for both the .submit() and .map() codes were similar, at roughly 0.47 sec. The average compute times of the <code>sorted</code> method for the .submit() and .map() codes were 1.23 sec and 1.01 sec, respectively. In other words, the <code>list</code> method performed 2.62 and 2.15 times faster than the <code>sorted</code> method for the .submit() and .map() codes, respectively. A sketch for timing this consolidation step in isolation is given after this list.</li>
<li>It is not clear why the <code>sorted</code> method generated an ordered list faster from <code>.map()</code> than from <code>.submit()</code> as the number of discretised tasks increased beyond the number of pool workers, except when the number of discretised tasks equalled the number of pool workers.
That said, these findings show that the decision to use the equally fast <code>.submit()</code> or <code>.map()</code> methods can be encumbered by the sorting method. For example, if the intent is to generate an ordered list in the shortest time possible, the use of <code>ProcessPoolExecutor.map()</code> should be preferred over <code>ProcessPoolExecutor.submit()</code>, as <code>.map()</code> allows the shortest total compute time.</li>
<li>The discretisation scheme mentioned in Part 1 of my answer is shown here to also speed up the performance of both the <code>.submit()</code> and <code>.map()</code> methods. The amount of speed-up can be as much as 20% over the case where the number of discretised tasks equals the number of pool workers.</li>
</ol>
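<p>For readers who want to reproduce the <code>list</code> vs <code>sorted</code> comparison in finding 2, the sketch below separates the consolidation step from the concurrent work so it can be timed on its own. This is my own illustration with hypothetical names (<code>time_consolidation</code>, <code>per_chunk_results</code>); it assumes the per-chunk match lists have already been collected into a list, e.g. by materialising the <code>.map()</code> iterator:</p>
<pre><code>from itertools import chain
from time import time

def time_consolidation(per_chunk_results):
    '''per_chunk_results: a list of per-chunk match lists (not a one-shot iterator).
    Returns the wall-clock times of the list() and sorted() consolidations.'''
    t0 = time()
    unordered = list(chain.from_iterable(per_chunk_results))
    t_list = time() - t0

    t0 = time()
    ordered = sorted(chain.from_iterable(per_chunk_results))
    t_sorted = time() - t0
    return t_list, t_sorted
</code></pre>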
<p><strong>Improved .map() code</strong></p>
<pre><code>#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from itertools import repeat, chain

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrences of a number in the range nmin to nmax
    and return the found occurrences in a list.'''
    start = time()
    match = []
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
    #      format(nmin, nmax, number, len(match), end))
    return match

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
    find the occurrences of a given number in a number range in a concurrent
    manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    # 2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop, repeat(number))
    end = time() - start
    print('\n within statement of def _concurrent(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(futures))     # Return an unordered result list
    #return sorted(chain.from_iterable(futures))  # Return an ordered result list

if __name__ == '__main__':
    nmax = int(1E8)          # Number range maximum.
    number = str(5)          # Number to be found in number range.
    workers = 6              # Pool of workers
    chunks_vs_workers = 30   # A factor of >=14 can provide optimum performance
    num_of_chunks = chunks_vs_workers * workers
    start = time()
    found = _concurrent(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    #print('found = ', found)
    print("found {0} in {1:.4f}sec".format(len(found), end))
</code></pre>
<p><strong>Improved .submit() code.</strong><br/>
This code is the same as the .map code, except that you replace the _concurrent method with the following:</p>
<pre><code>def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
    find the occurrences of a given number in a number range in a concurrent
    manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    # 2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(num_of_chunks):
            cstart = chunksize * i
            cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
    end = time() - start
    print('\n within statement of def _concurrent(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(f.result() for f in cf.as_completed(
        futures)))                                # Return an unordered result list
    #return sorted(chain.from_iterable(f.result() for f in cf.as_completed(
    #    futures)))                               # Return an ordered result list
</code></pre>
<p><strong>================================================================================================================================</strong></p>