快速读取交错数据的方法?

9 投票
3 回答
2605 浏览
提问于 2025-04-16 07:21

我有一个文件,里面包含了几个数据通道。这个文件是以一个基本的速率进行采样的,每个通道的采样速率是这个基本速率除以一个数字——这个数字似乎总是2的幂,不过我觉得这并不重要。

举个例子,如果我有通道 abc,它们的采样速率分别是1、2和4,那么我的数据流看起来会像这样:

a0 b0 c0 a1 a2 b1 a3 a4 b2 c1 a5 ...

为了增加点乐趣,这些通道可以是浮点数或整数(虽然我知道每个通道的数据类型),而且数据流不一定要在2的幂的地方结束:这个示例数据流在不需要进一步扩展的情况下也是有效的。数值有时很大,有时又是小端格式,不过我一开始就知道我在处理什么。

我有代码可以正确地解包这些数据,并把正确的值填充到numpy数组中,但速度很慢:看起来像这样(希望我没有省略太多;只是给你一个算法的概念):

for sample_num in range(total_samples):
    channels_to_sample = [ch for ch in all_channels if ch.samples_for(sample_num)]
    format_str = ... # build format string from channels_to_sample
    data = struct.unpack( my_file.read( ... ) ) # read and unpack the data
    # iterate over data tuple and put values in channels_to_sample
    for val, ch in zip(data, channels_to_sample):
        ch.data[sample_num / ch.divider] = val

而且速度很慢——在我的笔记本上读取一个20MB的文件需要几秒钟。性能分析工具告诉我,我在 Channel#samples_for() 这个函数上花了很多时间——这很合理,因为这里有一些条件逻辑。

我觉得应该有办法一次性完成这个,而不是用嵌套循环——也许可以用索引技巧把我想要的字节直接读入每个数组?构建一个巨大的、复杂的格式字符串的想法似乎也不太靠谱。

更新

感谢那些回复我的人。顺便说一下,使用numpy的索引技巧把我读取测试数据的时间从大约10秒减少到了0.2秒,速度提升了50倍。

3 个回答

1

grouper() 这个方法的使用说明可以在这里找到,再加上 itertools.izip(),应该能对你有所帮助。

2

channel.samples_for(sample_num) 替换成一个 iter_channels(channels_config) 的迭代器,这个迭代器会保持一些内部状态,让你可以一次性读取文件。用法如下:

for (chan, sample_data) in izip(iter_channels(), data):
    decoded_data = chan.decode(sample_data)

要实现这个迭代器,可以想象一个基本的时钟,它的周期是1。不同通道的周期是整数。按照顺序遍历这些通道,当时钟的值除以通道的周期余数为零时,就输出这个通道。

for i in itertools.count():
    for chan in channels:
        if i % chan.period == 0:
            yield chan
7

想要真正提高性能,最好的办法就是不再用Python一个一个循环处理所有样本,而是让NumPy在编译好的C代码中完成这个循环。虽然这有点复杂,但其实是可以做到的。

首先,你需要做一些准备工作。正如Justin Peel所提到的,样本的排列模式在经过一定步骤后会重复。如果d_1, ..., d_k是你k个数据流的除数,而b_1, ..., b_k是这些流的样本大小(以字节为单位),那么lcm就是这些除数的最小公倍数。

N = lcm*sum(b_1/d_1+...+b_k/d_k)

这个值将是样本流模式重复的字节数。如果你已经弄清楚了前N个字节分别属于哪个流,那么你可以简单地重复这个模式。

接下来,你可以通过类似下面的方式构建前N个字节的流索引数组:

stream_index = []
for sample_num in range(lcm):
    stream_index += [i for i, ch in enumerate(all_channels)
                     if ch.samples_for(sample_num)]
repeat_count = [b[i] for i in stream_index]
stream_index = numpy.array(stream_index).repeat(repeat_count)

这里,d是序列d_1, ..., d_k,而b是序列b_1, ..., b_k。

现在你可以进行如下操作:

data = numpy.fromfile(my_file, dtype=numpy.uint8).reshape(-1, N)
streams = [data[:,stream_index == i].ravel() for i in range(k)]

你可能需要在数据的末尾稍微填充一些内容,以便让reshape()正常工作。

现在,你已经将每个流的所有字节放在了不同的NumPy数组中。你可以通过简单地给每个流的dtype属性重新定义数据格式。如果你想把第一个流解释为大端整数,只需写:

streams[0].dtype = ">i"

这样做不会改变数组中的数据,只是改变了数据的解释方式。

这看起来可能有点复杂,但从性能上来说应该会好很多。

撰写回答