使用Python进行多进程寻找最大值
我正在使用Python 2.7.5和OpenCV库。我有一张测试图片,我想在一堆图片中找到和它最相似的那一张。我写了一个函数,利用OpenCV来计算相似点的总数。相似点越多,说明图片越相似。不过,这个函数运行起来比较慢,所以我想把代码改成并行执行,这样可以加快速度。
#img is the image that I am trying to find the most number of similar pointswith
maxSimilarPts = 0;
#testImages is a list of testImages
for testImage in testImages:
#getNumSimilarPts returns the number of similar points between two images
similarPts = getNumSimilarPts(img, testImage)
if similarPts > maxSimilarPts:
maxSimilarPts = similarPts
请问我该如何在Python中实现并行处理呢?任何帮助都非常感谢。
2 个回答
重要提示:
这段代码只能在python3上直接运行。如果你想在python2上运行,就必须安装concurrent.futures这个库。
from concurrent.futures import ProcessPoolExecutor
def multiprocess_max(iterable, key):
with ProcessPoolExecutor() as executor:
return max(executor.map(lambda item: (item, key(item)), iterable),
key=lambda item: item[1])[0]
这个方法的核心思想是:
计算比较项目的关键值是一个耗时的过程。那么,为什么不通过多个进程来计算关键值,而只用一个进程来进行比较呢?
具体是这样操作的:
首先,创建一个concurrent.futures.ProcessPoolExecutor
,这个东西是对multiprocessing
模块的一个简单封装,提供了一个类似内置的map()
函数,但它可以同时处理多个任务。
接着,从集合中为每个项目创建一个包含两个元素的元组:第一个是原始项目(如果它的关键值是最大的,我们就返回这个),第二个是用传入的key
函数计算出来的关键值。
当我们得到了结果后,就把它传给内置的max()
函数,但这里有个问题:现在我们的集合是元组的集合!所以,我们需要传入一个key
函数,它返回第二个元素,也就是计算出来的关键值。
最后,由于max()
返回的是整个项目(包括我们不想要的关键值),我们从结果中提取出第一个元素,也就是原始项目,然后返回它。
编辑:
在我的控制台(IDLE)中运行这段代码时,我发现这个问题(我也是因为需要这个才找到的),我一度以为我的解决方案是错的 :-)
但其实我错了,不是解决方案的问题。这个解决方案在解释器中是无法工作的。根据文档的说明:
__main__
模块必须能够被工作子进程导入。这意味着ProcessPoolExecutor
在交互式解释器中无法工作。
下面是一个(未经测试的)并行版本的原始代码。它同时运行5个工作者。每个工作者从输入队列中取出一张图片,计算相似度,然后把相似度值和图片放到输出队列中。当所有工作者都完成任务,且没有更多图片时,父进程会打印出最相似图片的(相似度,图片ID)。
# adapted from Raymond Hettinger
# http://stackoverflow.com/questions/11920490/how-do-i-run-os-walk-in-parallel-in-python/23779787#23779787
from multiprocessing.pool import Pool
from multiprocessing import JoinableQueue as Queue
import os, sys
def parallel_worker():
while True:
testImage = imageq.get()
similarPts = getNumSimilarPts(img, testImage)
similarq.put( [similarPts, testImage] )
imageq.task_done()
similarq = Queue()
imageq = Queue()
for testImage in testImages:
imageq.put(testImage)
pool = Pool(5)
for i in range(5):
pool.apply_async(parallel_worker)
imageq.join()
print 'Done'
print max(similarq)