为什么readlines()读取的内容超过sizehint?

1 投票
2 回答
2522 浏览
提问于 2025-04-20 18:52

背景

我正在用 Python 2.7.6 处理非常大的文本文件(超过 30GB)。为了加快处理速度,我把文件分成小块,然后利用 multiprocessing 库把这些小块分配给子进程。具体来说,我在主进程中遍历文件,记录下我想要分割的字节位置,然后把这些位置传给子进程,子进程再打开输入文件,使用 file.readlines(chunk_size) 来读取它们的那一块内容。不过,我发现读取到的小块似乎比我设定的 sizehint 参数大了很多(大约是 4 倍)。

问题

为什么 sizehint 的设置没有被遵循呢?

示例代码

以下代码展示了我遇到的问题:

import sys

# set test chunk size to 2KB
chunk_size = 1024 * 2

count = 0
chunk_start = 0
chunk_list = []

fi = open('test.txt', 'r')
while True:
    # increment chunk counter
    count += 1

    # calculate new chunk end, advance file pointer
    chunk_end = chunk_start + chunk_size
    fi.seek(chunk_end)

    # advance file pointer to end of current line so chunks don't have broken 
    # lines
    fi.readline() 
    chunk_end = fi.tell()

    # record chunk start and stop positions, chunk number
    chunk_list.append((chunk_start, chunk_end, count))

    # advance start to current end
    chunk_start = chunk_end

    # read a line to confirm we're not past the end of the file
    line = fi.readline()
    if not line:
        break

    # reset file pointer from last line read
    fi.seek(chunk_end, 0)

fi.close()

# This code represents the action taken by subprocesses, but each subprocess
# receives one chunk instead of iterating the list of chunks itself.
with open('test.txt', 'r', 0) as fi:
    # iterate over chunks
    for chunk in chunk_list:
        chunk_start, chunk_end, chunk_num = chunk

        # advance file pointer to chunk start
        fi.seek(chunk_start, 0)

        # print some notes and read in the chunk
        sys.stdout.write("Chunk #{0}: Size: {1} Start {2} Real Start: {3} Stop {4} "
              .format(chunk_num, chunk_end-chunk_start, chunk_start, fi.tell(), chunk_end))
        chunk = fi.readlines(chunk_end - chunk_start)
        print("Real Stop: {0}".format(fi.tell()))

        # write the chunk out to a file for examination
        with open('test_chunk{0}'.format(chunk_num), 'w') as fo:
            fo.writelines(chunk)

结果

我用一个大约 23.3KB 的输入文件(test.txt)运行了这段代码,得到了以下输出:

块 #1: 大小: 2052 开始 0 实际开始: 0 停止 2052 实际停止: 8193
块 #2: 大小: 2051 开始 2052 实际开始: 2052 停止 4103 实际停止: 10248
块 #3: 大小: 2050 开始 4103 实际开始: 4103 停止 6153 实际停止: 12298
块 #4: 大小: 2050 开始 6153 实际开始: 6153 停止 8203 实际停止: 14348
块 #5: 大小: 2050 开始 8203 实际开始: 8203 停止 10253 实际停止: 16398
块 #6: 大小: 2050 开始 10253 实际开始: 10253 停止 12303 实际停止: 18448
块 #7: 大小: 2050 开始 12303 实际开始: 12303 停止 14353 实际停止: 20498
块 #8: 大小: 2050 开始 14353 实际开始: 14353 停止 16403 实际停止: 22548
块 #9: 大小: 2050 开始 16403 实际开始: 16403 停止 18453 实际停止: 23893
块 #10: 大小: 2050 开始 18453 实际开始: 18453 停止 20503 实际停止: 23893
块 #11: 大小: 2050 开始 20503 实际开始: 20503 停止 22553 实际停止: 23893
块 #12: 大小: 2048 开始 22553 实际开始: 22553 停止 24601 实际停止: 23893

每个块的大小大约都是 2KB,所有的开始和停止位置都对得上,fi.tell() 报告的实际文件位置也看起来正确,所以我觉得我的分块算法是没问题的。不过,实际的停止位置显示 readlines() 读取的内容远超出我设定的大小提示。而且,输出文件 #1 到 #8 的大小都是 8.0KB,这比我设定的大小提示要大得多。

即使我尝试只在行尾进行分块的做法不对,readlines() 也不应该读取超过 2KB 加一行的内容。文件 #9 到 #12 的大小逐渐变小,这很合理,因为这些块的起始点越来越接近文件的末尾,而 readlines() 不会读取超过文件末尾的内容。

备注

  1. 我的测试输入文件每行简单地打印了 "< 行号 >\n",从 1 到 5000。
  2. 我尝试了不同的块和输入文件大小,结果类似。
  3. readlines 的文档说读取的大小可能会被向上舍入到内部缓冲区的大小,所以我尝试了不使用缓冲区打开文件(如上所示),但没有任何区别。
  4. 我使用这个算法来分割文件,因为我需要支持 *.bz2 和 *.gz 压缩文件,而 *.gz 文件在不解压的情况下无法识别未压缩的文件大小。*.bz2 文件也没有这个功能,但我可以从文件末尾向前查找 0 字节,并使用 fi.tell() 获取文件大小。请参见我相关的问题
  5. 在需要支持压缩文件的要求之前,脚本的前一个版本使用 os.path.getsize() 作为分块循环的停止条件,readlines 在那种情况下似乎工作得很好。

2 个回答

0

这虽然没有直接回答你的问题,但或许能帮到你...

我觉得可能有更好的方法来处理你的文件,这样就能避免你现在遇到的问题。这只是我的想法,不过既然文件是可以逐步读取的,那像这样做是否可行呢?

import bzip2
import gzip
from multiprocessing import Pool, cpu_count


def chunker(filepath):
    """define and yield chunks"""
    if filepath.endswith(".bz"):
        read_open = bzip2.open
    elif filepath.endswith(".gz"):
        read_open = gzip.open

    with read_open(filepath) as in_f:        
        delim = "something"
        chunk = []
        for line in in_f:
            if delim not in line:
                chunk.append(line)
            else:
                current, next_ = line.split(delim)
                chunk.append(current)
                yield chunk
                chunk = [next_]
        if chunk:
            yield chunk

def process_chunk(chunk):
    # do magic
    return 

if __name__ == '__main__':
    filepath = ""
    chunk_iter = chunker(filepath)

    pool = Pool(processes=cpu_count() - 1)
    for result in pool.imap(process_chunk, chunk_iter , chunksize=1)
        print result

或者,如果你已经在用一种方法一次性读取并生成了文件块的列表,那为什么不在读取的时候把每个文件块写成单独的文件呢(前提是你有足够的磁盘空间)?这样你就可以给一个工作池提供一系列文件路径来处理。

另外,如果你的工作者处理文件块的速度足够快,并且你有足够的内存,你可以在读取的时候把整个文件块传给一个队列。然后,工作者就可以从队列中获取文件块。

2

文档中提到的 readlines 的缓冲区和你在 open 函数的第三个参数控制的缓冲区不是一回事。这里说的缓冲区是指 file_readlines 中的这个缓冲区:

static PyObject *
file_readlines(PyFileObject *f, PyObject *args)
{
    long sizehint = 0;
    PyObject *list = NULL;
    PyObject *line;
    char small_buffer[SMALLCHUNK];

其中 SMALLCHUNK 是在之前定义的:

#if BUFSIZ < 8192
#define SMALLCHUNK 8192
#else
#define SMALLCHUNK BUFSIZ
#endif

我不知道 BUFSIZ 是从哪里来的,但看起来你遇到的是 #define SMALLCHUNK 8192 的情况。无论如何,readlines 永远不会使用小于 8 KiB 的缓冲区,所以你可能需要把你的块设置得比这个更大。

撰写回答