Python:遍历大文件的最快方法

2 投票
5 回答
3472 浏览
提问于 2025-04-15 19:38

好的,我正在处理一个很大的二进制文件。

我需要尽量缩短这个循环的时间:

def NB2(self, ID_LEN):
    r1=np.fromfile(ReadFile.fid,dTypes.NB_HDR,1)
    num_receivers=r1[0][0]
    num_channels=r1[0][1]
    num_samples=r1[0][5]

    blockReturn = np.zeros((num_samples,num_receivers,num_channels))

    for rec in range(0,num_receivers):
        for chl in range(0,num_channels):
            for smpl in range(0,num_samples):
                r2_iq=np.fromfile(ReadFile.fid,np.int16,2)
                blockReturn[smpl,rec,chl] = np.sqrt(math.fabs(r2_iq[0])*math.fabs(r2_iq[0]) + math.fabs(r2_iq[1])*math.fabs(r2_iq[1]))

    return blockReturn

事情是这样的:r1是文件的头部,dTypes.NB_HDR是我自己定义的一种类型:

NB_HDR= np.dtype([('f3',np.uint32),('f4',np.uint32),('f5',np.uint32),('f6',np.int32),('f7',np.int32),('f8',np.uint32)])

这个类型可以获取即将到来的数据块的所有信息,并且能把我放到文件的正确位置(也就是数据块的开始位置!)。

在这个数据块里,有:

每个通道4096个样本,

每个接收器4个通道,

总共有9个接收器。

所以num_receivers(接收器数量)、num_channels(通道数量)、num_samples(样本数量)这些值现在都是固定的,但如你所见,这数据量相当大。每个“样本”是两个int16(16位整数)值的组合,我想要计算它们的大小(所以用到了毕达哥拉斯定理)。

这段NB2代码会对文件中的每个“块”执行,对于一个12GB的文件(就是这么大),大约有20,900个块,而我需要处理1000个这样的文件(总共12TB)。即使是毫秒级的速度提升,我都会非常感激。

编辑:其实知道我怎么在文件里移动可能会有帮助。我有一个这样的函数:

def navigateTo(self, blockNum, indexNum):
    ReadFile.fid.seek(ReadFile.fileIndex[blockNum][indexNum],0)
    ReadFile.currentBlock = blockNum
    ReadFile.index = indexNum

在运行这些代码之前,我会扫描文件并列出ReadFile.fileIndex中的索引位置,然后用这个函数浏览这些位置,再“寻址”到绝对位置——这样做效率高吗?

谢谢!

5 个回答

1

我会尽量少用循环,多用常量。所有可以线性处理的事情都应该这样做。如果值是固定不变的,就用常量,这样可以减少查找的时间,因为这会消耗很多CPU的运算时间。

这是从理论的角度来说的 ;-)

如果可以的话,尽量使用经过高度优化的库。我不太清楚你想要实现什么,但我宁愿用现成的FFT库,而不是自己去写 :>

还有一件事:http://en.wikipedia.org/wiki/Big_O_notation(这可能会让你大开眼界)

3
import numpy as np
def NB2(self, ID_LEN):
    r1=np.fromfile(ReadFile.fid,dTypes.NB_HDR,1)
    num_receivers=r1[0][0]
    num_channels=r1[0][1]
    num_samples=r1[0][5]

    # first, match your array bounds to the way you are walking the file
    blockReturn = np.zeros((num_receivers,num_channels,num_samples))

    for rec in range(0,num_receivers):
        for chl in range(0,num_channels):
            # second, read in all the samples at once if you have enough memory
            r2_iq=np.fromfile(ReadFile.fid,np.int16,2*num_samples)
            r2_iq.shape = (-1,2) # tell numpy that it is an array of two values

            # create dot product vector by squaring data elementwise, and then
            # adding those elements together.  Results is of length num_samples
            r2_iq = r2_iq * r2_iq
            r2_iq = r2_iq[:,0] + r2_iq[:,1]
            # get the distance by performing the square root "into" blockReturn
            np.sqrt(r2_iq, out=blockReturn[rec,chl,:])

    return blockReturn

这应该能提升你的性能。Numpy的工作有两个主要的思路。首先,你的结果数组的维度应该和你的循环维度相匹配,这样可以提高内存的使用效率。
第二,Numpy是非常快的。我用Numpy的速度比手动写的C代码还要快,这主要是因为它使用了LAPack和向量加速。不过,要想充分利用这个优势,你需要让它一次处理更多的数据。这就是为什么你的示例循环被简化为一次性读取接收器和信道的完整样本。然后,利用Numpy强大的向量运算能力,通过点积来计算你的幅度。

在幅度计算方面还有一些优化的空间,但Numpy会为你重复使用缓冲区,这使得优化的重要性没有你想象的那么大。希望这对你有帮助!

3

因为在读取头部信息后,你就知道一个数据块的长度了,所以可以一次性读取整个数据块。然后再调整一下数组的形状(这个过程非常快,只会影响一些元数据),接着使用 np.hypot 这个函数:

blockData = np.fromfile(ReadFile.fid, np.int16, num_receivers*num_channels*num_samples*2)
blockData = blockData.reshape((num_receivers, num_channes, num_samples, 2))
return np.hypot(blockData[:,:,:,0], blockData[:,:,:,1])

在我的电脑上,每个数据块的处理时间是11毫秒。

撰写回答