使用多进程Pool并行运行Python批处理

0 投票
1 回答
3240 浏览
提问于 2025-04-18 16:14

我需要写一段代码来处理大约25万个输入文件,并且是批量处理。我看到了一篇帖子:https://codereview.stackexchange.com/questions/20416/python-parallelization-using-popen

但是我不知道怎么把这个实现到我的代码里。

我想要的

  • 我希望能给每个进程分配特定数量的核心,换句话说,就是在某个时间点上,只有特定数量的进程可以同时运行。

  • 如果一个进程完成了,另一个进程应该接替它的位置。

我的代码(使用subprocess)

Main.py

import subprocess
import os
import multiprocessing
import time
MAXCPU = multiprocessing.cpu_count()

try:
    cp = int(raw_input("Enter Number of CPU's to use (Total %d) = "%MAXCPU))
    assert cp <= MAXCPU
except:
    print "Bad command taking all %d cores"%MAXCPU
    cp =MAXCPU  # set MAXCPU as CPU

list_pdb = [i for i in os.listdir(".") if i.endswith(".pdb")]  # Input PDB files
assert len(list_pdb) != 0

c = {}
d = {}
t = {}

devnull = file("Devnull","wb")
for each in range(0, len(list_pdb), cp):   # Number of cores in Use = 4
    for e in range(cp):
        if each + e < len(list_pdb):
            args = ["sh", "Child.sh", list_pdb[each + e], str(cp)]
            p = subprocess.Popen(args, shell=False,
                stdout=devnull, stderr=devnull)
            c[p.pid] = p
            print "Started Process : %s" % list_pdb[each + e]
    while c:
        print c.keys()
        pid, status = os.wait()
        if pid in c:
            print "Ended Process"
            del c[pid]
devnull.close()

Child.sh

#!/bin/sh
sh grand_Child.sh
sh grand_Child.sh
sh grand_Child.sh
sh grand_Child.sh
# Some heavy processes with $1

grand_Child.sh

#!/bin/sh
sleep 5

输出

在这里输入图片描述

1 个回答

3

这里有一段使用 multiprocessing.Pool 的代码。它简单多了,因为这个模块几乎完成了所有的工作!

这个版本还做了:

  • 很多日志记录,显示进程开始和结束的时间

  • 打印出将要处理的文件数量

  • 允许你同时处理超过 CPU 数量的任务

在运行多进程任务时,通常最好运行的进程数量要比 CPU 多。因为不同的进程在等待输入输出时,不会一直占用 CPU。很多人会选择运行 2n+1 的进程数,比如在一个有 4 个 CPU 的系统上,他们会运行 2*4+1,也就是 9 个进程来处理任务。(我一般会固定使用“5”或“10”,除非有特别的理由去改变,哈哈,我就是懒 :))

祝你玩得开心!

源代码

import glob
import multiprocessing
import os
import subprocess

MAXCPU = multiprocessing.cpu_count()
TEST = False

def do_work(args):
    path,numproc = args
    curproc = multiprocessing.current_process()
    print curproc, "Started Process, args={}".format(args)
    devnull = open(os.devnull, 'w')
    cmd = ["sh", "Child.sh", path, str(numproc)]
    if TEST:
        cmd.insert(0, 'echo')
    try:
        return subprocess.check_output(
            cmd, shell=False,
            stderr=devnull,
        )
    finally:
        print curproc, "Ended Process"

if TEST:
    cp = MAXCPU
    list_pdb = glob.glob('t*.py')
else:
    cp = int(raw_input("Enter Number of processes to use (%d CPUs) = " % MAXCPU))
    list_pdb = glob.glob('*.pdb') # Input PDB files
# assert cp <= MAXCPU

print '{} files, {} procs'.format(len(list_pdb), cp)
assert len(list_pdb) != 0

pool = multiprocessing.Pool(cp)
print pool.map(
    do_work, [ (path,cp) for path in list_pdb ],
)
pool.close()
pool.join()

输出结果

27 files, 4 procs
<Process(PoolWorker-2, started daemon)> Started Process, args=('tdownload.py', 4)
<Process(PoolWorker-2, started daemon)> Ended Process
<Process(PoolWorker-2, started daemon)> Started Process, args=('tscapy.py', 4)
<Process(PoolWorker-2, started daemon)> Ended Process

撰写回答