大规模并行操作变得容易
minionize的Python项目详细描述
基本原理
- 您编写了一个带有一些参数的程序a.out
- 您需要探索参数的空间(或者至少是其中的一大部分)
Minionize是一种在大规模 平行的方式。通过最小化您的程序,仆从将 它们从各种来源(例如文件系统、pub/sub)输入并运行它。阿尔索 输入可以被确认或重新交付给其他仆从。在
您还可以将^{str1}$minionie看作是“| xargs”的等价物 壳体结构,但:
- pipe |可以从许多不同的源传输数据(不是 仅来自上一进程的stdout)
- xargs可以在将接收到的数据发送到 程序。在
也就是说,minionie是您的besteffort and idempotent的好选择 在科学试验台(Igrida,网格5000)上进行计算,如果需要也可以 一种将二进制文件转换为微服务的快速方法(例如Kubernetes)。在
minionie提供了一些现成的可观测性特性: 它公开了一个api来检索统计信息,并公开了一个基本的cli来显示 他们(老实说,我梦见了一个mtop:htop的仆从)。但是 目前它看起来差不多是这样的:
它是如何工作的
执行上述操作的经典模式是应用主/工作模式 主人把任务交给工人。工人们不断地获取新任务 从队列中运行它并向主机报告其状态(例如成功, 失败)。minionie以某种方式应用了此模式,但是masterless 开箱即用。实际上,现代队列实现将api公开给 确认/重新排列消息。在
目前我们支持:
- 对于pipe |部分:
- ^基于{tt9}$的队列:队列存储在集群中的共享文件系统中(不涉及外部进程)
- Google pub/sub的队列:队列由Google托管
- Apache pulsar,一个可以自行托管的发布/订阅系统
- file,从常规文件中获取输入(例如stdin)
- 对于xargs部分:
- processes:在接收到一个进程时启动孤立的程序。在
- functions:接收后启动python函数。在
- docker:接收后启动docker容器。在
一些例子
最简单的用例
在这种情况下,接收到的参数将附加到 迷你程序。如果你需要更多的参数控制 写你自己的回叫。在
使用Execo引擎:
在# Install the execo minionizer pip install minionize[execo]# Create the queue of params # You'll have to run this prior to launching your minions (adapt to # your need / make a regular script) $) python -c "from execo_engine.sweep import ParamSweeper; ParamSweeper('sweeps', sweeps=range(10), save_sweeps=True)"# start your minions $)MINION_ENGINE=execo minionize echo hello hello 0 hello 1 hello 2 hello 3 hello 4 hello 5 hello 6 hello 7 hello 8 hello 9
Note
In other words the ^{tt17}$ wrapper lets you populate the queue with strings representing the parameter of your command line
记录一些统计信息:您需要设置一个Reporter来报告您的统计信息。在
在# Install the execo minionizer pip install minionize[execo]# Create the queue of params # You'll have to run this prior to launching your minions (adapt to # your need / make a regular script) $) python -c "from execo_engine.sweep import ParamSweeper; ParamSweeper('sweeps', sweeps=range(10), save_sweeps=True)"# start your minions MINION_ENGINE=execo MINION_REPORTER=json minionize sleep # read the stats (while running or no) MINION_REPORTER=json minion-status
在OAR集群(Igrida/Grid5000)上:
例如使用Execo生成队列
python -c "from execo_engine.sweep import ParamSweeper; ParamSweeper('sweeps', sweeps=range(1000), save_sweeps=True)"
- 创建oar扫描脚本:
- 启动你的仆从
echo"MINION_ENGINE=execo" > .env oarsub --array 10 -S ./oar.sh
注意
.env文件在缩小开始时被读取,以便扫描脚本可以 无论使用什么引擎,都要保持不变。在
- 输出示例:
$) cat OAR.1287856.stdout [...] hello from 1287856135 hello from 1287856139 hello from 1287856143 hello from 1287856147 hello from 1287856151 hello from 1287856155 hello from 1287856159 hello from 1287856163 hello from 1287856167[...]
在注意
正如预期的那样,参数已经分配给了不同的仆从
自定义回调
发送到程序的参数可以是任何东西(例如python dict)。在 在某些情况下(实际上很多情况下),您需要将这些参数转换为 你的程序可以理解的东西。所以你需要告诉 minionie如何最小化。这是通过使用特定的回调来实现的。在
编写自定义回调的最简单方法是继承 ProcessCallback或FuncCallback。有了这些回电你就不知道了'吨 不得不担心确认逻辑。在
# a.out is invoked like this: a.out --arg1 varg1 varg2# but the queue holds json like object:# {"arg1": varg11, "arg2": varg21}, {"arg1": varg12, "arg2": varg22} ...# we can write a custom ProcessCallback which overrides the to_cmd methodclassMyProcessCallBack(ProcessCallback):defto_cmd(param:Param):returnf"a.out --arg1 {param['arg1']}{param['arg2']}"m=minionizer(MyProcessCallback())m.run()
# you want to minionize a python function `my_function`# but the queue holds json like object:# {"arg1": varg11, "arg2": varg21}, {"arg1": varg12, "arg2": varg22} ...# we can use the FuncCallback for this purposedefmyfunc(...)# this is your functiondef_myfunc(param:Param)# this is the wrapper which invokes myfunc based on the paramsreturnmyfunc(param["arg1"],param["arg2"])m=minionizer(FuncCallback(_myfunc))m.run()
环境变量
minionie是使用环境变量配置的。 默认情况下,它读取当前目录中的.env文件,但不读取 重写现有的系统环境变量。在
默认值
-------------------------------------------- # which engine (queue implementation) to use MINION_ENGINE=execo # google, pulsar # Execo EXECO_PERSISTENCE_DIR=sweeps # Google GOOGLE_PROJECT_ID=/mandatory/ GOOGLE_TOPIC_ID=/mandatory/ GOOGLE_SUBSCRIPTIOn=/mandatory/ GOOGLE_APPLICATION_CREDENTIALS=/mandatory/ GOOGLE_DECODER=identity # Pulsar PULSAR_CONNECTION=pulsar://localhost:6650 PULSAR_TOPIC=/mandatory/ PULSAR_DECODER=identity --------------------------------------------- # Stat reporter MINION_REPORTER=null # json, stdout # Json REPORTER_JSON_DIRECTORY=minion-report
路线图
- 作为docker入口点易于集成
- minionie python函数(例如@minionie decorator)
- 支持新队列(Apache pulsar、Redis stream、RabbitMQ、Kakfa…)
- 支持新的抽象来运行基于容器的应用程序(docker,singularity…)
- 自动封装使用。最小值.yml在
- 仆从统计
- 保持联系(matthieu dot simonin at inria dot fr)
- 项目
标签: