图像块的多处理

2024-04-16 09:15:31 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个函数,它必须遍历图像的各个像素并计算一些几何图形。这个函数需要很长时间才能运行(在2400万像素的图像上大约需要5个小时),但它似乎很容易在多个内核上并行运行。然而,我一辈子都找不到一个有良好文档记录、解释良好的示例来使用多处理软件包执行类似的操作。下面是我现在运行的代码作为一个玩具示例:

import numpy as np
import matplotlib.pyplot as plt
from scipy import misc
from skimage import color
import multiprocessing 
from multiprocessing import Process

#Some dumb stand in function for this exercise
def dumb_func(image):
    ny, nx = image.shape
    temp = np.empty_like(image)

    for y in range(ny):
        for x in range(nx):
            temp[y, x] = np.square(image[y, x])

    return temp

#Convert image to greyscale
img = color.rgb2gray(misc.ascent())

#Resize the image
ns = 2048 #Pixel size
img = misc.imresize(img, size = (ns, ns))


#Split the image into equal chunks...not sure how this works for arrays that
#are weird shapes and aren't the same size in each dimension

divs = 4
init_split = np.array_split(img, divs, axis = 0)
side = init_split[0].shape[0]
chunked = np.empty((divs, divs, side, side))
cur = 0
for i in range(divs):
    split = np.array_split(init_split[i], divs, axis = 1)
    for j in range(divs):
        chunked[i, j, :, :] = split[j]
        cur +=1

#Pull core count and divide by two to be safe
cores = int(multiprocessing.cpu_count() / 2)

result = np.empty_like(chunked)
idxs = np.array(np.meshgrid(np.arange(0, divs, 1), 
                            np.arange(0, divs, 1))).T.reshape(-1, 2)

基本上,这段代码加载到图像中,将其转换为灰度,使其变大,然后将其分块。分块数组的形状是(i,j,ny,nx),其中i和j是标识我正在处理的图像块的索引,ny,nx以像素表示每个块的大小。在

另外,我正在创建一个名为idxs的数组,该数组将所有可能的索引存储到分块数组中,以提取分块图像。在

我要做的是并行地在块上运行一个函数(在本例中,以dumb_func为例),并将结果存储在相同形状的results数组中。我想象的方法是在idxs数组上循环,并将属于这些索引的块分配给进程,直到核心数,等待这些核心完成,然后向核心提供更多的进程,直到完成。我陷入了困境,因为我无法A)弄清楚如何访问函数中的返回值,以及B)如何处理可能有16个块和5个核心的情况,导致最后一次迭代只需要一个进程。在

我该怎么做呢?我花了6-7个小时阅读了关于多处理池,进程,地图,星图等等。。。我一辈子都不明白如何实现这一点。在

编辑编辑:

这是我更新的代码,运行时没有错误。但是,新的数据数组永远不会更新。我用一个值100填充它,在例程的末尾,new_data就是如何初始化的。在

^{pr2}$

Tags: 函数in图像imageimportimgfornp
2条回答

我会这样做,从依赖关系开始:

from multiprocessing import Pool
import numpy as np
from PIL import Image

# and some for testing
from random import random
from time import sleep

首先,我定义了一个将图像分成“块”的函数,就像您所说的那样:

^{pr2}$

它是一个懒惰的迭代器,所以这可以持续一段时间。在

然后定义我的worker函数:

def dumb_func(cc):
    (y0,y1), (x0,x1) = cc
    # convert to floats for ease of processing
    chunk = image[y0:y1,x0:x1] / 255.
    # random slow down for testing
    # sleep(random() ** 6)
    res = chunk ** 2
    # convert back to bytes for efficiency
    return cc, (res * 255).astype(np.uint8)

为了提高效率,我确保源数组尽可能地接近原始格式,并以相同的格式发送回去(如果显然要处理其他像素格式,这可能需要一些修改)。在

然后我把它放在一起:

if __name__ == '__main__':
    source = Image.open('tmp.jpeg')
    image = np.asarray(source)
    print("loaded", image.shape, image.dtype)

    with Pool() as pool:
        resit = pool.imap_unordered(
            dumb_func, chunkit(*image.shape[:2]))

        output = np.empty_like(image)
        for cc, res in resit:
            (y0,y1), (x0,x1) = cc
            output[y0:y1,x0:x1] = res

    im = Image.fromarray(output, 'RGB')
    im.save('out.jpeg')

这将在几秒钟内处理一个15像素的图像,其中大部分都花在加载/保存图像上。它可能在数组步进和缓存友好性方面更聪明,但希望能有所帮助!在

注意:我认为这段代码依赖于cpythonunix风格的进程分叉语义,以确保在进程之间有效地共享映像。不知道如果你把它放在别的东西上会发生什么

我一直在为基本相同的事情编写代码。现在的目标只是用透明的像素替换白色像素,但是它似乎替换了整个图像,所以在某个地方出现了一个bug……但是它在multiprocessing模块中不再出现错误,所以也许它可以作为一个示例,说明如何加载Queue,然后让你的工作进程处理它!在

from PIL import Image
from multiprocessing import Process, JoinableQueue
from threading import Thread
from time import time

def worker_function(q, new_data):
    while True:
        # print("Items in queue: {}".format(q.qsize()))
        index, pixel = q.get()
        if pixel[0] > 240 and pixel[1] > 240 and pixel[2] > 240:
            out_pixel = (0, 0, 0, 0)
        else:
            out_pixel = pixel
        new_data[index] = out_pixel
        q.task_done()

if __name__ == "__main__":
    start = time()
    q = JoinableQueue()

    my_image = Image.open('InputImage.jpg')
    my_image = my_image.convert('RGBA')
    datas = list(my_image.getdata())
    new_data = [0] * len(datas) # make a blank array the size of our image to fill later

    print('putting image into queue')
    for count, item in enumerate(datas):
        q.put((count, item))

    print('starting workers')
    worker_count = 50
    processes = []
    for i in range(worker_count):
        p = Process(target=worker_function, args=[q, new_data])
        p.daemon = True
        p.start()
    print('main thread waiting')
    q.join()
    my_image.putdata(new_data)
    my_image.save('output.png', "PNG")

    end = time()
    print('{:.3f} seconds elapsed'.format(end - start))

我认为在if __name__ == "__main__"块中“保护”代码是很重要的,否则派生的进程似乎在运行它。在

更新

看起来你需要实现一个Manager()(或者可能还有其他我不知道的方法!)。我把代码改成:

^{pr2}$

虽然这似乎不是最快的选择!我相信还有其他方法可以提高速度。我对Threads执行相同操作的代码看起来非常相似:

from PIL import Image
from threading import Thread
from queue import Queue
import time

start = time.time()
q = Queue()

planeIm = Image.open('InputImage.jpg')
planeIm = planeIm.convert('RGBA')
datas = planeIm.getdata()
new_data = [0] * len(datas)

print('putting image into queue')
for count, item in enumerate(datas):
    q.put((count, item))

def worker_function():
    while True:
        # print("Items in queue: {}".format(q.qsize()))
        index, pixel = q.get()
        if pixel[0] > 240 and pixel[1] > 240 and pixel[2] > 240:
            out_pixel = (0, 0, 0, 0)
        else:
            out_pixel = pixel
        new_data[index] = out_pixel
        q.task_done()

print('starting workers')
worker_count = 100
for i in range(worker_count):
    t = Thread(target=worker_function)
    t.daemon = True
    t.start()
print('main thread waiting')
q.join()
print('Queue has been joined')
planeIm.putdata(new_data)
planeIm.save('output.png', "PNG")

end = time.time()

elapsed = end - start
print('{:3.3} seconds elapsed'.format(elapsed))

但是,处理我的图像需要大约23秒的线程和大约170秒的多处理!!我怀疑这可能是因为启动Process对象所需的更大开销,而且我处理每个像素的算法目前都很简单(只是if pixel[0] > 240 and pixel[1] > 240 and pixel[2] > 240:位),所以我很可能无法获得复杂像素处理算法所能带来的速度提升。还要注意multiprocessing documentation

a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.

这让我相信有更快的选择。在

相关问题 更多 >