在Python中高效递归遍历文件目录并最小化内存使用

2 投票
4 回答
2512 浏览
提问于 2025-04-18 08:03

我有一大堆文件想要逐个检查,并计算它们的md5校验和。

这些文件有很多存储在不同的物理硬盘上,但它们都挂载在同一个目录里:

/mnt/drive1/dir1/file.jpg
/mnt/drive2/dir1/file2.jpg    

我该如何在不把整个目录和文件结构都加载到内存中的情况下,逐层检查/mnt这个目录呢?

有没有办法用多个线程来实现这个?虽然可能不需要用多个线程/进程去逐层检查目录,但文件操作可能会消耗很多CPU资源,使用多个CPU核心会更有效。

提前谢谢你们。

4 个回答

0

不确定有没有一个通用的正确答案,但也许你可以从简单的开始,进行一些基准测试,看看哪些地方是瓶颈。

选项 1

可以用 find /mnt -type f 命令生成一个文件列表,然后把这个列表传给你的脚本。这样可以利用多个工作线程(multiprocessing)来并行处理,但你可能需要根据物理设备来拆分这个列表,让每个线程处理一个设备。如果连接的存储设备比较慢,这一点可能很重要。

选项 2

生成一个顶层目录的列表(比如深度最大为 1 或 2),让线程分别在每个分区上操作。可以用 Linux 的 find -type d 命令或者 Python 的 os.walk() 来实现。


在你还不知道内存使用情况有多糟糕之前,我建议不要太担心加载太多内容到内存中。也就是说,在你进行基准测试之前。还有,我觉得 os.walk 或者文件列表不会造成严重的内存问题。如果我错了,请纠正我。

2

稍微详细解释一下,我的意思是这段代码会创建一个进程池(进程的数量是通过命令行指定的),然后它会打开当前目录下的每个文件,对每个文件进行50次压缩,并计算出它的CRC值。接着,它会把所有的CRC值进行异或运算,然后打印出来。

import multiprocessing
import os
import sys
import zlib

NUM_PROCS = int(sys.argv[1])

def processFile(filepath):
    infile = open(filepath, 'r')
    contents = infile.read()
    for i in xrange(50):
        contents = zlib.compress(contents)
    return zlib.crc32(contents)

def generateFilepaths():
    for (dirpath, dirnames, filenames) in os.walk('.'):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            yield filepath

if __name__ == '__main__':
    pool = multiprocessing.Pool(NUM_PROCS)

    fullCrc = 0
    for crc in pool.imap_unordered(processFile, generateFilepaths()):
        fullCrc ^= crc

    print fullCrc

需要注意的是,即使不对每个文件做这么多的工作,这个程序仍然会受到输入输出的限制。因此,使用多线程可能不会让你获得太大的速度提升。

2

为什么不直接用简单的 os.walk 呢?它不会占用太多内存。

import os
for root, dirs, files in os.walk('/mnt'):
    for name in files:
        print os.path.join(root, name)
1
import multiprocessing
import os.path
import hashlib
import sys


VALID_EXTENSIONS = ('.JPG', '.GIF', '.JPEG')
MAX_FILE_SZ = 1000000


def md5_file(fname):
    try:
        with open(fname) as fo:
            m = hashlib.md5()
            chunk_sz = m.block_size * 128
            data = fo.read(chunk_sz)
            while data:
                m.update(data)
                data = fo.read(chunk_sz)
        md5_file.queue.put((fname, m.hexdigest()))
    except IOError:
        md5_file.queue.put((fname, None))


def is_valid_file(fname):
    ext = os.path.splitext(fname)[1].upper()
    fsz = os.path.getsize(fname)
    return ext in VALID_EXTENSIONS and fsz <= MAX_FILE_SZ


def init(queue):
    md5_file.queue = queue


def main():
    # Holds tuple (fname, md5sum) / md5sum will be none if an IOError occurs
    queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(None, init, [queue])

    for dirpath, dirnames, filenames in os.walk(sys.argv[1]):
        # Convert filenames to full paths...
        full_path_fnames = map(lambda fn: os.path.join(dirpath, fn), 
                               filenames)
        full_path_fnames = filter(is_valid_file, full_path_fnames)
        pool.map(md5_file, full_path_fnames)

    # Dump the queue
    while not queue.empty():
        print queue.get()
    return 0

if __name__ == '__main__':
    sys.exit(main())

这可能不是最完美的解决方案,但对我来说是有效的。你可能需要稍微调整一下,以便能看到它在做什么。

出于某种奇怪的原因,你不能共享一个全局队列。所以,我不得不使用池的 initializer 函数。我也不太明白为什么会这样。

只需将要处理的根目录作为唯一的参数传入,完成后它会输出 md5 校验和。

撰写回答