使用Python的concurrent.futures并行处理对象

0 投票
1 回答
3011 浏览
提问于 2025-04-19 16:07

我刚开始使用Python 3中的一个库叫做concurrent.futures,目的是对一组图片应用一些处理函数,来处理和调整这些图片的形状。
这些函数包括resize(height, width),用来改变图片的大小,以及opacity(number),用来调整图片的透明度。

另外,我还有一个images()函数,它可以生成类似文件的对象。
所以我尝试了下面这段代码,想要并行处理我的图片:

import concurrent.futures
From mainfile import images
From mainfile import shape


def parallel_image_processing :
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future = executor.submit(images)
    for fileobject in future.result() :
        future1 = executor.submit( shape.resize, fileobject, "65","85")
        future2 = executor.submit( shape.opacity, fileobject, "0.5")

有人能告诉我这样做是否正确吗?

1 个回答

3

我建议让 images 只返回一个路径,而不是一个打开的文件对象:

def images():
    ...
    yield os.path.join(image_dir[0], filename)

然后使用这个:

from functools import partial

def open_and_call(func, filename, args=(), kwargs={}):
    with open(filename, 'rb') as f:
        return func(f, *args, **kwargs)

def parallel_image_processing():
    resize_func = partial(open_and_call, shape.resize, args=("65", "85"))
    opacity_func = partial(open_and_call, shape.opacity, args=("0.5"))
    img_list = list(images())
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures1 = executor.map(resize_func, img_list)
        futures2 = executor.map(opacity_func, img_list)

        concurrent.futures.wait([futures1, futures2])


if __name__ == "__main__":
    # Make sure the entry point to the function that creates the executor 
    # is inside an `if __name__ == "__main__"` guard if you're on Windows.
    parallel_image_processing()

如果你在使用 CPython(与没有全局解释器锁(GIL)的其他实现,比如 Jython 不同),你就不应该使用 ThreadPoolExecutor,因为处理图像需要大量的 CPU 资源;由于 GIL 的限制,在 CPython 中一次只能运行一个线程,所以如果你在这个场景中使用线程,实际上是无法并行处理的。相反,应该使用 ProcessPoolExecutor,它会使用进程而不是线程,从而完全避免 GIL 的问题。注意,这也是我建议不要从 images 返回类似文件的对象的原因——你不能把一个打开的文件句柄传递给工作进程。你必须在工作进程中打开文件。

为了做到这一点,我们让 executor 调用一个小的辅助函数(open_and_call),这个函数会在工作进程中打开文件,然后用正确的参数调用 resize/opacity 函数。

我还使用了 executor.map 而不是 executor.submit,这样我们可以对 images() 返回的每个项目调用 resize/opacity,而不需要显式的 for 循环。我使用 functools.partial 来简化调用需要多个参数的函数,这样在使用 executor.map 时就更方便了(因为它只允许调用单个参数的函数)。

此外,也不需要在执行器中调用 images(),因为你会在继续之前等待它的结果。只需像普通函数一样调用它。我在调用 map 之前,将 images() 返回的生成器对象转换为 list。如果你担心内存使用,可以在每个 map 调用中直接调用 images(),但如果不担心,通常一次调用 images() 并将其存储为列表会更快。

撰写回答