并行队列 - 多进程池,Python
我的目标是遍历一个文件夹,计算里面所有文件的MD5值。我参考了一个类似问题的代码来解决这个问题。
import os
import re
import sys
import time
import md5
from stat import S_ISREG
import multiprocessing
global queue
size_limit = 500000
target = sys.argv[1]
############Analysis and Multiprocessing####################
############Analysis and Multiprocessing####################
############Analysis and Multiprocessing####################
############Analysis and Multiprocessing####################
############Analysis and Multiprocessing####################
############Analysis and Multiprocessing####################
############Analysis and Multiprocessing####################
############Analysis and Multiprocessing####################
############Analysis and Multiprocessing####################
############Analysis and Multiprocessing####################
def walk_files(topdir):
"""yield up full pathname for each file in tree under topdir"""
for dirpath, dirnames, filenames in os.walk(topdir):
for fname in filenames:
pathname = os.path.join(dirpath, fname)
yield pathname
def files_to_search(topdir):
"""yield up full pathname for only files we want to search"""
for fname in walk_files(topdir):
try:
# if it is a regular file and big enough, we want to search it
sr = os.stat(fname)
if S_ISREG(sr.st_mode) and sr.st_size <= size_limit:
yield fname
except OSError:
pass
def worker_search_fn(fname):
fp = open(fname, 'rt')
# read one line at a time from file
contents = fp.read()
hash = md5.md5(contents)
global queue
print "enqueue"
queue.put(fname+'-'+hash.hexdigest())
################MAIN MAIN MAIN#######################
################MAIN MAIN MAIN#######################
################MAIN MAIN MAIN#######################
################MAIN MAIN MAIN#######################
################MAIN MAIN MAIN#######################
#kick of processes to md5 the files and wait till completeion
queue = multiprocessing.Queue()
pool = multiprocessing.Pool()
pool.map(worker_search_fn, files_to_search(target))
pool.close()
pool.join()
#Should be done, now lets send do our analysis
while not queue.empty():
print queue.get()
我在代码中添加了“print enqueue”这行,用来调试。我发现当递归遍历一个很大的文件夹时,代码确实会卡住。我不确定是否是因为两个进程同时在访问队列,导致了死锁。
也许有更好的方法来实现这个功能?这个结构不一定要是队列,但必须是无锁的,这样才能充分利用多进程。我想要并行地递归遍历一个文件夹并计算MD5值,完成后再对整个列表进行处理。为了调试,我只是打印出完成的队列。有什么建议吗?
2 个回答
1
因为大文件夹需要很长时间来执行 walk_files()
,所以这不是死锁。
而且……
可以去掉 pool.join()
。
multiprocessing.Pool().map()
会一直等到结果准备好,所以你不需要 pool.join()
。
3
不太清楚你的程序是受输入输出(I/O)限制还是受中央处理器(CPU)限制。简单来说,如果你的任务是I/O限制的,比如说减少磁盘读取次数,那么一个进程的表现可能会比多个进程更好。你可以通过设置不同的 nprocesses
值(下面会有示例)来检查哪种方式在你的情况下效果更好。
在这种情况下,你不需要使用队列:
#!/usr/bin/env python
import os
import sys
from hashlib import md5
from multiprocessing import Pool, freeze_support
from stat import S_ISREG
def walk_files(topdir):
"""yield up full pathname for each file in tree under topdir"""
for dirpath, dirnames, filenames in os.walk(topdir):
for fname in filenames:
pathname = os.path.join(dirpath, fname)
yield pathname
def files_to_process(topdir, size_limit):
"""yield up full pathname for only files we want to process"""
for fname in walk_files(topdir):
try: sr = os.stat(fname)
except OSError: pass
else:
# if it is a regular file and small enough, we want to process it
if S_ISREG(sr.st_mode) and sr.st_size <= size_limit:
yield fname
def md5sum(fname):
with open(fname, 'rb') as fp:
# read all file at once
contents = fp.read()
hash = md5(contents)
return fname, hash.hexdigest()
def main(argv=None):
if argv is None:
argv = sys.argv
topdir = argv[1]
size_limit = 500000
nprocesses = 1
pool = Pool(processes=nprocesses)
files = files_to_process(topdir, size_limit)
for fname, hexdigest in pool.imap_unordered(md5sum, files):
print("%s\t%s" % (fname, hexdigest))
if __name__=="__main__":
freeze_support()
main()
示例
$ python md5sum.py .
./md5sum.py 9db44d3117673790f1061d4b8f00e8ce