Python中大文件的高效缓存和扫描方法
我遇到的问题有点复杂,所以我会尽量提供更完整的信息。对于不耐烦的人,这里是我能总结的最简洁的方式:
有什么最快(执行时间最短)的方法可以把一个文本文件分割成所有(重叠的)长度为 N(比如 N=36)的子字符串,同时去掉换行符。
我正在写一个模块,用来解析 FASTA 格式的基因组文件。这些文件包含了所谓的 'hg18' 人类参考基因组,如果你感兴趣,可以从 UCSC 基因组浏览器 下载(加油,蜗牛们!)。
你会注意到,这些基因组文件由 chr[1..22].fa 和 chr[XY].fa 组成,还有一些其他的小文件,但在这个模块中不使用这些小文件。
已经有几个模块可以解析 FASTA 文件,比如 BioPython 的 SeqIO。(抱歉,我想发个链接,但我还没有足够的积分。)不幸的是,我找到的每个模块都无法完成我想要的具体操作。
我的模块需要把基因组数据(比如 'CAGTACGTCAGACTATACGGAGCTA' 可能是一行)分割成每一个重叠的 N 长度的子字符串。让我用一个非常小的文件举个例子(实际的染色体文件长度在 355 到 2000 万个字符之间),假设 N=8。
>>>import cStringIO >>>example_file = cStringIO.StringIO("""\ >header CAGTcag TFgcACF """) >>>for read in parse(example_file): ... print read ... CAGTCAGTF AGTCAGTFG GTCAGTFGC TCAGTFGCA CAGTFGCAC AGTFGCACF
我找到的在我能想到的方法中性能最好的函数是这个:
def parse(file):
size = 8 # of course in my code this is a function argument
file.readline() # skip past the header
buffer = ''
for line in file:
buffer += line.rstrip().upper()
while len(buffer) >= size:
yield buffer[:size]
buffer = buffer[1:]
这个方法可以工作,但不幸的是,用这种方式解析人类基因组仍然需要大约 1.5 小时(见下面的说明)。也许这是我能用这种方法看到的最好效果了(可能需要完全重构代码,但我想尽量避免,因为这种方法在代码的其他部分有一些非常具体的优势),所以我想把这个问题交给大家。
谢谢大家!
- 注意,这个时间包括了很多额外的计算,比如计算对链读取和在大约 5G 大小的哈希表上进行查找。
回答后的结论:结果发现,使用 fileobj.read() 然后处理得到的字符串(比如 string.replace() 等)相比程序的其他部分,所需的时间和内存都相对较少,所以我采用了这种方法。谢谢大家!
4 个回答
我觉得问题可能是你存储了太多的数据在字符串格式里,这样对你的情况来说其实是很浪费的,导致你内存不够用,可能还在频繁使用交换空间。虽然你有128GB的内存,应该是足够的... :)
既然你在评论中提到你需要存储额外的信息,我建议你可以创建一个单独的类来引用一个父字符串。我用hg18中的chr21.fa文件做了个简单测试;这个文件大约48MB,差不多有100万行。我这里只有1GB的内存,所以测试后我就把对象丢掉了。这个测试不会显示碎片化、缓存等问题,但我觉得它是测量解析速度的一个不错起点:
import mmap
import os
import time
import sys
class Subseq(object):
__slots__ = ("parent", "offset", "length")
def __init__(self, parent, offset, length):
self.parent = parent
self.offset = offset
self.length = length
# these are discussed in comments:
def __str__(self):
return self.parent[self.offset:self.offset + self.length]
def __hash__(self):
return hash(str(self))
def __getitem__(self, index):
# doesn't currently handle slicing
assert 0 <= index < self.length
return self.parent[self.offset + index]
# other methods
def parse(file, size=8):
file.readline() # skip header
whole = "".join(line.rstrip().upper() for line in file)
for offset in xrange(0, len(whole) - size + 1):
yield Subseq(whole, offset, size)
class Seq(object):
__slots__ = ("value", "offset")
def __init__(self, value, offset):
self.value = value
self.offset = offset
def parse_sep_str(file, size=8):
file.readline() # skip header
whole = "".join(line.rstrip().upper() for line in file)
for offset in xrange(0, len(whole) - size + 1):
yield Seq(whole[offset:offset + size], offset)
def parse_plain_str(file, size=8):
file.readline() # skip header
whole = "".join(line.rstrip().upper() for line in file)
for offset in xrange(0, len(whole) - size + 1):
yield whole[offset:offset+size]
def parse_tuple(file, size=8):
file.readline() # skip header
whole = "".join(line.rstrip().upper() for line in file)
for offset in xrange(0, len(whole) - size + 1):
yield (whole, offset, size)
def parse_orig(file, size=8):
file.readline() # skip header
buffer = ''
for line in file:
buffer += line.rstrip().upper()
while len(buffer) >= size:
yield buffer[:size]
buffer = buffer[1:]
def parse_os_read(file, size=8):
file.readline() # skip header
file_size = os.fstat(file.fileno()).st_size
whole = os.read(file.fileno(), file_size).replace("\n", "").upper()
for offset in xrange(0, len(whole) - size + 1):
yield whole[offset:offset+size]
def parse_mmap(file, size=8):
file.readline() # skip past the header
buffer = ""
for line in file:
buffer += line
if len(buffer) >= size:
for start in xrange(0, len(buffer) - size + 1):
yield buffer[start:start + size].upper()
buffer = buffer[-(len(buffer) - size + 1):]
for start in xrange(0, len(buffer) - size + 1):
yield buffer[start:start + size]
def length(x):
return sum(1 for _ in x)
def duration(secs):
return "%dm %ds" % divmod(secs, 60)
def main(argv):
tests = [parse, parse_sep_str, parse_tuple, parse_plain_str, parse_orig, parse_os_read]
n = 0
for fn in tests:
n += 1
with open(argv[1]) as f:
start = time.time()
length(fn(f))
end = time.time()
print "%d %-20s %s" % (n, fn.__name__, duration(end - start))
fn = parse_mmap
n += 1
with open(argv[1]) as f:
f = mmap.mmap(f.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
start = time.time()
length(fn(f))
end = time.time()
print "%d %-20s %s" % (n, fn.__name__, duration(end - start))
if __name__ == "__main__":
sys.exit(main(sys.argv))
1 parse 1m 42s
2 parse_sep_str 1m 42s
3 parse_tuple 0m 29s
4 parse_plain_str 0m 36s
5 parse_orig 0m 45s
6 parse_os_read 0m 34s
7 parse_mmap 0m 37s
前四个是我的代码,orig是你的,最后两个是其他人的答案。
用户自定义的对象创建和回收的成本比元组或普通字符串要高得多!这其实不应该让人感到惊讶,但我没想到差别会这么大(对比一下#1和#3,它们的区别仅在于使用了用户自定义类和元组)。你说你想和字符串一起存储额外的信息,比如偏移量(在parse和parse_sep_str的情况下),所以你可以考虑在C扩展模块中实现那种类型。如果你不想直接写C,可以看看Cython和相关的工具。
案例#1和#2是相同的,这是预期中的结果:通过指向一个父字符串,我是想节省内存而不是处理时间,但这个测试并没有测量到这一点。
你能不能把文件映射到内存里,然后用一个滑动窗口来逐步处理它?我写了一个简单的小程序,运行起来非常轻便:
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND sarnold 20919 0.0 0.0 33036 4960 pts/2 R+ 22:23 0:00 /usr/bin/python ./sliding_window.py
处理一个636229字节的fasta文件(这个文件是通过http://biostar.stackexchange.com/questions/1759找到的)花了0.383秒。
#!/usr/bin/python
import mmap
import os
def parse(string, size):
stride = 8
start = string.find("\n")
while start < size - stride:
print string[start:start+stride]
start += 1
fasta = open("small.fasta", 'r')
fasta_size = os.stat("small.fasta").st_size
fasta_map = mmap.mmap(fasta.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
parse(fasta_map, fasta_size)
一些经典的输入输出(IO)优化方法。
- 使用更底层的读取操作,比如
os.read
,并将数据读入一个大的固定缓冲区。 - 使用多线程或多进程,其中一个负责读取和缓存数据,另一个负责处理数据。
- 如果你有多个处理器或机器,可以使用多进程和消息队列,将处理任务分配到不同的CPU上,就像map-reduce那样。
使用更底层的读取操作其实不需要太大的改动。其他的方法则需要比较大的重写工作。