快速将二维字符串numpy数组转换为三维整数数组的方法

1 投票
4 回答
107 浏览
提问于 2025-04-13 17:37

我有一个非常大的numpy数组,里面的内容像这样:

[['0/1' '2/0']
['3/0' '1/4']]

我想把它转换成一个三维数组,像这样:

[[[0 1] [2 0]]
[[3 0] [1 4]]]

这个数组非常宽,有很多列,但行数不多。字符串的可能性大约有100种。这其实不是一个分数,只是用来展示文件里的内容(这是基因组数据,以这种格式给我的)。

我不想并行处理,因为我会在单个CPU上运行这个程序,然后再转到单个GPU,所以额外的CPU在GPU核运行时会闲着。

我试过用numba:

import numpy as np
import itertools
from numba import njit
import time


@njit(nopython=True)
def index_with_numba(data,int_data,indices): 
    for pos in indices:   
        str_match = str(pos[0])+'/'+str(pos[1])
        for i in range(data.shape[0]):
            for j in range(data.shape[1]):
                if data[i, j] == str_match:
                    int_data[i,j] = pos
    return int_data

def generate_masks():
    masks=[]
    def _2d_array(i,j):
        return np.asarray([i,j],dtype=np.int32)
    for i in range(10):
        for j in range(10):
            masks.append(_2d_array(i,j))
    return masks

rows = 100000
cols = 200

numerators = np.random.randint(0, 10, size=(rows,cols))
denominators = np.random.randint(0, 10, size=(rows,cols))

samples =  np.array([f"{numerator}/{denominator}" for numerator, denominator in zip(numerators.flatten(), denominators.flatten())],dtype=str).reshape(rows, cols)
samples_int = np.empty((samples.shape[0],samples.shape[1],2),dtype=np.int32)


# Generate all possible masks
masks = generate_masks()
t0=time.time()
samples_int = index_with_numba(samples,samples_int, masks)
t1=time.time()
print(f"Time to index {t1-t0}")

但是速度太慢,没法用。

Time to index 182.0304057598114

我想这样做的原因是,我想写一个cuda内核,基于原始值执行某个操作——比如对于'0/1'我需要得到0和1等等,但我处理不了字符串。我曾想过也许可以用掩码,但看起来不太合适。

任何建议都很感谢。

4 个回答

1

因为你的整数都是单个数字,所以你可以把输入的数组看作一个 'U1' 数组:

arr_s = np.array([['0/1', '2/0'],
                  ['3/0', '1/4']])

arr_u1 = arr_s.view('U1')
# array([['0', '/', '1', '2', '/', '0'],
#        ['3', '/', '0', '1', '/', '4']], dtype='<U1')

现在,你已经知道字符串中数字的位置了: 你期望结果的 [:, :, 0] 位置的元素在 arr_u1[:, ::3] 中,而 [:, :, 1] 位置的元素在 arr_u1[:, 2::3] 中。

nrows, ncols = arr_s.shape
result = np.zeros((nrows, ncols, 2), dtype=int)
result[:, :, 0] = arr_u1[:, ::3].astype(int)
result[:, :, 1] = arr_u1[:, 2::3].astype(int)

这样就能得到你期望的结果:

array([[[0, 1],
        [2, 0]],

       [[3, 0],
        [1, 4]]])

比较一下你的 index_with_numba 和我的运行时间,我的电脑上速度快了大约20倍:

def pho(data):
    arr_u1 = data.view('U1')
    nrows, ncols = data.shape
    result = np.zeros((nrows, ncols, 2), dtype=int)
    result[:, :, 0] = arr_u1[:, ::3].astype(int)
    result[:, :, 1] = arr_u1[:, 2::3].astype(int)
    return result

t0 = time.time()
samples_int = index_with_numba(samples,samples_int, masks)
t1 = time.time() - t0
samples2_int = pho(samples)
t2 = time.time() - t1 - t0
print(f"index_with_numba: {t1}\npho: {t2}")
index_with_numba: 162.01522159576416
pho: 8.772466897964478
3

另一种解决方案:直接把数组转换成 uint8 类型,然后从正确的值中减去 48,最后再调整形状:

def get_arr(arr):
    out = arr.ravel().view("uint8")[::4]
    out = (out[out != 47] - 48).reshape(arr.shape[0], arr.shape[1], -1)
    return out

arr = np.array([["0/1", "2/0"], ["3/0", "1/4"]])
print(get_arr(arr))

输出结果:

[[[0 1]
  [2 0]]

 [[3 0]
  [1 4]]]

性能测试:

import numpy as np
from timeit import timeit

def get_arr(arr):
    out = arr.ravel().view("uint8")[::4]
    out = (out[out != 47] - 48).reshape(arr.shape[0], arr.shape[1], -1)
    return out


def get_arr_pho(arr):
    nrows, ncols = arr.shape
    arr_u1 = arr.view("U1")
    result = np.zeros((nrows, ncols, 2), dtype=int)
    result[:, :, 0] = arr_u1[:, ::3].astype(int)
    result[:, :, 1] = arr_u1[:, 2::3].astype(int)
    return result

rows = 100000
cols = 200


numerators = np.random.randint(0, 10, size=(rows, cols))
denominators = np.random.randint(1, 10, size=(rows, cols))

arr = np.array(
    [
        f"{numerator}/{denominator}"
        for numerator, denominator in zip(numerators.flatten(), denominators.flatten())
    ],
    dtype=str,
).reshape(rows, cols)

assert np.allclose(get_arr(arr), get_arr_pho(arr))

t1 = timeit("get_arr(arr)", number=1, globals=globals())
t2 = timeit("get_arr_pho(arr)", number=1, globals=globals())
print(t1)
print(t2)

输出结果:

0.1379915471188724
4.7036198258865625
3

Unicode字符串在Numba中的支持非常有限,而且速度非常慢。如果你想让Numba的代码快上很多倍,最简单的方法就是在调用Numba函数之前,把unicode字符串转换成字节字符串。对于这么简单的任务,根本不需要使用GPU。需要注意的是,即使是Numpy进行这种转换也比较慢,因为Numpy主要是为数值计算设计的,所以在这方面并不是特别高效。如果可以的话,建议直接生成字节字符串数组,而不是unicode字符串,因为这里并不需要unicode。

下面是一个例子:

# ~6 seconds on my machine (i5-9600KF CPU)
bsamples = samples.astype('S3')

# 1 ms instead of ~198 s  --  BUT INCORRECT (see below)
index_with_numba(bsamples, samples_int, masks)

更新:

不过有个问题:转换后的Numba函数由于类型不匹配,不再正确你还需要调整Numba函数。实际上,代码可以稍微优化并进行并行处理。下面是优化后的函数(也是正确的):

@nb.njit(parallel=True)
def numba_faster(data,int_data,indices):
    for pos in indices:
        v0 = pos[0] + ord('0')
        v1 = pos[1] + ord('0')
        for i in nb.prange(data.shape[0]):
            for j in range(data.shape[1]):
                if data[i, j][0] == v0 and data[i, j][2] == v1:
                    int_data[i,j] = pos
    return int_data

更快的实现

上面的实现和你最初的代码非常相似,但似乎indices为基础的循环其实并不需要,这个循环让代码变得很慢。实际上,整个data数组被遍历了len(indices)次,每次只设置一个项目。这种做法效率不高。我们可以直接在并行中实时转换字符串

@nb.njit(parallel=True)
def numba_fastest(data,int_data):
    for i in nb.prange(data.shape[0]):
        for j in range(data.shape[1]):
            int_data[i,j,0] = data[i, j][0] - ord('0')
            int_data[i,j,1] = data[i, j][2] - ord('0')
    return int_data

性能评估

以下是在我的机器上(使用i5-9600KF CPU)的性能结果:

Initial code:                        198.31 s
Pho's code:                           11.88 s
numba_faster (with conversion):        6.37 s
numba_fastest (without conversion):    5.77 s
numba_faster (without conversion):     0.62 s
Andrej Kesely's code (latest):         0.20 s
numba_fastest (without conversion):    0.02 s   <-----

numba_fastest是最快的实现。虽然转换仍然是瓶颈,但这部分可以很容易地去掉。还要注意,Numba函数的编译时间没有包含在基准测试中(你可以通过为函数添加签名来提前编译Numba函数)。

最终,numba_fastest比最初的字符串转换代码快34倍,而在没有转换的情况下,速度快了大约10,000倍

撰写回答