获取文本文件的首尾行最有效的方法是什么?

79 投票
13 回答
145166 浏览
提问于 2025-04-16 01:55

我有一个文本文件,每一行都有一个时间戳。我的目标是找到时间范围。所有的时间都是按顺序排列的,所以第一行是最早的时间,最后一行是最新的时间。我只需要第一行和最后一行。用Python有什么高效的方法可以获取这两行呢?

注意:这些文件的长度相对较大,大约有100万到200万行,我还需要对几百个文件进行这个操作。

13 个回答

25

这是对SilentGhost回答的一个修改版,可以满足你的需求。

with open(fname, 'rb') as fh:
    first = next(fh)
    offs = -100
    while True:
        fh.seek(offs, 2)
        lines = fh.readlines()
        if len(lines)>1:
            last = lines[-1]
            break
        offs *= 2
    print first
    print last

这里不需要限制行长度的上限。

95
from os import SEEK_END, SEEK_CUR

def readlast(f):
    try:
        f.seek(-2, SEEK_END)       # Jump to the second last byte.
        while f.read(1) != b"\n":  #  Until newline is found ...
            f.seek(-2, SEEK_CUR)   #  ... jump back, over the read byte plus one.
    except OSError:                # Reached begginning of File
        f.seek(0)                  #  Set cursor to beginning of file as well.
    return f.read()                # Read all data from this point on.
        
with open(path, "rb") as f:
    first = f.readline()
    last  = readlast(f)

在使用 seek 的时候,格式是 fseek(offset, whence=0)

引用自 docs.python.org

将流的位置更改为给定的字节偏移量。偏移量是相对于由 whence 指定的位置来解释的。whence 的默认值是 SEEK_SET。whence 的值有:

  • SEEK_SET0 = 流的开始(默认值);偏移量应该是零或正数
  • SEEK_CUR1 = 当前流的位置;偏移量可以是负数
  • SEEK_END2 = 流的结束;偏移量通常是负数

跳跃搜索 (2.7+)

from collections import deque
from os import SEEK_CUR, SEEK_END

def readlast(f, d = b'\n'):
    """"readlast(f: io.IOBase, d: bytes = b'\n') -> bytes

    Return the last segment of file `f`, containing data segments separated by
    `d`.
    """
    arr = deque(); step = 1; pos = -1
    try:
        # Seek to last byte of file, save it to arr as to not check for newline.
        pos = f.seek(-1, SEEK_END) 
        arr.appendleft(f.read())
        # Seek past the byte read, plus one to use as the first segment.
        pos = f.seek(-2, SEEK_END) 
        seg = f.read(1)
        # Break when 'd' occurs, store index of the rightmost match in 'i'.
        while seg.rfind(d) == -1:
            # Store segments with no b'\n' in a memory-efficient 'deque'.
            arr.appendleft(seg)
            # Step back in file, past the bytes just read plus twice that.
            pos = f.seek(-step*3, SEEK_CUR)
            # Read new segment, twice as big as the one read previous iteration.
            step *= 2
            seg = f.read(step)
        # Ignore the characters up to 'i', and the triggering newline character.
        arr.appendleft(seg[seg.rfind(d)+1:])
    except OSError: 
        # Reached beginning of file. Read remaining data and check for newline.
        f.seek(0)
        seg = f.read(pos)
        arr.appendleft(seg[seg.rfind(d)+1:])
    return b"".join(arr)

我今天可能会选择一个使用指数增长步长的函数,所以在这里添加了这样的例子,并且会把它和原始答案一起保留(暂时)。

它很好地处理了边缘情况,除了多字节分隔符和以文本模式打开的文件(请参见“边缘情况”以获取处理这些情况的示例)。

用法:

f.write(b'X\nY\nZ\n'); f.seek(0)
assert readlast(f) == b'Z\n'
f.write(b'\n\n'; f.seek(0)
assert readlast(f) == b'\n'

边缘情况 (2.7+)

我没有编辑原始答案,因为问题特别询问效率,并且为了尊重之前的投票。

这个版本解决了多年来提出的所有评论和问题,同时保持了逻辑和向后兼容性(虽然可读性有所牺牲)。

在写作时提出并解决的问题包括:

  • 解析空文件时返回空字符串,这在 评论中由 Loïc 提到。
  • 当没有找到分隔符时返回所有内容,这在 评论中由 LazyLeopard 提到。
  • 避免使用相对偏移量以支持 文本模式,这是在 评论中由 AnotherParker 提到的。
  • UTF16/UTF32 的处理,这在 评论中由 Pietro Battiston 提到。

还支持多字节分隔符。

from os import SEEK_CUR, SEEK_END

def _readlast__bytes(f, sep, size, step):
    # Point cursor 'size' + 'step' bytes away from the end of the file.
    o = f.seek(0 - size - step, SEEK_END)
    # Step 'step' bytes each iteration, halt when 'sep' occurs.
    while f.read(size) != sep:
        f.seek(0 - size - step, SEEK_CUR)

def _readlast__text(f, sep, size, step):
    # Text mode, same principle but without the use of relative offsets.
    o = f.seek(0, SEEK_END)
    o = f.seek(o - size - step)
    while f.read(size) != sep:
        o = f.seek(o - step)

def readlast(f, sep, fixed = False):
    """readlast(f: io.BaseIO, sep: bytes|str, fixed: bool = False) -> bytes|str

    Return the last segment of file `f`, containing data segments separated by
    `sep`.

    Set `fixed` to True when parsing UTF-32 or UTF-16 encoded data (don't forget
    to pass the correct delimiter) in files opened in byte mode.
    """
    size = len(sep)
    step = len(sep) if (fixed is True) else (fixed or 1)
    step = size if fixed else 1
    if not size:
        raise ValueError("Zero-length separator.")
    try:
        if 'b' in f.mode:
            # Process file opened in byte mode.
            _readlast__bytes(f, sep, size, step)
        else:
            # Process file opened in text mode.
            _readlast__text(f, sep, size, step)
    except (OSError, ValueError): 
        # Beginning of file reached.
        f.seek(0, SEEK_SET)
    return f.read()

用法:

f.write("X\nY\nZ\n".encode('utf32'); f.seek(0)
assert readlast(f, "\n".encode('utf32')[4:]) == "Z\n"
f.write(b'X<br>Y</br>'; f.seek(0)
assert readlast(f, b'<br>', fixed=False) == "Y</br>"

效率

用于与这个答案进行比较的代码(在发布时是最受欢迎答案的优化版本):

with open(file, "rb") as f:
    first = f.readline()     # Read and store the first line.
    for last in f: pass      # Read all lines, keep final value.

结果:

10k iterations processing a file of 6k lines totalling 200kB: 1.62s vs  6.92s
100 iterations processing a file of 6k lines totalling 1.3GB: 8.93s vs 86.95s

正如问题所述,“每个文件1-2百万行”,当然会大大增加差异。

67

io模块的文档

with open(fname, 'rb') as fh:
    first = next(fh).decode()

    fh.seek(-1024, 2)
    last = fh.readlines()[-1].decode()

这里的变量值是1024:它代表的是平均字符串长度。我选择1024只是为了举个例子。如果你对平均行长度有个大概的估计,可以用那个值乘以2。

因为你对行长度的最大值完全没有头绪,显而易见的解决办法就是遍历整个文件:

for line in fh:
    pass
last = line

你不需要担心二进制标志,直接用 open(fname) 就可以了。

补充说明:由于你有很多文件需要处理,可以用 random.sample 随机抽取几十个文件,然后在这些文件上运行这段代码来确定最后一行的长度。假设你预先设定一个较大的偏移值(比如1MB)。这将帮助你估算出完整运行时的值。

撰写回答