How to sum values in a numpy array grouped by another array with repeated indices

Posted 2024-04-28 20:24:08


data_values = np.random.rand(10)
data_ind = np.random.randint(0, 10, 10)

data_values = array([0.81444589, 0.57734696, 0.54130794, 0.22339518, 0.916973  ,
                     0.14956333, 0.74504583, 0.36218693, 0.17958372, 0.47195214])

data_ind = array([7, 5, 2, 2, 0, 6, 6, 1, 4, 3])

Desired output:

0 - 0.916973
1 - 0.36218693
2 - 0.54130794 + 0.22339518
3 - 0.47195214
4 - 0.17958372
5 - 0.57734696
6 - 0.14956333 + 0.74504583
7 - 0.81444589
output = array([0.916973, 0.36218694, 0.7647031, 0.47195214, 0.17958371, 0.577347, 0.89460915, 0.8144459], dtype=float32)

I wrote the following:

nodal_values = np.zeros(8, dtype=np.float32)  
for nodes in range(8):  
    nodal_values[nodes] = np.sum(data_values[np.where(data == nodes)[0]])

The approach above takes a lot of time, while

a = ((np.mgrid[:M,:N] == b)[0] * c).sum(axis=1)

gives a memory error for large data in the millions.

I am looking for an optimized way to do this.


Tags: method, output, data, np, random, array, nodes, values
1 Answer

Anonymous · #1 · Posted 2024-04-28 20:24:08

Please check out the stackoverflow question guidelines to ask better questions and format them properly.


Options

Original code

This is what you want to optimize for large values of N (I took the liberty of editing your code to remove the hardcoded values, and fixed a typo: data_ind instead of data):

data_values = np.random.rand(N) 
data_ind = np.random.randint(0, N, N)

xsize = data_ind.max() + 1
nodal_values = np.zeros(xsize, dtype=np.float32)  
for nodes in range(xsize):  
    nodal_values[nodes] = np.sum(data_values[np.where(data_ind == nodes)[0]])

Slightly better version (for readability)

I created the following version, which improves readability and drops the use of np.where:

idx = np.arange(xsize)[:, None] == data_ind
nodal_values = [np.sum(data_values[idx[i]]) for i in range(xsize)] # Python list

Much better version

I implemented the accepted answer by @Divakar from here (be sure to check it out to understand it better), adapted to your case:

_, idx, _ = np.unique(data_ind, return_counts=True, return_inverse=True)
nodal_values = np.bincount(idx, data_values) # Same shape and type as your version
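As a quick sanity check (a minimal sketch added here, not part of the original answer), the np.unique/np.bincount approach reproduces the grouped sums on the question's example data; the return_counts output is not actually needed, so it is dropped:

```python
import numpy as np

data_values = np.array([0.81444589, 0.57734696, 0.54130794, 0.22339518, 0.916973,
                        0.14956333, 0.74504583, 0.36218693, 0.17958372, 0.47195214])
data_ind = np.array([7, 5, 2, 2, 0, 6, 6, 1, 4, 3])

# Map each entry of data_ind to a compact 0..k-1 label, then sum the
# corresponding weights per label with bincount
_, idx = np.unique(data_ind, return_inverse=True)
nodal_values = np.bincount(idx, weights=data_values)

expected = np.array([0.916973, 0.36218693, 0.76470312, 0.47195214,
                     0.17958372, 0.57734696, 0.89460916, 0.81444589])
print(np.allclose(nodal_values, expected))  # True
```

Note that, unlike the loop version, this produces one entry per index that actually occurs in data_ind; indices that never appear get no zero slot.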

Comparison

Using the original values:

data_values = np.array([0.81444589, 0.57734696, 0.54130794, 0.22339518, 0.916973, 0.14956333, 0.74504583, 0.36218693, 0.17958372, 0.47195214])
data_ind = np.array([7, 5, 2, 2, 0, 6, 6, 1, 4, 3])

I got the following performance using the timeit module (mean ± std. dev. of 7 runs, 10000000 loops each):

Original code: 49.2 +- 11.1 ns
Much better version: 45.2 +- 4.98 ns
Slightly better version: 36.4 +- 2.81 ns

For very small values of N, i.e. 1 to 10, there is no significant difference. For large ones, however, there is no question which to use: both versions with for loops take too long, while the vectorized implementation is extremely fast.

[Figure: Small N comparison] [Figure: Big N comparison]
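One more vectorized option not covered in the answer above (so treat this as a sketch, not the answerer's method): np.add.at performs unbuffered in-place addition, so repeated indices accumulate instead of overwriting each other, and it avoids both the Python loop and the large boolean mask:

```python
import numpy as np

data_values = np.array([0.81444589, 0.57734696, 0.54130794, 0.22339518, 0.916973,
                        0.14956333, 0.74504583, 0.36218693, 0.17958372, 0.47195214])
data_ind = np.array([7, 5, 2, 2, 0, 6, 6, 1, 4, 3])

# Unbuffered scatter-add: values sharing an index are summed, not overwritten
nodal_values = np.zeros(data_ind.max() + 1)
np.add.at(nodal_values, data_ind, data_values)
```

np.bincount is usually faster than np.add.at for this 1-D case, but add.at keeps zero slots for unused indices and generalizes to multi-dimensional outputs.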

Code to test it

import numpy as np
import timeit
import matplotlib.pyplot as plt

def original_code():
    xsize = data_ind.max() + 1
    nodal_values = np.zeros(xsize, dtype=np.float32)
    for nodes in range(xsize):
        nodal_values[nodes] = np.sum(data_values[np.where(data_ind == nodes)[0]])

def much_better():
    _, idx, _ = np.unique(data_ind, return_counts=True, return_inverse=True)
    nodal_values = np.bincount(idx, data_values)

def slightly_better():
    xsize = data_ind.max() + 1
    idx = np.arange(xsize)[:, None] == data_ind
    nodal_values = [np.sum(data_values[idx[i]]) for i in range(xsize)]

sizes = [i*5 for i in range(1, 7)]
original_code_times = np.zeros((len(sizes),))
slightly_better_times = np.zeros((len(sizes),))
much_better_times = np.zeros((len(sizes),))
for i, N in enumerate(sizes):
    print(N)
    data_values = np.random.rand(N)
    data_ind = np.random.randint(0, N, N)

    # Divided by 100 repeats to get average
    original_code_times[i] = timeit.timeit(original_code, number=100) / 100
    much_better_times[i] = timeit.timeit(much_better, number=100) / 100
    slightly_better_times[i] = timeit.timeit(slightly_better, number=100) / 100

# Multiply by 1000 to get everything in ms
original_code_times *= 1000
slightly_better_times *= 1000
much_better_times *= 1000

# %%
plt.figure(dpi=120)
plt.title("Small N's")
plt.plot(sizes, original_code_times, label="Original code")
plt.plot(sizes, slightly_better_times, label="Slightly better")
plt.plot(sizes, much_better_times, label="Much better")
plt.ylabel("Time [ms]")
plt.xlabel("N")
plt.xticks(sizes)
plt.legend()
plt.savefig("small_N.png", dpi=120)
plt.show()
plt.close()

I hope this helps anyone who stumbles upon this.
