获取文本文件的首尾行最有效的方法是什么?
我有一个文本文件,每一行都有一个时间戳。我的目标是找到时间范围。所有的时间都是按顺序排列的,所以第一行是最早的时间,最后一行是最新的时间。我只需要第一行和最后一行。用Python有什么高效的方法可以获取这两行呢?
注意:这些文件的长度相对较大,大约有100万到200万行,我还需要对几百个文件进行这个操作。
13 个回答
这是对SilentGhost回答的一个修改版,可以满足你的需求。
with open(fname, 'rb') as fh:
first = next(fh)
offs = -100
while True:
fh.seek(offs, 2)
lines = fh.readlines()
if len(lines)>1:
last = lines[-1]
break
offs *= 2
print first
print last
这里不需要限制行长度的上限。
from os import SEEK_END, SEEK_CUR
def readlast(f):
try:
f.seek(-2, SEEK_END) # Jump to the second last byte.
while f.read(1) != b"\n": # Until newline is found ...
f.seek(-2, SEEK_CUR) # ... jump back, over the read byte plus one.
except OSError: # Reached begginning of File
f.seek(0) # Set cursor to beginning of file as well.
return f.read() # Read all data from this point on.
with open(path, "rb") as f:
first = f.readline()
last = readlast(f)
在使用 seek
的时候,格式是 fseek(offset, whence=0)
引用自 docs.python.org:
将流的位置更改为给定的字节偏移量。偏移量是相对于由 whence 指定的位置来解释的。whence 的默认值是 SEEK_SET。whence 的值有:
跳跃搜索 (2.7+)
from collections import deque
from os import SEEK_CUR, SEEK_END
def readlast(f, d = b'\n'):
""""readlast(f: io.IOBase, d: bytes = b'\n') -> bytes
Return the last segment of file `f`, containing data segments separated by
`d`.
"""
arr = deque(); step = 1; pos = -1
try:
# Seek to last byte of file, save it to arr as to not check for newline.
pos = f.seek(-1, SEEK_END)
arr.appendleft(f.read())
# Seek past the byte read, plus one to use as the first segment.
pos = f.seek(-2, SEEK_END)
seg = f.read(1)
# Break when 'd' occurs, store index of the rightmost match in 'i'.
while seg.rfind(d) == -1:
# Store segments with no b'\n' in a memory-efficient 'deque'.
arr.appendleft(seg)
# Step back in file, past the bytes just read plus twice that.
pos = f.seek(-step*3, SEEK_CUR)
# Read new segment, twice as big as the one read previous iteration.
step *= 2
seg = f.read(step)
# Ignore the characters up to 'i', and the triggering newline character.
arr.appendleft(seg[seg.rfind(d)+1:])
except OSError:
# Reached beginning of file. Read remaining data and check for newline.
f.seek(0)
seg = f.read(pos)
arr.appendleft(seg[seg.rfind(d)+1:])
return b"".join(arr)
我今天可能会选择一个使用指数增长步长的函数,所以在这里添加了这样的例子,并且会把它和原始答案一起保留(暂时)。
它很好地处理了边缘情况,除了多字节分隔符和以文本模式打开的文件(请参见“边缘情况”以获取处理这些情况的示例)。
用法:
f.write(b'X\nY\nZ\n'); f.seek(0)
assert readlast(f) == b'Z\n'
f.write(b'\n\n'; f.seek(0)
assert readlast(f) == b'\n'
边缘情况 (2.7+)
我没有编辑原始答案,因为问题特别询问效率,并且为了尊重之前的投票。
这个版本解决了多年来提出的所有评论和问题,同时保持了逻辑和向后兼容性(虽然可读性有所牺牲)。
在写作时提出并解决的问题包括:
- 解析空文件时返回空字符串,这在 评论中由 Loïc 提到。
- 当没有找到分隔符时返回所有内容,这在 评论中由 LazyLeopard 提到。
- 避免使用相对偏移量以支持 文本模式,这是在 评论中由 AnotherParker 提到的。
- UTF16/UTF32 的处理,这在 评论中由 Pietro Battiston 提到。
还支持多字节分隔符。
from os import SEEK_CUR, SEEK_END
def _readlast__bytes(f, sep, size, step):
# Point cursor 'size' + 'step' bytes away from the end of the file.
o = f.seek(0 - size - step, SEEK_END)
# Step 'step' bytes each iteration, halt when 'sep' occurs.
while f.read(size) != sep:
f.seek(0 - size - step, SEEK_CUR)
def _readlast__text(f, sep, size, step):
# Text mode, same principle but without the use of relative offsets.
o = f.seek(0, SEEK_END)
o = f.seek(o - size - step)
while f.read(size) != sep:
o = f.seek(o - step)
def readlast(f, sep, fixed = False):
"""readlast(f: io.BaseIO, sep: bytes|str, fixed: bool = False) -> bytes|str
Return the last segment of file `f`, containing data segments separated by
`sep`.
Set `fixed` to True when parsing UTF-32 or UTF-16 encoded data (don't forget
to pass the correct delimiter) in files opened in byte mode.
"""
size = len(sep)
step = len(sep) if (fixed is True) else (fixed or 1)
step = size if fixed else 1
if not size:
raise ValueError("Zero-length separator.")
try:
if 'b' in f.mode:
# Process file opened in byte mode.
_readlast__bytes(f, sep, size, step)
else:
# Process file opened in text mode.
_readlast__text(f, sep, size, step)
except (OSError, ValueError):
# Beginning of file reached.
f.seek(0, SEEK_SET)
return f.read()
用法:
f.write("X\nY\nZ\n".encode('utf32'); f.seek(0)
assert readlast(f, "\n".encode('utf32')[4:]) == "Z\n"
f.write(b'X<br>Y</br>'; f.seek(0)
assert readlast(f, b'<br>', fixed=False) == "Y</br>"
效率
用于与这个答案进行比较的代码(在发布时是最受欢迎答案的优化版本):
with open(file, "rb") as f:
first = f.readline() # Read and store the first line.
for last in f: pass # Read all lines, keep final value.
结果:
10k iterations processing a file of 6k lines totalling 200kB: 1.62s vs 6.92s
100 iterations processing a file of 6k lines totalling 1.3GB: 8.93s vs 86.95s
正如问题所述,“每个文件1-2百万行”,当然会大大增加差异。
with open(fname, 'rb') as fh:
first = next(fh).decode()
fh.seek(-1024, 2)
last = fh.readlines()[-1].decode()
这里的变量值是1024:它代表的是平均字符串长度。我选择1024只是为了举个例子。如果你对平均行长度有个大概的估计,可以用那个值乘以2。
因为你对行长度的最大值完全没有头绪,显而易见的解决办法就是遍历整个文件:
for line in fh:
pass
last = line
你不需要担心二进制标志,直接用 open(fname)
就可以了。
补充说明:由于你有很多文件需要处理,可以用 random.sample
随机抽取几十个文件,然后在这些文件上运行这段代码来确定最后一行的长度。假设你预先设定一个较大的偏移值(比如1MB)。这将帮助你估算出完整运行时的值。