python作业管理系统
xqute的Python项目详细描述
xqute公司
python作业管理系统
特点
- 以异步方式编写
- 插件系统
- 调度适配器
- 作业重试/失败时管道停止
安装
pip install xqute
玩具的例子
^{pr2}$美国石油学会
https://pwwang.github.io/xqute/
使用
Xqute对象
xqute由以下方式初始化:
xqute=Xqute(...)
可用参数包括:
- 调度程序:调度程序类或名称
- 插件:要为此会话启用/禁用的插件
- 作业metadir:作业元目录(默认:
./.xqute/
) - 作业错误策略:发生错误时的策略
- job_num_retries:作业错误策略为retry时的最大重试次数
- 作业提交批次:提交作业的使用者数量
- 调度程序_forks:作业分叉的最大数量
- **scheduler_opts:调度程序的其他关键字参数
请注意,生产者必须在事件循环中初始化。在
要将作业推入队列,请执行以下操作:
awaitxqute.put(['echo',1])
使用SGE调度程序
xqute=Xqute('sge',scheduler_forks=100,qsub='path to qsub',qdel='path to qdel',qstat='path to qstat',sge_q='1-day',...)
名称以sge_
开头的关键字参数将被解释为qsub
选项。list
或{sge_l=['h_vmem=2G', 'gpu=1']
将在打包脚本中展开,如下所示:
# ...#$ -l h_vmem=2G#$ -l gpu=1# ...
插件
要为xqute
编写插件,需要实现以下钩子:
on_init(scheduler)
:在调度程序对象初始化之后on_shutdown(scheduler, sig)
:当调度程序关闭时on_job_init(scheduler, job)
:初始化作业时on_job_queued(scheduler, job)
:作业排队时on_job_submitted(scheduler, job)
:提交作业时on_job_killing(scheduler, job)
:当作业被终止时on_job_killed(scheduler, job)
:当作业被终止时on_job_failed(scheduler, job)
:作业失败时on_job_succeeded(scheduler, job)
:作业成功时
请注意,除了on_init
和on_shutdown
之外,所有的钩子都是corotine,这意味着您还应该将它们实现为corotine(允许同步实现,但会发出警告)。在
要实现钩子,必须获取插件管理器:
fromsimplugimportSimplugpm=Simplug('xqute')# orfromxquteimportsimplugaspm
然后使用装饰器pm.impl
:
@pm.impldefon_init(scheduler):...
实现调度程序
目前只有2个内置调度器:local
和{
可以通过子类化Scheduler
抽象类来实现调度器。子类中必须实现三个抽象方法:
fromxquteimportScheduerclassMyScheduler(Scheduler):name='my'job_class:MyJobasyncdefsubmit_job(self,job):"""How to submit a job, return a unique id in the scheduler system (the pid for local scheduler for example) """asyncdefkill_job(self,job):"""How to kill a job"""asyncdefjob_is_running(self,job):"""Check if a job is running The uid can be retrieved from job.lock_file """
如您所见,我们可能还需要在MyScheduler
之前实现一个作业类。要实现的唯一抽象方法是wrap_cmd
:
fromxquteimportJobclassMyJob(Job):asyncdefwrap_cmd(self,scheduler):...
您必须在包装好的脚本中使用trap命令来更新作业状态、返回代码并清除锁定文件。在
- 项目
标签: