快速读取交错数据的方法?
我有一个文件,里面包含了几个数据通道。这个文件是以一个基本的速率进行采样的,每个通道的采样速率是这个基本速率除以一个数字——这个数字似乎总是2的幂,不过我觉得这并不重要。
举个例子,如果我有通道 a、b 和 c,它们的采样速率分别是1、2和4,那么我的数据流看起来会像这样:
a0 b0 c0 a1 a2 b1 a3 a4 b2 c1 a5 ...
为了增加点乐趣,这些通道可以是浮点数或整数(虽然我知道每个通道的数据类型),而且数据流不一定要在2的幂的地方结束:这个示例数据流在不需要进一步扩展的情况下也是有效的。数值有时很大,有时又是小端格式,不过我一开始就知道我在处理什么。
我有代码可以正确地解包这些数据,并把正确的值填充到numpy数组中,但速度很慢:看起来像这样(希望我没有省略太多;只是给你一个算法的概念):
for sample_num in range(total_samples):
channels_to_sample = [ch for ch in all_channels if ch.samples_for(sample_num)]
format_str = ... # build format string from channels_to_sample
data = struct.unpack( my_file.read( ... ) ) # read and unpack the data
# iterate over data tuple and put values in channels_to_sample
for val, ch in zip(data, channels_to_sample):
ch.data[sample_num / ch.divider] = val
而且速度很慢——在我的笔记本上读取一个20MB的文件需要几秒钟。性能分析工具告诉我,我在 Channel#samples_for()
这个函数上花了很多时间——这很合理,因为这里有一些条件逻辑。
我觉得应该有办法一次性完成这个,而不是用嵌套循环——也许可以用索引技巧把我想要的字节直接读入每个数组?构建一个巨大的、复杂的格式字符串的想法似乎也不太靠谱。
更新
感谢那些回复我的人。顺便说一下,使用numpy的索引技巧把我读取测试数据的时间从大约10秒减少到了0.2秒,速度提升了50倍。
3 个回答
grouper()
这个方法的使用说明可以在这里找到,再加上 itertools.izip()
,应该能对你有所帮助。
把 channel.samples_for(sample_num)
替换成一个 iter_channels(channels_config)
的迭代器,这个迭代器会保持一些内部状态,让你可以一次性读取文件。用法如下:
for (chan, sample_data) in izip(iter_channels(), data):
decoded_data = chan.decode(sample_data)
要实现这个迭代器,可以想象一个基本的时钟,它的周期是1。不同通道的周期是整数。按照顺序遍历这些通道,当时钟的值除以通道的周期余数为零时,就输出这个通道。
for i in itertools.count():
for chan in channels:
if i % chan.period == 0:
yield chan
想要真正提高性能,最好的办法就是不再用Python一个一个循环处理所有样本,而是让NumPy在编译好的C代码中完成这个循环。虽然这有点复杂,但其实是可以做到的。
首先,你需要做一些准备工作。正如Justin Peel所提到的,样本的排列模式在经过一定步骤后会重复。如果d_1, ..., d_k是你k个数据流的除数,而b_1, ..., b_k是这些流的样本大小(以字节为单位),那么lcm就是这些除数的最小公倍数。
N = lcm*sum(b_1/d_1+...+b_k/d_k)
这个值将是样本流模式重复的字节数。如果你已经弄清楚了前N个字节分别属于哪个流,那么你可以简单地重复这个模式。
接下来,你可以通过类似下面的方式构建前N个字节的流索引数组:
stream_index = []
for sample_num in range(lcm):
stream_index += [i for i, ch in enumerate(all_channels)
if ch.samples_for(sample_num)]
repeat_count = [b[i] for i in stream_index]
stream_index = numpy.array(stream_index).repeat(repeat_count)
这里,d
是序列d_1, ..., d_k,而b
是序列b_1, ..., b_k。
现在你可以进行如下操作:
data = numpy.fromfile(my_file, dtype=numpy.uint8).reshape(-1, N)
streams = [data[:,stream_index == i].ravel() for i in range(k)]
你可能需要在数据的末尾稍微填充一些内容,以便让reshape()
正常工作。
现在,你已经将每个流的所有字节放在了不同的NumPy数组中。你可以通过简单地给每个流的dtype
属性重新定义数据格式。如果你想把第一个流解释为大端整数,只需写:
streams[0].dtype = ">i"
这样做不会改变数组中的数据,只是改变了数据的解释方式。
这看起来可能有点复杂,但从性能上来说应该会好很多。