在Python中高效递归遍历文件目录并最小化内存使用
我有一大堆文件想要逐个检查,并计算它们的md5校验和。
这些文件有很多存储在不同的物理硬盘上,但它们都挂载在同一个目录里:
/mnt/drive1/dir1/file.jpg
/mnt/drive2/dir1/file2.jpg
我该如何在不把整个目录和文件结构都加载到内存中的情况下,逐层检查/mnt这个目录呢?
有没有办法用多个线程来实现这个?虽然可能不需要用多个线程/进程去逐层检查目录,但文件操作可能会消耗很多CPU资源,使用多个CPU核心会更有效。
提前谢谢你们。
4 个回答
不确定有没有一个通用的正确答案,但也许你可以从简单的开始,进行一些基准测试,看看哪些地方是瓶颈。
选项 1
可以用 find /mnt -type f
命令生成一个文件列表,然后把这个列表传给你的脚本。这样可以利用多个工作线程(multiprocessing
)来并行处理,但你可能需要根据物理设备来拆分这个列表,让每个线程处理一个设备。如果连接的存储设备比较慢,这一点可能很重要。
选项 2
生成一个顶层目录的列表(比如深度最大为 1 或 2),让线程分别在每个分区上操作。可以用 Linux 的 find -type d
命令或者 Python 的 os.walk()
来实现。
在你还不知道内存使用情况有多糟糕之前,我建议不要太担心加载太多内容到内存中。也就是说,在你进行基准测试之前。还有,我觉得 os.walk
或者文件列表不会造成严重的内存问题。如果我错了,请纠正我。
稍微详细解释一下,我的意思是这段代码会创建一个进程池(进程的数量是通过命令行指定的),然后它会打开当前目录下的每个文件,对每个文件进行50次压缩,并计算出它的CRC值。接着,它会把所有的CRC值进行异或运算,然后打印出来。
import multiprocessing
import os
import sys
import zlib
NUM_PROCS = int(sys.argv[1])
def processFile(filepath):
infile = open(filepath, 'r')
contents = infile.read()
for i in xrange(50):
contents = zlib.compress(contents)
return zlib.crc32(contents)
def generateFilepaths():
for (dirpath, dirnames, filenames) in os.walk('.'):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
yield filepath
if __name__ == '__main__':
pool = multiprocessing.Pool(NUM_PROCS)
fullCrc = 0
for crc in pool.imap_unordered(processFile, generateFilepaths()):
fullCrc ^= crc
print fullCrc
需要注意的是,即使不对每个文件做这么多的工作,这个程序仍然会受到输入输出的限制。因此,使用多线程可能不会让你获得太大的速度提升。
为什么不直接用简单的 os.walk 呢?它不会占用太多内存。
import os
for root, dirs, files in os.walk('/mnt'):
for name in files:
print os.path.join(root, name)
import multiprocessing
import os.path
import hashlib
import sys
VALID_EXTENSIONS = ('.JPG', '.GIF', '.JPEG')
MAX_FILE_SZ = 1000000
def md5_file(fname):
try:
with open(fname) as fo:
m = hashlib.md5()
chunk_sz = m.block_size * 128
data = fo.read(chunk_sz)
while data:
m.update(data)
data = fo.read(chunk_sz)
md5_file.queue.put((fname, m.hexdigest()))
except IOError:
md5_file.queue.put((fname, None))
def is_valid_file(fname):
ext = os.path.splitext(fname)[1].upper()
fsz = os.path.getsize(fname)
return ext in VALID_EXTENSIONS and fsz <= MAX_FILE_SZ
def init(queue):
md5_file.queue = queue
def main():
# Holds tuple (fname, md5sum) / md5sum will be none if an IOError occurs
queue = multiprocessing.Queue()
pool = multiprocessing.Pool(None, init, [queue])
for dirpath, dirnames, filenames in os.walk(sys.argv[1]):
# Convert filenames to full paths...
full_path_fnames = map(lambda fn: os.path.join(dirpath, fn),
filenames)
full_path_fnames = filter(is_valid_file, full_path_fnames)
pool.map(md5_file, full_path_fnames)
# Dump the queue
while not queue.empty():
print queue.get()
return 0
if __name__ == '__main__':
sys.exit(main())
这可能不是最完美的解决方案,但对我来说是有效的。你可能需要稍微调整一下,以便能看到它在做什么。
出于某种奇怪的原因,你不能共享一个全局队列。所以,我不得不使用池的 initializer
函数。我也不太明白为什么会这样。
只需将要处理的根目录作为唯一的参数传入,完成后它会输出 md5 校验和。