将多个1d曲线计算为2d数组(图像)表示

1 投票
2 回答
49 浏览
提问于 2025-04-14 17:34

我想知道有没有人能帮我用Python尽可能高效地完成这个操作。我觉得使用numpy的花式索引是最好的方法,但我一直没能搞定(其他方法也欢迎)。

我有一些简单的数据集,它们的形状像这样的一些曲线:

这里输入图片描述

我想计算它们的图像表示:一个矩阵,曲线下方的值为1,上方的值为0:

这里输入图片描述

我现在的方法是使用np.searchsorted来将一维数组的值映射到图像中的“像素”数量:

import numpy as np
from matplotlib import pyplot as plt

# 1D data
n_data = 8
x_data = np.arange(n_data)
y_data = np.array([0.2, 2.3, 5.9, 7.2, 6.2, 4.7, 2.9, 0.9])

# 2D empty data array
y_matrix = np.zeros((n_data, n_data))
rows = np.arange(n_data)[:, np.newaxis]

# Approximation to a 0 to 7 interval
threshold_array = np.array([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])
approximation = np.searchsorted(threshold_array, y_data)

然后简单的减法可以生成二维的掩码:

# Use broadcasting to create a boolean mask for the elements to be set to 1
mask = n_data - 1 - rows < approximation
y_matrix[mask] = 1

print(y_matrix) 

这个掩码看起来是这样的:

[[0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 1. 1. 0. 0. 0.]
 [0. 0. 1. 1. 1. 0. 0. 0.]
 [0. 0. 1. 1. 1. 1. 0. 0.]
 [0. 0. 1. 1. 1. 1. 0. 0.]
 [0. 1. 1. 1. 1. 1. 1. 0.]
 [0. 1. 1. 1. 1. 1. 1. 0.]
 [1. 1. 1. 1. 1. 1. 1. 1.]]

现在我想把这个操作应用到成千上万的向量上。例如:

y_data = np.array([[0.2, 2.3, 5.9, 7.2, 6.2, 4.7, 2.9, 0.9],
                  [0.2, 2.3, 5.9, 7.2, 6.2, 4.7, 2.9, 0.9],
                  [0.2, 2.3, 5.9, 7.2, 6.2, 4.7, 2.9, 0.9]])

应该得到三个矩阵,或者一个形状为(8,8,3)的数组。有没有人能提供一个高效的方法?

谢谢

2 个回答

2

纯粹使用numpy的实现:

import numpy as np

# data
n_data = 8
x_data = np.arange(n_data)
# I varied the 2nd and 3rd list to better visualize the functionality
y_data = np.array([[0.2, 2.3, 5.9, 7.2, 6.2, 4.7, 2.9, 0.9],
                  [1.2, 1.3, 4.9, 7.2, 4.2, 4.7, 2.9, 0.9],
                  [2.2, 2.3, 7.9, 6.2, 5.2, 3.7, 1.9, 0.9]])

images = np.tile(y_data[:,None,:], (1,n_data,1))
images = images > x_data[::-1,None]
images = images.astype(float)

print(images)

输出结果:

[[[0. 0. 0. 1. 0. 0. 0. 0.]
  [0. 0. 0. 1. 1. 0. 0. 0.]
  [0. 0. 1. 1. 1. 0. 0. 0.]
  [0. 0. 1. 1. 1. 1. 0. 0.]
  [0. 0. 1. 1. 1. 1. 0. 0.]
  [0. 1. 1. 1. 1. 1. 1. 0.]
  [0. 1. 1. 1. 1. 1. 1. 0.]
  [1. 1. 1. 1. 1. 1. 1. 1.]]

 [[0. 0. 0. 1. 0. 0. 0. 0.]
  [0. 0. 0. 1. 0. 0. 0. 0.]
  [0. 0. 0. 1. 0. 0. 0. 0.]
  [0. 0. 1. 1. 1. 1. 0. 0.]
  [0. 0. 1. 1. 1. 1. 0. 0.]
  [0. 0. 1. 1. 1. 1. 1. 0.]
  [1. 1. 1. 1. 1. 1. 1. 0.]
  [1. 1. 1. 1. 1. 1. 1. 1.]]

 [[0. 0. 1. 0. 0. 0. 0. 0.]
  [0. 0. 1. 1. 0. 0. 0. 0.]
  [0. 0. 1. 1. 1. 0. 0. 0.]
  [0. 0. 1. 1. 1. 0. 0. 0.]
  [0. 0. 1. 1. 1. 1. 0. 0.]
  [1. 1. 1. 1. 1. 1. 0. 0.]
  [1. 1. 1. 1. 1. 1. 1. 0.]
  [1. 1. 1. 1. 1. 1. 1. 1.]]]
3

一个可能的解决办法是使用 (对于大量向量来说,速度提升非常明显):

from numba import njit

@njit
def compute_repr_numba(y_data):
    x, y = y_data.shape

    out = np.zeros(shape=(x, y, y), dtype=np.uint8)

    for i in range(len(y_data)):
        vec = y_data[i]
        out_arr = out[i]
        for j, val in enumerate(vec):
            for k in range(y - 1, y - int(np.ceil(val)) - 1, -1):
                out_arr[k, j] = 1

    return out

y_data = np.array(
    [
        [0.2, 2.3, 5.9, 7.2, 6.2, 4.7, 2.9, 0.9],
        [0.0, 0.0, 5.9, 7.2, 6.2, 4.7, 2.9, 0.9],
    ]
)
print(compute_repr_numba_single(y_data))

输出结果:

[[[0 0 0 1 0 0 0 0]
  [0 0 0 1 1 0 0 0]
  [0 0 1 1 1 0 0 0]
  [0 0 1 1 1 1 0 0]
  [0 0 1 1 1 1 0 0]
  [0 1 1 1 1 1 1 0]
  [0 1 1 1 1 1 1 0]
  [1 1 1 1 1 1 1 1]]

 [[0 0 0 1 0 0 0 0]
  [0 0 0 1 1 0 0 0]
  [0 0 1 1 1 0 0 0]
  [0 0 1 1 1 1 0 0]
  [0 0 1 1 1 1 0 0]
  [0 0 1 1 1 1 1 0]
  [0 0 1 1 1 1 1 0]
  [0 0 1 1 1 1 1 1]]]

基准测试(加上并行处理):

import perfplot
from numba import njit, prange


def compute_repr_normal(y_data):
    x, y = y_data.shape
    threshold_array = np.array([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])

    out = np.zeros(shape=(x, y, y), dtype=np.uint8)

    for i, vec in enumerate(y_data):
        rows = np.arange(y)[:, np.newaxis]
        approximation = np.searchsorted(threshold_array, vec)
        mask = y - 1 - rows < approximation
        out[i, mask] = 1

    return out


@njit
def compute_repr_numba_single(y_data):
    x, y = y_data.shape

    out = np.zeros(shape=(x, y, y), dtype=np.uint8)

    for i in range(len(y_data)):
        vec = y_data[i]
        out_arr = out[i]
        for j, val in enumerate(vec):
            for k in range(y - 1, y - int(np.ceil(val)) - 1, -1):
                out_arr[k, j] = 1

    return out


@njit(parallel=True)
def compute_repr_numba_parallel(y_data):
    x, y = y_data.shape

    out = np.zeros(shape=(x, y, y), dtype=np.uint8)

    for i in prange(len(y_data)):
        vec = y_data[i]
        out_arr = out[i]
        for j, val in enumerate(vec):
            for k in range(y - 1, y - int(np.ceil(val)) - 1, -1):
                out_arr[k, j] = 1

    return out


def compute_repr_oskar(y_data):
    x, y = y_data.shape
    x_data = np.arange(y)

    images = np.tile(y_data[:, None, :], (1, y, 1))
    images = images > x_data[::-1, None]
    images = images.astype(np.uint8)
    return images


y_data = np.array(
    [
        [0.0, 0.0, 5.9, 7.2, 6.2, 4.7, 2.9, 0.9],
    ]
)

assert np.allclose(compute_repr_numba_parallel(y_data), compute_repr_normal(y_data))
assert np.allclose(compute_repr_numba_single(y_data), compute_repr_normal(y_data))
assert np.allclose(compute_repr_oskar(y_data), compute_repr_normal(y_data))

np.random.seed(0)

perfplot.show(
    setup=lambda n: np.random.random(size=(n, 8)),
    kernels=[
        compute_repr_normal,
        compute_repr_numba_parallel,
        compute_repr_numba_single,
        compute_repr_oskar,
    ],
    labels=["normal", "numba_parallel", "numba_single", "compute_repr_oskar"],
    n_range=[5, 10, 25, 100, 500, 1000, 10_000, 50_000, 250_000],
    xlabel="N",
    logx=True,
    logy=True,
    equality_check=np.allclose,
)

在我的电脑上(AMD 5700x)生成了这个图表:

enter image description here

撰写回答