numpy使用预先计算的映射有效地将值从一个矩阵复制到另一个矩阵

2024-06-09 03:42:32 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个I*J大小的输入矩阵A

输出矩阵B的大小为N*M

还有一些预先计算好的地图,尺寸为N*M*2,它规定了B中的每个坐标,A中的坐标要取。地图没有我可以使用的特定规则或线性。只是一张看起来随机的地图。在

矩阵非常大(~5000*~3000),所以创建映射矩阵是不可能的(5000*3000*5000*3000)

我用了一个简单的地图和循环:

for i in range(N):
    for j in range(M):
        B[i, j] = A[mapping[i, j, 0], mapping[i, j, 1]]

我用索引法成功地做到了:

^{pr2}$

这样做效果更好,但还是有点慢。在

我有无限的时间提前计算映射或任何其他效用计算。但在这些预先计算之后,这种映射应该尽快进行。在

目前,我看到的唯一的另一个选择就是在C或其他更快的地方重新实现它。。。在

(如果有人好奇,我会用其他形状和方向不同的图像,用一些编码来创建一个图像。但它的映射非常复杂,不是简单或线性的东西可以使用)


Tags: in图像for规则尺寸地方时间地图
3条回答

如果您有无限时间进行预计算,您可以通过使用平面索引来获得一点加速:

map_f = np.ravel_multi_index((*np.moveaxis(mapping, 2, 0),), A.shape)

然后简单地做:

^{2}$

请注意,这个加速是在我们从花式索引得到的大加速之上。例如:

>>> A = np.random.random((5000, 3000))
>>> mapping = np.random.randint(0, 15000, (5000, 3000, 2)) % [5000, 3000]
>>> 
>>> map_f = np.ravel_multi_index((*np.moveaxis(mapping, 2, 0),), A.shape)
>>> 
>>> np.all(A.ravel()[map_f] == A[mapping[..., 0], mapping[..., 1]])
True
>>> 
>>> timeit('A[mapping[:, :, 0], mappping[:, :, 1]]', globals=globals(), number=10)
4.101239089999581
>>> timeit('A.ravel()[map_f]', globals=globals(), number=10)
2.7831342950012186

如果我们与原始的循环代码相比,加速比将更接近40倍

最后,请注意,此解决方案不仅避免了额外的依赖性和潜在的安装噩梦,而且更简单、更短、更快:

numba:
precomp:    132.957 ms
main        238.359 ms

flat indexing:
precomp:     76.223 ms
main:       219.910 ms

代码:

import numpy as np
from numba import jit

@jit
def fast(A, B, mapping):
    N, M = B.shape
    for i in range(N):
        for j in range(M):
            B[i, j] = A[mapping[i, j, 0], mapping[i, j, 1]]
    return B

from timeit import timeit

A = np.random.random((5000, 3000))
mapping = np.random.randint(0, 15000, (5000, 3000, 2)) % [5000, 3000]

a = np.random.random((5, 3))
m = np.random.randint(0, 15, (5, 3, 2)) % [5, 3]

print('numba:')
print(f"precomp: {timeit('b = fast(a, np.empty_like(a), m)', globals=globals(), number=1)*1000:10.3f} ms")
print(f"main     {timeit('B = fast(A, np.empty_like(A), mapping)', globals=globals(), number=10)*100:10.3f} ms")

print('\nflat indexing:')
print(f"precomp: {timeit('map_f = np.ravel_multi_index((*np.moveaxis(mapping, 2, 0),), A.shape)', globals=globals(), number=10)*100:10.3f} ms")
map_f = np.ravel_multi_index((*np.moveaxis(mapping, 2, 0),), A.shape)
print(f"main:    {timeit('B = A.ravel()[map_f]', globals=globals(), number=10)*100:10.3f} ms")

对于这些类型的性能关键问题,一个非常好的解决方案是保持简单,并使用一个高性能包。最简单的可能是Numba,它提供了jit修饰符,它将数组和循环重代码编译为优化的LLVM。下面是一个完整的例子:

from time import time
import numpy as np
from numba import jit

# Function doing the computation
def normal(A, B, mapping):
    N, M = B.shape
    for i in range(N):
        for j in range(M):
            B[i, j] = A[mapping[i, j, 0], mapping[i, j, 1]]
    return B

# The same exact function, but with the Numba jit decorator
@jit
def fast(A, B, mapping):
    N, M = B.shape
    for i in range(N):
        for j in range(M):
            B[i, j] = A[mapping[i, j, 0], mapping[i, j, 1]]
    return B

# Create sample data
def create_sample_data(I, J, N, M):
    A = np.random.random((I, J))
    B = np.empty((N, M))
    mapping = np.asarray(np.stack((
        np.random.random((N, M))*I,
        np.random.random((N, M))*J,
        ), axis=2), dtype=int)
    return A, B, mapping
A, B, mapping = create_sample_data(500, 600, 700, 800)

# Run normally
t0 = time()
B = normal(A, B, mapping)
t1 = time()
print('normal took', t1 - t0, 'seconds')

# Run using Numba.
# First we should run the function with smaller arrays,
# just to compile the code.
fast(*create_sample_data(5, 6, 7, 8))
# Now, run with real data
t0 = time()
B = fast(A, B, mapping)
t1 = time()
print('fast took', t1 - t0, 'seconds')

这使用了您自己的循环解决方案,使用标准Python时它的速度本来就很慢,但是在使用Numba时它的速度与C一样快。在我的机器上,normal函数执行时间为0.270秒,而fast函数的执行时间为0.00248秒。也就是说,Numba给了我们一个109x的加速(!)差不多是免费的。在

注意,fastNumba函数被调用了两次,首先是用小的输入数组,然后才是实际数据。这是一个经常被忽视的关键步骤。如果没有它,您将发现性能的提高并不如第一次调用编译代码那样好。在这个初始调用中,输入数组的类型和维度应该相同,但是每个维度中的大小并不重要。在

我在函数之外创建B,并将其作为参数传递(要“用值填充”)。您不妨在函数内部分配B,Numba不在乎。在

获得Numba最简单的方法是通过Anaconda分布正确地进行。在

一种选择是使用^{},这通常可以在这种简单的算法代码中提供实质性的改进。在

import numpy as np
from numba import njit

I, J = 5000, 5000
N, M = 3000, 3000

A = np.random.randint(0, 10, [I, J])
B = np.random.randint(0, 10, [N, M])
mapping = np.dstack([np.random.randint(0, I - 1, (N, M)),
                     np.random.randint(0, J - 1, (N, M))])

B0 = B.copy()

def orig(A, B, mapping):
    for i in range(N):
        for j in range(M):
            B[i, j] = A[mapping[i, j, 0], mapping[i, j, 1]]

new = njit(orig)

这给了我们匹配的结果:

^{2}$

而且速度更快:

In [320]: %time orig(A, B0.copy(), mapping)
Wall time: 6.11 s

In [321]: %time new(A, B0.copy(), mapping)
Wall time: 257 ms

在第一次呼叫之后,当它必须执行jit工作时,速度更快:

In [322]: %time new(A, B0.copy(), mapping)
Wall time: 171 ms

In [323]: %time new(A, B0.copy(), mapping)
Wall time: 163 ms

增加两行代码可以提高30倍。在

相关问题 更多 >