Python中大文件的高效缓存和扫描方法

9 投票
4 回答
8812 浏览
提问于 2025-04-16 10:38

我遇到的问题有点复杂,所以我会尽量提供更完整的信息。对于不耐烦的人,这里是我能总结的最简洁的方式:

有什么最快(执行时间最短)的方法可以把一个文本文件分割成所有(重叠的)长度为 N(比如 N=36)的子字符串,同时去掉换行符。

我正在写一个模块,用来解析 FASTA 格式的基因组文件。这些文件包含了所谓的 'hg18' 人类参考基因组,如果你感兴趣,可以从 UCSC 基因组浏览器 下载(加油,蜗牛们!)。

你会注意到,这些基因组文件由 chr[1..22].fa 和 chr[XY].fa 组成,还有一些其他的小文件,但在这个模块中不使用这些小文件。

已经有几个模块可以解析 FASTA 文件,比如 BioPython 的 SeqIO。(抱歉,我想发个链接,但我还没有足够的积分。)不幸的是,我找到的每个模块都无法完成我想要的具体操作。

我的模块需要把基因组数据(比如 'CAGTACGTCAGACTATACGGAGCTA' 可能是一行)分割成每一个重叠的 N 长度的子字符串。让我用一个非常小的文件举个例子(实际的染色体文件长度在 355 到 2000 万个字符之间),假设 N=8。

>>>import cStringIO
>>>example_file = cStringIO.StringIO("""\
>header
CAGTcag
TFgcACF
""")
>>>for read in parse(example_file):
...    print read
...
CAGTCAGTF
AGTCAGTFG
GTCAGTFGC
TCAGTFGCA
CAGTFGCAC
AGTFGCACF

我找到的在我能想到的方法中性能最好的函数是这个:


def parse(file):
  size = 8 # of course in my code this is a function argument
  file.readline() # skip past the header
  buffer = ''
  for line in file:
    buffer += line.rstrip().upper()
    while len(buffer) >= size:
      yield buffer[:size]
      buffer = buffer[1:]

这个方法可以工作,但不幸的是,用这种方式解析人类基因组仍然需要大约 1.5 小时(见下面的说明)。也许这是我能用这种方法看到的最好效果了(可能需要完全重构代码,但我想尽量避免,因为这种方法在代码的其他部分有一些非常具体的优势),所以我想把这个问题交给大家。

谢谢大家!

  • 注意,这个时间包括了很多额外的计算,比如计算对链读取和在大约 5G 大小的哈希表上进行查找。

回答后的结论:结果发现,使用 fileobj.read() 然后处理得到的字符串(比如 string.replace() 等)相比程序的其他部分,所需的时间和内存都相对较少,所以我采用了这种方法。谢谢大家!

4 个回答

3

我觉得问题可能是你存储了太多的数据在字符串格式里,这样对你的情况来说其实是很浪费的,导致你内存不够用,可能还在频繁使用交换空间。虽然你有128GB的内存,应该是足够的... :)

既然你在评论中提到你需要存储额外的信息,我建议你可以创建一个单独的类来引用一个父字符串。我用hg18中的chr21.fa文件做了个简单测试;这个文件大约48MB,差不多有100万行。我这里只有1GB的内存,所以测试后我就把对象丢掉了。这个测试不会显示碎片化、缓存等问题,但我觉得它是测量解析速度的一个不错起点:

import mmap
import os
import time
import sys

class Subseq(object):
  __slots__ = ("parent", "offset", "length")

  def __init__(self, parent, offset, length):
    self.parent = parent
    self.offset = offset
    self.length = length

  # these are discussed in comments:
  def __str__(self):
    return self.parent[self.offset:self.offset + self.length]

  def __hash__(self):
    return hash(str(self))

  def __getitem__(self, index):
    # doesn't currently handle slicing
    assert 0 <= index < self.length
    return self.parent[self.offset + index]

  # other methods

def parse(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield Subseq(whole, offset, size)

class Seq(object):
  __slots__ = ("value", "offset")
  def __init__(self, value, offset):
    self.value = value
    self.offset = offset

def parse_sep_str(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield Seq(whole[offset:offset + size], offset)

def parse_plain_str(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield whole[offset:offset+size]

def parse_tuple(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield (whole, offset, size)

def parse_orig(file, size=8):
  file.readline() # skip header
  buffer = ''
  for line in file:
    buffer += line.rstrip().upper()
    while len(buffer) >= size:
      yield buffer[:size]
      buffer = buffer[1:]

def parse_os_read(file, size=8):
  file.readline()  # skip header
  file_size = os.fstat(file.fileno()).st_size
  whole = os.read(file.fileno(), file_size).replace("\n", "").upper()
  for offset in xrange(0, len(whole) - size + 1):
    yield whole[offset:offset+size]

def parse_mmap(file, size=8):
  file.readline()  # skip past the header
  buffer = ""
  for line in file:
    buffer += line
    if len(buffer) >= size:
      for start in xrange(0, len(buffer) - size + 1):
        yield buffer[start:start + size].upper()
      buffer = buffer[-(len(buffer) - size + 1):]
  for start in xrange(0, len(buffer) - size + 1):
    yield buffer[start:start + size]

def length(x):
  return sum(1 for _ in x)

def duration(secs):
  return "%dm %ds" % divmod(secs, 60)


def main(argv):
  tests = [parse, parse_sep_str, parse_tuple, parse_plain_str, parse_orig, parse_os_read]
  n = 0
  for fn in tests:
    n += 1
    with open(argv[1]) as f:
      start = time.time()
      length(fn(f))
      end = time.time()
      print "%d  %-20s  %s" % (n, fn.__name__, duration(end - start))

  fn = parse_mmap
  n += 1
  with open(argv[1]) as f:
    f = mmap.mmap(f.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
    start = time.time()
    length(fn(f))
    end = time.time()
  print "%d  %-20s  %s" % (n, fn.__name__, duration(end - start))


if __name__ == "__main__":
  sys.exit(main(sys.argv))

1  parse                 1m 42s
2  parse_sep_str         1m 42s
3  parse_tuple           0m 29s
4  parse_plain_str       0m 36s
5  parse_orig            0m 45s
6  parse_os_read         0m 34s
7  parse_mmap            0m 37s

前四个是我的代码,orig是你的,最后两个是其他人的答案。

用户自定义的对象创建和回收的成本比元组或普通字符串要高得多!这其实不应该让人感到惊讶,但我没想到差别会这么大(对比一下#1和#3,它们的区别仅在于使用了用户自定义类和元组)。你说你想和字符串一起存储额外的信息,比如偏移量(在parse和parse_sep_str的情况下),所以你可以考虑在C扩展模块中实现那种类型。如果你不想直接写C,可以看看Cython和相关的工具。

案例#1和#2是相同的,这是预期中的结果:通过指向一个父字符串,我是想节省内存而不是处理时间,但这个测试并没有测量到这一点。

4

你能不能把文件映射到内存里,然后用一个滑动窗口来逐步处理它?我写了一个简单的小程序,运行起来非常轻便:

USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
sarnold  20919  0.0  0.0  33036  4960 pts/2    R+   22:23   0:00 /usr/bin/python ./sliding_window.py

处理一个636229字节的fasta文件(这个文件是通过http://biostar.stackexchange.com/questions/1759找到的)花了0.383秒。

#!/usr/bin/python

import mmap
import os

  def parse(string, size):
    stride = 8
    start = string.find("\n")
    while start < size - stride:
        print string[start:start+stride]
        start += 1

fasta = open("small.fasta", 'r')
fasta_size = os.stat("small.fasta").st_size
fasta_map = mmap.mmap(fasta.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
parse(fasta_map, fasta_size)
3

一些经典的输入输出(IO)优化方法。

  • 使用更底层的读取操作,比如 os.read,并将数据读入一个大的固定缓冲区。
  • 使用多线程或多进程,其中一个负责读取和缓存数据,另一个负责处理数据。
  • 如果你有多个处理器或机器,可以使用多进程和消息队列,将处理任务分配到不同的CPU上,就像map-reduce那样。

使用更底层的读取操作其实不需要太大的改动。其他的方法则需要比较大的重写工作。

撰写回答