使用Python进行多进程寻找最大值

0 投票
2 回答
2652 浏览
提问于 2025-04-18 05:37

我正在使用Python 2.7.5和OpenCV库。我有一张测试图片,我想在一堆图片中找到和它最相似的那一张。我写了一个函数,利用OpenCV来计算相似点的总数。相似点越多,说明图片越相似。不过,这个函数运行起来比较慢,所以我想把代码改成并行执行,这样可以加快速度。

#img is the image that I am trying to find the most number of similar pointswith
maxSimilarPts = 0;

#testImages is a list of testImages
for testImage in testImages:
    #getNumSimilarPts returns the number of similar points between two images
    similarPts = getNumSimilarPts(img, testImage) 

    if similarPts > maxSimilarPts:
        maxSimilarPts = similarPts

请问我该如何在Python中实现并行处理呢?任何帮助都非常感谢。

2 个回答

0

重要提示:

这段代码只能在python3上直接运行。如果你想在python2上运行,就必须安装concurrent.futures这个库

from concurrent.futures import ProcessPoolExecutor


def multiprocess_max(iterable, key):
    with ProcessPoolExecutor() as executor:
        return max(executor.map(lambda item: (item, key(item)), iterable),
                   key=lambda item: item[1])[0]

这个方法的核心思想是:

计算比较项目的关键值是一个耗时的过程。那么,为什么不通过多个进程来计算关键值,而只用一个进程来进行比较呢?

具体是这样操作的:

首先,创建一个concurrent.futures.ProcessPoolExecutor,这个东西是对multiprocessing模块的一个简单封装,提供了一个类似内置的map()函数,但它可以同时处理多个任务。

接着,从集合中为每个项目创建一个包含两个元素的元组:第一个是原始项目(如果它的关键值是最大的,我们就返回这个),第二个是用传入的key函数计算出来的关键值。

当我们得到了结果后,就把它传给内置的max()函数,但这里有个问题:现在我们的集合是元组的集合!所以,我们需要传入一个key函数,它返回第二个元素,也就是计算出来的关键值。

最后,由于max()返回的是整个项目(包括我们不想要的关键值),我们从结果中提取出第一个元素,也就是原始项目,然后返回它。

编辑:

在我的控制台(IDLE)中运行这段代码时,我发现这个问题(我也是因为需要这个才找到的),我一度以为我的解决方案是错的 :-)

但其实我错了,不是解决方案的问题。这个解决方案在解释器中是无法工作的。根据文档的说明:

__main__模块必须能够被工作子进程导入。这意味着ProcessPoolExecutor在交互式解释器中无法工作。

0

下面是一个(未经测试的)并行版本的原始代码。它同时运行5个工作者。每个工作者从输入队列中取出一张图片,计算相似度,然后把相似度值和图片放到输出队列中。当所有工作者都完成任务,且没有更多图片时,父进程会打印出最相似图片的(相似度,图片ID)。

# adapted from Raymond Hettinger
# http://stackoverflow.com/questions/11920490/how-do-i-run-os-walk-in-parallel-in-python/23779787#23779787

from multiprocessing.pool import Pool
from multiprocessing import JoinableQueue as Queue
import os, sys


def parallel_worker():
    while True:
        testImage = imageq.get()
        similarPts = getNumSimilarPts(img, testImage) 
        similarq.put( [similarPts, testImage] )
        imageq.task_done()

similarq = Queue()
imageq = Queue()
for testImage in testImages:
    imageq.put(testImage)

pool = Pool(5)
for i in range(5):
    pool.apply_async(parallel_worker)

imageq.join()
print 'Done'

print max(similarq)

撰写回答