使用多进程Pool并行运行Python批处理
我需要写一段代码来处理大约25万个输入文件,并且是批量处理。我看到了一篇帖子:https://codereview.stackexchange.com/questions/20416/python-parallelization-using-popen
但是我不知道怎么把这个实现到我的代码里。
我想要的
我希望能给每个进程分配特定数量的核心,换句话说,就是在某个时间点上,只有特定数量的进程可以同时运行。
如果一个进程完成了,另一个进程应该接替它的位置。
我的代码(使用subprocess)
Main.py
import subprocess
import os
import multiprocessing
import time
MAXCPU = multiprocessing.cpu_count()
try:
cp = int(raw_input("Enter Number of CPU's to use (Total %d) = "%MAXCPU))
assert cp <= MAXCPU
except:
print "Bad command taking all %d cores"%MAXCPU
cp =MAXCPU # set MAXCPU as CPU
list_pdb = [i for i in os.listdir(".") if i.endswith(".pdb")] # Input PDB files
assert len(list_pdb) != 0
c = {}
d = {}
t = {}
devnull = file("Devnull","wb")
for each in range(0, len(list_pdb), cp): # Number of cores in Use = 4
for e in range(cp):
if each + e < len(list_pdb):
args = ["sh", "Child.sh", list_pdb[each + e], str(cp)]
p = subprocess.Popen(args, shell=False,
stdout=devnull, stderr=devnull)
c[p.pid] = p
print "Started Process : %s" % list_pdb[each + e]
while c:
print c.keys()
pid, status = os.wait()
if pid in c:
print "Ended Process"
del c[pid]
devnull.close()
Child.sh
#!/bin/sh
sh grand_Child.sh
sh grand_Child.sh
sh grand_Child.sh
sh grand_Child.sh
# Some heavy processes with $1
grand_Child.sh
#!/bin/sh
sleep 5
输出
1 个回答
3
这里有一段使用 multiprocessing.Pool
的代码。它简单多了,因为这个模块几乎完成了所有的工作!
这个版本还做了:
很多日志记录,显示进程开始和结束的时间
打印出将要处理的文件数量
允许你同时处理超过 CPU 数量的任务
在运行多进程任务时,通常最好运行的进程数量要比 CPU 多。因为不同的进程在等待输入输出时,不会一直占用 CPU。很多人会选择运行 2n+1 的进程数,比如在一个有 4 个 CPU 的系统上,他们会运行 2*4+1,也就是 9 个进程来处理任务。(我一般会固定使用“5”或“10”,除非有特别的理由去改变,哈哈,我就是懒 :))
祝你玩得开心!
源代码
import glob
import multiprocessing
import os
import subprocess
MAXCPU = multiprocessing.cpu_count()
TEST = False
def do_work(args):
path,numproc = args
curproc = multiprocessing.current_process()
print curproc, "Started Process, args={}".format(args)
devnull = open(os.devnull, 'w')
cmd = ["sh", "Child.sh", path, str(numproc)]
if TEST:
cmd.insert(0, 'echo')
try:
return subprocess.check_output(
cmd, shell=False,
stderr=devnull,
)
finally:
print curproc, "Ended Process"
if TEST:
cp = MAXCPU
list_pdb = glob.glob('t*.py')
else:
cp = int(raw_input("Enter Number of processes to use (%d CPUs) = " % MAXCPU))
list_pdb = glob.glob('*.pdb') # Input PDB files
# assert cp <= MAXCPU
print '{} files, {} procs'.format(len(list_pdb), cp)
assert len(list_pdb) != 0
pool = multiprocessing.Pool(cp)
print pool.map(
do_work, [ (path,cp) for path in list_pdb ],
)
pool.close()
pool.join()
输出结果
27 files, 4 procs
<Process(PoolWorker-2, started daemon)> Started Process, args=('tdownload.py', 4)
<Process(PoolWorker-2, started daemon)> Ended Process
<Process(PoolWorker-2, started daemon)> Started Process, args=('tscapy.py', 4)
<Process(PoolWorker-2, started daemon)> Ended Process