Array creation is too slow

I am trying to create an image array from scratch. I got the code working, but it takes about 30 seconds to run. I feel it could be much faster using native NumPy functions. How can I do that?

import cv2
import numpy as np
import time

volumes = np.random.randint(low=0, high=200, size=10000)
print(volumes)

image_heigh = 128
image_width = 256
image_channel = 3

show_img = False


def nomralized(data, data_min, data_max, maximum_value):

    nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))

    return nomamized_data

start_time = time.time()

for ii in range(len(volumes)-image_width):
    # ===================== part to optimize start
    final_image = np.zeros((image_heigh, image_width, image_channel))

    start = ii
    end = ii + image_width

    current_vols = volumes[start:end]

    # normalize the data
    vol_min = 0
    vol_max = np.max(current_vols)

    vol_norm = nomralized(data=current_vols,
                      data_min=vol_min,
                      data_max=vol_max,
                      maximum_value=image_heigh)

    for xxx in range(image_width):
        final_image[:int(vol_norm[xxx]), xxx, :] = 1

    # ===================== part to optimize end

    if show_img:
        image = np.float32(final_image)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        cv2.imshow("ok", image)
        cv2.waitKey(27)

print("total running time: ", (time.time() - start_time))

How can I speed up the creation of this image array? I need to create a new image at every time step, because I want to simulate a real-time data stream where new data arrives at each time step.

That is why I only want to optimize this part of the code:

for xxx in range(image_width):
    final_image[:int(vol_norm[xxx]), xxx, :] = 1

How can I do that?


1 Answer

Let's start with the simplest optimizations:

  1. Use a comparison of the values against np.arange(...) instead of the inner loop (see the sketch right after this list).
  2. Use a grayscale image instead of a 3-channel RGB one; that is 3x less data to process.
  3. Use the np.uint8 type instead of np.float32; it is faster to process and needs no conversion to float32 for cv2 visualization.
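
Here is a minimal standalone sketch of the broadcasting idea from point 1 (the array names are illustrative, not taken from the code below): comparing a column of row indices against the per-column bar heights builds the whole image in one vectorized operation instead of a Python loop over columns.

import numpy as np

height, width = 128, 256
# hypothetical per-column bar heights in [0, height)
heights = np.random.randint(0, height, size=width)

# row indices as a (height, 1) column vector
rows = np.arange(height, dtype=np.int32)[:, None]

# broadcasting (height, 1) < (1, width) yields a (height, width) boolean mask,
# equivalent to setting final_image[:heights[x], x] = 1 for every column x
image = (rows < heights[None, :]).astype(np.uint8) * 255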

Together these optimizations give a huge speedup (around 10x): my running time is 2.6 sec instead of the previous 27 sec.

Another very useful optimization, which I did not do, is that you do not need to recompute the previous image pixels as long as the max/min of the whole data inside the current window has not changed. You only need to recompute the previous image data when the max/min changes. I expect your real data to change gradually, like forex or bitcoin prices, so max/min changes inside the window should be quite rare.
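
As a rough illustration of that idea (this is my own sketch, not part of the answer's code; it assumes vol_min is fixed at 0 and the window max is non-zero, as in the code above), you could keep the previously rendered columns and redraw only the newest one whenever the window max is unchanged:

import numpy as np

height, width = 128, 256
rows = np.arange(height, dtype=np.int32)[:, None]

def render_window(window, prev_image, prev_max):
    """Render one window of `width` samples, reusing previous columns when possible."""
    cur_max = window.max()
    if prev_image is not None and cur_max == prev_max:
        # Window max unchanged: old columns are still valid, so shift them left
        # by one and draw only the newly arrived rightmost column.
        new_col = (rows < int(height * window[-1] / cur_max)).astype(np.uint8) * 255
        image = np.concatenate((prev_image[:, 1:], new_col), axis=1)
    else:
        # Max changed: every column needs to be re-normalized and redrawn.
        norm = (height * window / cur_max).astype(np.int32)
        image = (rows < norm[None, :]).astype(np.uint8) * 255
    return image, cur_max

# usage: prev_image, prev_max = render_window(volumes[ii:ii + width], prev_image, prev_max)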

Optimizations 1)-3) mentioned above are implemented in the following code:

import cv2
import numpy as np
import time

volumes = np.random.randint(low=0, high=200, size=10000)
print(volumes)

image_heigh = 128
image_width = 256
image_channel = 3

show_img = False

def nomralized(data, data_min, data_max, maximum_value):

    nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))

    return nomamized_data

start_time = time.time()

aranges = np.arange(image_heigh, dtype = np.int32)[:, None]

for ii in range(len(volumes)-image_width):
    # ===================== part to optimize start
    #final_image = np.zeros((image_heigh, image_width, image_channel), dtype = np.float32)

    start = ii
    end = ii + image_width

    current_vols = volumes[start:end]

    # normalize the data
    vol_min = 0
    vol_max = np.max(current_vols)

    vol_norm = nomralized(data=current_vols,
                      data_min=vol_min,
                      data_max=vol_max,
                      maximum_value=image_heigh)

    final_image = (aranges < vol_norm[None, :].astype(np.int32)).astype(np.uint8) * 255

    # ===================== part to optimize end

    if show_img:
        cv2.imshow('ok', final_image)
        cv2.waitKey(27)

print("total running time: ", (time.time() - start_time))

Doing just the single optimization of the inner loop speeds the code up a bit more than 2x, giving a timing of 1.3 sec. But I also put back the 3 channels plus float32, which slowed things down again, ending up at 2.8 sec.
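
Since that 3-channel float32 variant's code is not shown in this answer, here is a minimal sketch of how it could look (my reconstruction, not the author's code); it keeps the same broadcasting trick and simply expands the mask to 3 channels:

import numpy as np

image_heigh, image_width, image_channel = 128, 256, 3
aranges = np.arange(image_heigh, dtype=np.int32)[:, None]

# vol_norm stands in for the normalized column heights produced by nomralized(...) above
vol_norm = np.random.uniform(0, image_heigh, size=image_width)

# same broadcasting comparison as before, then expanded to 3 float32 channels
mask = aranges < vol_norm[None, :].astype(np.int32)   # (image_heigh, image_width) bool
final_image = np.repeat(mask[:, :, None], image_channel, axis=2).astype(np.float32)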

The next optimization is possible if you do not need to recompute the old image data.

The main thing to optimize is that at every step you recompute almost the same whole image, shifted by just 1 pixel along the width. Instead, you should compute the whole image once, and then shift not by 1 pixel but by the whole image width.

After that optimization the total running time is 0.08 sec.

The 1-pixel stepping is then done only for displaying the animation, not for computing the image data; if you need speed, the image data should be computed only once.

import cv2
import numpy as np
import time

volumes = np.random.randint(low=0, high=200, size=10000)
print(volumes)

image_heigh = 128
image_width = volumes.size #256
image_channel = 3
screen_width = 256

show_img = False


def nomralized(data, data_min, data_max, maximum_value):

    nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))

    return nomamized_data

start_time = time.time()

for ii in range(0, len(volumes), image_width):
    # ===================== part to optimize start
    final_image = np.zeros((image_heigh, image_width, image_channel))

    start = ii
    end = ii + image_width

    current_vols = volumes[start:end]

    # normalize the data
    vol_min = 0
    vol_max = np.max(current_vols)

    vol_norm = nomralized(data=current_vols,
                      data_min=vol_min,
                      data_max=vol_max,
                      maximum_value=image_heigh)

    for xxx in range(image_width):
        final_image[:int(vol_norm[xxx]), xxx, :] = 1

    # ===================== part to optimize end

    if show_img:
        for start in range(0, final_image.shape[1] - screen_width):
            image = np.float32(final_image[:, start : start + screen_width])
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            cv2.imshow("ok", image)
            cv2.waitKey(27)

print("total running time: ", (time.time() - start_time))

I also created an animated image using your data:

[animated image]

If you want to create the same animation, just append the following piece of code to the end of the script above:

# Needs: python -m pip install pillow
import PIL.Image
imgs = [PIL.Image.fromarray(final_image[:, start : start + screen_width].astype(np.uint8) * 255) for start in range(0, final_image.shape[1] - screen_width, 6)]
imgs[0].save('result.png', append_images = imgs[1:], save_all = True, lossless = True, duration = 100)

I also implemented a simulation of real-time streaming-data rendering/visualization:

  1. The live_stream() generator spits out a random amount of data at random points in time, to simulate the data-generation process.
  2. stream_fetcher() listens to the live stream and records all received data into a Python queue q0; this fetcher runs in its own thread.
  3. renderer() takes the data recorded by the fetcher and renders it into images through the mathematical formulas and the normalization process; it renders as much data as is currently available, producing images of varying width; the rendered images are put into another queue, q1.
  4. visualizer() visualizes the rendered data by fetching as many rendered images as are available.

All functions run in separate threads and do not block the whole process. Also, if any thread works more slowly, it skips some of the data in order to catch up with the current live data, so no queue ever overflows.
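
The catching-up behaviour comes from draining a queue without blocking: wait for the first item, then take everything that is already waiting. As a small standalone sketch of that pattern (the helper name is mine; the code below inlines the same logic instead):

import queue

def drain(q):
    # Block until at least one item arrives, then grab the whole backlog
    # without waiting, so a slow consumer always catches up to the newest data.
    items = [q.get()]
    try:
        while True:
            items.append(q.get(block=False))
    except queue.Empty:
        pass
    return items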

You may also notice that the visualized process is jumpy; that is not because the functions are slow, but because the live stream spits out a different amount of data at each time step, which is how real-time data usually behaves.

In the next code I also implemented the extra optimization mentioned earlier: the image is not recomputed if the min/max did not change.

import cv2, numpy as np
import time, random, threading, queue

image_height = 256
image_width = 512

# Make results reproducible and deterministic
np.random.seed(0)
random.seed(0)

def live_stream():
    last = 0.
    while True:
        a = np.random.uniform(low = -1., high = 1., size = random.randint(1, 20)).astype(np.float64).cumsum() + last
        yield a
        last = a[-1]
        time.sleep(random.random() * 0.1)

q0 = queue.Queue()
def stream_fetcher():
    for e in live_stream():
        q0.put(e)

threading.Thread(target = stream_fetcher, daemon = True).start()

aranges = np.arange(image_height, dtype = np.int32)[:, None]

q1 = queue.Queue()
def renderer():
    def normalized(data, data_min, data_max, maximum_value):
        nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))
        return nomamized_data

    prev_image = np.zeros((image_height, 0), dtype = np.uint8)
    prev_vols = np.zeros((0,), dtype = np.float64)
        
    while True:        
        data = []
        data.append(q0.get())
        try:
            while True:
                data.append(q0.get(block = False))
        except queue.Empty:
            pass
                
        vols = np.concatenate(data)[-image_width:]
        prev_vols = prev_vols[-(image_width - vols.size) or prev_vols.size:]
        concat_vols = np.concatenate((prev_vols, vols))[-image_width:]
        vols_min, vols_max = np.amin(concat_vols), np.amax(concat_vols)
        if prev_vols.size > 0 and (vols_min < np.amin(prev_vols) - 10 ** -8 or vols_max > np.amax(prev_vols) + 10 ** -8):
            vols = concat_vols
            prev_image = prev_image[:, :-prev_vols.size]
            prev_vols = prev_vols[:0]

        vols_norm = normalized(
            data = vols, data_min = vols_min,
            data_max = vols_max, maximum_value = image_height,
        )
        
        image = (aranges < vols_norm.astype(np.int32)[None, :]).astype(np.uint8) * 255
        whole_image = np.concatenate((prev_image, image), axis = 1)[:, -image_width:]
        
        q1.put(whole_image)
        
        prev_image = whole_image
        prev_vols = concat_vols

threading.Thread(target = renderer, daemon = True).start()


def visualizer():
    imgs = []
    
    while True:
        data = []
        data.append(q1.get())
        try:
            while True:
                data.append(q1.get(block = False))
        except queue.Empty:
            pass
        image = np.concatenate(data, axis = 1)[:, -image_width:]
        cv2.imshow('ok', image)
        cv2.waitKey(1)

        if imgs is not None:
            try:
                # Needs: python -m pip install pillow
                import PIL.Image
                has_pil = True
            except:
                has_pil = False
                imgs = None
            if has_pil:
                imgs.append(PIL.Image.fromarray(np.pad(image, ((0, 0), (image_width - image.shape[1], 0)), constant_values = 0)))

                if len(imgs) >= 1000:
                    print('saving...', flush = True)
                    imgs[0].save('result.png', append_images = imgs[1:], save_all = True, lossless = True, duration = 100)
                    imgs = None
                    print('saved!', flush = True)

threading.Thread(target = visualizer, daemon = True).start()

while True:
    time.sleep(0.1)

The real-time process simulation above is rendered into result.png, which I show below:

[animated image: result.png]

I also decided to improve the visualization by using the more advanced matplotlib instead of cv2, in order to show axes and do real-time plotting. The resulting visualization looks like this:

[animated image: matplotlib visualization]

Next is the matplotlib-based code corresponding to the last image above:

import cv2, numpy as np
import time, random, threading, queue

image_height = 256
image_width = 512
save_nsec = 20
dpi, fps = 100, 15

# Make results reproducible and deterministic
np.random.seed(0)
random.seed(0)

def live_stream():
    last = 0.
    pos = 0
    while True:
        a = np.random.uniform(low = -1., high = 1., size = random.randint(1, 30)).astype(np.float64).cumsum() + last
        yield a, pos, pos + a.size - 1
        pos += a.size
        last = a[-1]
        time.sleep(random.random() * 2.2 / fps)

q0 = queue.Queue()
def stream_fetcher():
    for e in live_stream():
        q0.put(e)

threading.Thread(target = stream_fetcher, daemon = True).start()

aranges = np.arange(image_height, dtype = np.int32)[:, None]

q1 = queue.Queue()
def renderer():
    def normalized(data, data_min, data_max, maximum_value):
        nomamized_data = maximum_value * ((data - data_min) / (data_max - data_min))
        return nomamized_data

    prev_image = np.zeros((image_height, 0), dtype = np.uint8)
    prev_vols = np.zeros((0,), dtype = np.float64)
        
    while True:        
        data = []
        data.append(q0.get())
        try:
            while True:
                data.append(q0.get(block = False))
        except queue.Empty:
            pass
            
        data_vols = [e[0] for e in data]
        data_minx, data_maxx = data[0][1], data[-1][2]

        vols = np.concatenate(data_vols)[-image_width:]
        prev_vols = prev_vols[-(image_width - vols.size) or prev_vols.size:]
        concat_vols = np.concatenate((prev_vols, vols))[-image_width:]
        vols_min, vols_max = np.amin(concat_vols), np.amax(concat_vols)
        if prev_vols.size > 0 and (vols_min < np.amin(prev_vols) - 10 ** -8 or vols_max > np.amax(prev_vols) + 10 ** -8):
            vols = concat_vols
            prev_image = prev_image[:, :-prev_vols.size]
            prev_vols = prev_vols[:0]

        vols_norm = normalized(
            data = vols, data_min = vols_min,
            data_max = vols_max, maximum_value = image_height,
        )
        
        image = (aranges < vols_norm.astype(np.int32)[None, :]).astype(np.uint8) * 255
        whole_image = np.concatenate((prev_image, image), axis = 1)[:, -image_width:]
        
        q1.put((whole_image, data_maxx - whole_image.shape[1] + 1, data_maxx, vols_min, vols_max))
        
        prev_image = whole_image
        prev_vols = concat_vols

threading.Thread(target = renderer, daemon = True).start()


def visualizer():
    import matplotlib.pyplot as plt, matplotlib.animation
    
    def images():
        while True:
            data = []
            data.append(q1.get())
            try:
                while True:
                    data.append(q1.get(block = False))
            except queue.Empty:
                pass
            minx = min([e[1] for e in data])
            # combined extent over all queued images
            maxx = max([e[2] for e in data])
            miny = min([e[3] for e in data])
            maxy = max([e[4] for e in data])
            image = np.concatenate([e[0] for e in data], axis = 1)[:, -image_width:]
            image = np.pad(image, ((0, 0), (image_width - image.shape[1], 0)), constant_values = 0)
            image = np.repeat(image[:, :, None], 3, axis = -1)
            yield image, minx, maxx, miny, maxy
            
    it = images()
    im = None
    fig = plt.figure(figsize = (image_width / dpi, image_height / dpi), dpi = dpi)
            
    def animate_func(i):
        nonlocal it, im, fig
        image, minx, maxx, miny, maxy = next(it)
        print(f'.', end = '', flush = True)
        if im is None:
            im = plt.imshow(image, interpolation = 'none', aspect = 'auto')
        else:
            im.set_array(image)
        im.set_extent((minx, maxx, miny, maxy))
        return [im]
            
    anim = matplotlib.animation.FuncAnimation(fig, animate_func, frames = round(save_nsec * fps), interval = 1000 / fps)
    
    print('saving...', end = '', flush = True)
    #anim.save('result.mp4', fps = fps, dpi = dpi, extra_args = ['-vcodec', 'libx264'])
    anim.save('result.gif', fps = fps, dpi = dpi, writer = 'imagemagick')
    print('saved!', end = '', flush = True)
    
    plt.show()

threading.Thread(target = visualizer, daemon = True).start()

while True:
    time.sleep(0.1)

Then I decided to play around a bit and colored the last image with an RGB palette: the higher a peak is, the more red it gets; if it is around the middle height, it is greener; and if it is low enough, it is more blue. The resulting image below was produced by this coloring code:

[animated image: colored bars]
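
The coloring code itself is only linked above; as one possible illustration of such height-based coloring (my own sketch, not the linked code), each bar can be tinted by its normalized height:

import numpy as np

def colorize_bars(mask, heights, max_height):
    # mask:    (H, W) boolean bar image, True where a bar is filled
    # heights: (W,) bar heights that were used to build the mask
    level = heights / float(max_height)              # 0.0 (low bar) .. 1.0 (high bar)
    r = np.clip(2.0 * level - 1.0, 0.0, 1.0)         # red grows for the highest bars
    g = 1.0 - np.abs(2.0 * level - 1.0)              # green peaks at mid height
    b = np.clip(1.0 - 2.0 * level, 0.0, 1.0)         # blue grows for the lowest bars
    rgb = np.stack([r, g, b], axis=-1)[None, :, :]   # (1, W, 3) per-column color
    return (mask[:, :, None] * rgb * 255).astype(np.uint8)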

With the help of this code, below is another monochrome animation, in a line style instead of a bar style:

[animated image: line-style animation]
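
That line-style code is also only linked; as a rough sketch of the idea (my own illustration, not the linked code), the per-column heights can be drawn as a connected polyline on a blank canvas instead of filled bars:

import cv2
import numpy as np

def bars_to_line(heights, height):
    # heights: (W,) integer bar heights; draws them as a 1-pixel polyline on a
    # blank (height, W) canvas, with y flipped so taller values sit near the top.
    canvas = np.zeros((height, len(heights)), dtype=np.uint8)
    xs = np.arange(len(heights))
    ys = np.clip(height - 1 - heights, 0, height - 1)
    pts = np.stack([xs, ys], axis=1).astype(np.int32).reshape(-1, 1, 2)
    cv2.polylines(canvas, [pts], isClosed=False, color=255, thickness=1)
    return canvas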
