使用Python和PBS在集群上进行“尴尬并行”编程
我有一个函数(神经网络模型),它会生成一些图形。我想在Python中使用PBS在一个标准集群上测试多个参数、方法和不同的输入(这意味着要运行这个函数几百次)。
注意:我尝试过parallelpython、ipython等,但从来没有完全满意过,因为我想要一个更简单的解决方案。这个集群的配置是我无法更改的,所以一个将Python和qsub结合起来的解决方案肯定会对大家有帮助。
为了简化问题,我有一个简单的函数,比如:
import myModule
def model(input, a= 1., N=100):
do_lots_number_crunching(input, a,N)
pylab.savefig('figure_' + input.name + '_' + str(a) + '_' + str(N) + '.png')
其中input
是一个表示输入的对象,input.name
是一个字符串,而do_lots_number_crunching
可能会持续几个小时。
我的问题是:有没有一种正确的方法可以将像这样的参数扫描
for a in pylab.linspace(0., 1., 100):
model(input, a)
转换成“某种东西”,以便为每次调用model
函数时启动一个PBS脚本?
#PBS -l ncpus=1
#PBS -l mem=i1000mb
#PBS -l cput=24:00:00
#PBS -V
cd /data/work/
python experiment_model.py
我在考虑一个函数,它会包含PBS模板,并从Python脚本中调用它,但我还没有想明白(装饰器?)。
4 个回答
看起来我来得有点晚,但几年前我也曾有过同样的问题,想知道怎么把那些可以轻松并行处理的问题在Python中映射到一个集群上,所以我写了自己的解决方案。最近我把它上传到了GitHub,链接在这里:https://github.com/plediii/pbs_util
要使用pbs_util来写你的程序,首先我会在工作目录里创建一个名为pbs_util.ini的文件,内容是:
[PBSUTIL]
numnodes=1
numprocs=1
mem=i1000mb
walltime=24:00:00
然后写一个像这样的Python脚本:
import pbs_util.pbs_map as ppm
import pylab
import myModule
class ModelWorker(ppm.Worker):
def __init__(self, input, N):
self.input = input
self.N = N
def __call__(self, a):
myModule.do_lots_number_crunching(self.input, a, self.N)
pylab.savefig('figure_' + self.input.name + '_' + str(a) + '_' + str(self.N) + '.png')
# You need "main" protection like this since pbs_map will import this file on the compute nodes
if __name__ == "__main__":
input, N = something, picklable
# Use list to force the iterator
list(ppm.pbs_map(ModelWorker, pylab.linspace(0., 1., 100),
startup_args=(input, N),
num_clients=100))
这样就可以了。
你可以很简单地使用jug(这是我为类似的设置开发的工具)。
你需要在一个文件里写代码(比如,model.py
):
@TaskGenerator
def model(param1, param2):
res = complex_computation(param1, param2)
pyplot.coolgraph(res)
for param1 in np.linspace(0, 1.,100):
for param2 in xrange(2000):
model(param1, param2)
就这样!
现在你可以在你的队列上运行“jug jobs”:jug execute model.py
,这会自动进行并行处理。每个任务会在一个循环中做类似这样的事情:
while not all_done():
for t in tasks in tasks_that_i_can_run():
if t.lock_for_me(): t.run()
(其实比这要复杂,但你明白我的意思了)。
它使用文件系统来进行锁定(如果你在NFS系统上),或者如果你愿意的话可以使用redis服务器。它还可以处理任务之间的依赖关系。
这可能不是你最初想要的,但我觉得将这部分与任务队列系统分开会更清晰。
pbs_python[1] 可能适合这个需求。如果你在运行 experiment_model.py 时传入 'a' 作为参数,你可以这样做:
import pbs, os
server_name = pbs.pbs_default()
c = pbs.pbs_connect(server_name)
attopl = pbs.new_attropl(4)
attropl[0].name = pbs.ATTR_l
attropl[0].resource = 'ncpus'
attropl[0].value = '1'
attropl[1].name = pbs.ATTR_l
attropl[1].resource = 'mem'
attropl[1].value = 'i1000mb'
attropl[2].name = pbs.ATTR_l
attropl[2].resource = 'cput'
attropl[2].value = '24:00:00'
attrop1[3].name = pbs.ATTR_V
script='''
cd /data/work/
python experiment_model.py %f
'''
jobs = []
for a in pylab.linspace(0.,1.,100):
script_name = 'experiment_model.job' + str(a)
with open(script_name,'w') as scriptf:
scriptf.write(script % a)
job_id = pbs.pbs_submit(c, attropl, script_name, 'NULL', 'NULL')
jobs.append(job_id)
os.remove(script_name)
print jobs
[1]: https://oss.trac.surfsara.nl/pbs_python/wiki/TorqueUsage pbs_python