为什么readlines()读取的内容超过sizehint?
背景
我正在用 Python 2.7.6 处理非常大的文本文件(超过 30GB)。为了加快处理速度,我把文件分成小块,然后利用 multiprocessing 库把这些小块分配给子进程。具体来说,我在主进程中遍历文件,记录下我想要分割的字节位置,然后把这些位置传给子进程,子进程再打开输入文件,使用 file.readlines(chunk_size)
来读取它们的那一块内容。不过,我发现读取到的小块似乎比我设定的 sizehint
参数大了很多(大约是 4 倍)。
问题
为什么 sizehint
的设置没有被遵循呢?
示例代码
以下代码展示了我遇到的问题:
import sys
# set test chunk size to 2KB
chunk_size = 1024 * 2
count = 0
chunk_start = 0
chunk_list = []
fi = open('test.txt', 'r')
while True:
# increment chunk counter
count += 1
# calculate new chunk end, advance file pointer
chunk_end = chunk_start + chunk_size
fi.seek(chunk_end)
# advance file pointer to end of current line so chunks don't have broken
# lines
fi.readline()
chunk_end = fi.tell()
# record chunk start and stop positions, chunk number
chunk_list.append((chunk_start, chunk_end, count))
# advance start to current end
chunk_start = chunk_end
# read a line to confirm we're not past the end of the file
line = fi.readline()
if not line:
break
# reset file pointer from last line read
fi.seek(chunk_end, 0)
fi.close()
# This code represents the action taken by subprocesses, but each subprocess
# receives one chunk instead of iterating the list of chunks itself.
with open('test.txt', 'r', 0) as fi:
# iterate over chunks
for chunk in chunk_list:
chunk_start, chunk_end, chunk_num = chunk
# advance file pointer to chunk start
fi.seek(chunk_start, 0)
# print some notes and read in the chunk
sys.stdout.write("Chunk #{0}: Size: {1} Start {2} Real Start: {3} Stop {4} "
.format(chunk_num, chunk_end-chunk_start, chunk_start, fi.tell(), chunk_end))
chunk = fi.readlines(chunk_end - chunk_start)
print("Real Stop: {0}".format(fi.tell()))
# write the chunk out to a file for examination
with open('test_chunk{0}'.format(chunk_num), 'w') as fo:
fo.writelines(chunk)
结果
我用一个大约 23.3KB 的输入文件(test.txt)运行了这段代码,得到了以下输出:
块 #1: 大小: 2052 开始 0 实际开始: 0 停止 2052 实际停止: 8193
块 #2: 大小: 2051 开始 2052 实际开始: 2052 停止 4103 实际停止: 10248
块 #3: 大小: 2050 开始 4103 实际开始: 4103 停止 6153 实际停止: 12298
块 #4: 大小: 2050 开始 6153 实际开始: 6153 停止 8203 实际停止: 14348
块 #5: 大小: 2050 开始 8203 实际开始: 8203 停止 10253 实际停止: 16398
块 #6: 大小: 2050 开始 10253 实际开始: 10253 停止 12303 实际停止: 18448
块 #7: 大小: 2050 开始 12303 实际开始: 12303 停止 14353 实际停止: 20498
块 #8: 大小: 2050 开始 14353 实际开始: 14353 停止 16403 实际停止: 22548
块 #9: 大小: 2050 开始 16403 实际开始: 16403 停止 18453 实际停止: 23893
块 #10: 大小: 2050 开始 18453 实际开始: 18453 停止 20503 实际停止: 23893
块 #11: 大小: 2050 开始 20503 实际开始: 20503 停止 22553 实际停止: 23893
块 #12: 大小: 2048 开始 22553 实际开始: 22553 停止 24601 实际停止: 23893
每个块的大小大约都是 2KB,所有的开始和停止位置都对得上,fi.tell()
报告的实际文件位置也看起来正确,所以我觉得我的分块算法是没问题的。不过,实际的停止位置显示 readlines()
读取的内容远超出我设定的大小提示。而且,输出文件 #1 到 #8 的大小都是 8.0KB,这比我设定的大小提示要大得多。
即使我尝试只在行尾进行分块的做法不对,readlines()
也不应该读取超过 2KB 加一行的内容。文件 #9 到 #12 的大小逐渐变小,这很合理,因为这些块的起始点越来越接近文件的末尾,而 readlines()
不会读取超过文件末尾的内容。
备注
- 我的测试输入文件每行简单地打印了 "< 行号 >\n",从 1 到 5000。
- 我尝试了不同的块和输入文件大小,结果类似。
- readlines 的文档说读取的大小可能会被向上舍入到内部缓冲区的大小,所以我尝试了不使用缓冲区打开文件(如上所示),但没有任何区别。
- 我使用这个算法来分割文件,因为我需要支持 *.bz2 和 *.gz 压缩文件,而 *.gz 文件在不解压的情况下无法识别未压缩的文件大小。*.bz2 文件也没有这个功能,但我可以从文件末尾向前查找 0 字节,并使用
fi.tell()
获取文件大小。请参见我相关的问题。 - 在需要支持压缩文件的要求之前,脚本的前一个版本使用
os.path.getsize()
作为分块循环的停止条件,readlines 在那种情况下似乎工作得很好。
2 个回答
这虽然没有直接回答你的问题,但或许能帮到你...
我觉得可能有更好的方法来处理你的文件,这样就能避免你现在遇到的问题。这只是我的想法,不过既然文件是可以逐步读取的,那像这样做是否可行呢?
import bzip2
import gzip
from multiprocessing import Pool, cpu_count
def chunker(filepath):
"""define and yield chunks"""
if filepath.endswith(".bz"):
read_open = bzip2.open
elif filepath.endswith(".gz"):
read_open = gzip.open
with read_open(filepath) as in_f:
delim = "something"
chunk = []
for line in in_f:
if delim not in line:
chunk.append(line)
else:
current, next_ = line.split(delim)
chunk.append(current)
yield chunk
chunk = [next_]
if chunk:
yield chunk
def process_chunk(chunk):
# do magic
return
if __name__ == '__main__':
filepath = ""
chunk_iter = chunker(filepath)
pool = Pool(processes=cpu_count() - 1)
for result in pool.imap(process_chunk, chunk_iter , chunksize=1)
print result
或者,如果你已经在用一种方法一次性读取并生成了文件块的列表,那为什么不在读取的时候把每个文件块写成单独的文件呢(前提是你有足够的磁盘空间)?这样你就可以给一个工作池提供一系列文件路径来处理。
另外,如果你的工作者处理文件块的速度足够快,并且你有足够的内存,你可以在读取的时候把整个文件块传给一个队列。然后,工作者就可以从队列中获取文件块。
文档中提到的 readlines
的缓冲区和你在 open
函数的第三个参数控制的缓冲区不是一回事。这里说的缓冲区是指 file_readlines
中的这个缓冲区:
static PyObject *
file_readlines(PyFileObject *f, PyObject *args)
{
long sizehint = 0;
PyObject *list = NULL;
PyObject *line;
char small_buffer[SMALLCHUNK];
其中 SMALLCHUNK
是在之前定义的:
#if BUFSIZ < 8192
#define SMALLCHUNK 8192
#else
#define SMALLCHUNK BUFSIZ
#endif
我不知道 BUFSIZ
是从哪里来的,但看起来你遇到的是 #define SMALLCHUNK 8192
的情况。无论如何,readlines
永远不会使用小于 8 KiB 的缓冲区,所以你可能需要把你的块设置得比这个更大。