大规模并行操作变得容易

minionize的Python项目详细描述


基本原理

  • 您编写了一个带有一些参数的程序a.out
  • 您需要探索参数的空间(或者至少是其中的一大部分)

Minionize是一种在大规模 平行的方式。通过最小化您的程序,仆从将 它们从各种来源(例如文件系统、pub/sub)输入并运行它。阿尔索 输入可以被确认或重新交付给其他仆从。在

您还可以将^{str1}$minionie看作是“| xargs”的等价物 壳体结构,但:

  • pipe |可以从许多不同的源传输数据(不是 仅来自上一进程的stdout)
  • xargs可以在将接收到的数据发送到 程序。在

也就是说,minionie是您的besteffort and idempotent的好选择 在科学试验台(Igrida,网格5000)上进行计算,如果需要也可以 一种将二进制文件转换为微服务的快速方法(例如Kubernetes)。在

minionie提供了一些现成的可观测性特性: 它公开了一个api来检索统计信息,并公开了一个基本的cli来显示 他们(老实说,我梦见了一个mtophtop的仆从)。但是 目前它看起来差不多是这样的:

https://gitlab.inria.fr/msimonin/minionize/-/raw/master/images/stats.png

它是如何工作的

执行上述操作的经典模式是应用主/工作模式 主人把任务交给工人。工人们不断地获取新任务 从队列中运行它并向主机报告其状态(例如成功, 失败)。minionie以某种方式应用了此模式,但是masterless 开箱即用。实际上,现代队列实现将api公开给 确认/重新排列消息。在

目前我们支持:

  • 对于pipe |部分:
    • ^基于{tt9}$的队列:队列存储在集群中的共享文件系统中(不涉及外部进程)
    • Google pub/sub的队列:队列由Google托管
    • Apache pulsar,一个可以自行托管的发布/订阅系统
    • file,从常规文件中获取输入(例如stdin)
  • 对于xargs部分:
    • processes:在接收到一个进程时启动孤立的程序。在
    • functions:接收后启动python函数。在
    • docker:接收后启动docker容器。在

一些例子

最简单的用例

在这种情况下,接收到的参数将附加到 迷你程序。如果你需要更多的参数控制 写你自己的回叫。在

  • 使用Execo引擎:

    # Install the execo minionizer
    pip install minionize[execo]# Create the queue of params
    # You'll have to run this prior to launching your minions (adapt to
    # your need / make a regular script)
    $) python -c "from execo_engine.sweep import ParamSweeper; ParamSweeper('sweeps', sweeps=range(10), save_sweeps=True)"# start your minions
    $)MINION_ENGINE=execo minionize echo hello
    hello 0
    hello 1
    hello 2
    hello 3
    hello 4
    hello 5
    hello 6
    hello 7
    hello 8
    hello 9

    Note

    In other words the ^{tt17}$ wrapper lets you populate the queue with strings representing the parameter of your command line

  • 记录一些统计信息:您需要设置一个Reporter来报告您的统计信息。在

    # Install the execo minionizer
    pip install minionize[execo]# Create the queue of params
    # You'll have to run this prior to launching your minions (adapt to
    # your need / make a regular script)
    $) python -c "from execo_engine.sweep import ParamSweeper; ParamSweeper('sweeps', sweeps=range(10), save_sweeps=True)"# start your minions
    MINION_ENGINE=execo MINION_REPORTER=json minionize sleep
    
    # read the stats (while running or no)
    MINION_REPORTER=json minion-status
    
  • 在OAR集群(Igrida/Grid5000)上:

    • 例如使用Execo生成队列

      python -c "from execo_engine.sweep import ParamSweeper; ParamSweeper('sweeps', sweeps=range(1000), save_sweeps=True)"
      • 创建oar扫描脚本:
      ^{pr2}$
      • 启动你的仆从
      echo"MINION_ENGINE=execo" > .env
      oarsub --array 10 -S ./oar.sh
      

      注意

      .env文件在缩小开始时被读取,以便扫描脚本可以 无论使用什么引擎,都要保持不变。在

      • 输出示例:
      $) cat OAR.1287856.stdout
      [...]
      hello from 1287856135
      hello from 1287856139
      hello from 1287856143
      hello from 1287856147
      hello from 1287856151
      hello from 1287856155
      hello from 1287856159
      hello from 1287856163
      hello from 1287856167[...]

      注意

      正如预期的那样,参数已经分配给了不同的仆从

自定义回调

发送到程序的参数可以是任何东西(例如python dict)。在 在某些情况下(实际上很多情况下),您需要将这些参数转换为 你的程序可以理解的东西。所以你需要告诉 minionie如何最小化。这是通过使用特定的回调来实现的。在

编写自定义回调的最简单方法是继承 ProcessCallbackFuncCallback。有了这些回电你就不知道了'吨 不得不担心确认逻辑。在

#   a.out is invoked like this: a.out --arg1 varg1 varg2#   but the queue holds json like object:#   {"arg1": varg11, "arg2": varg21}, {"arg1": varg12, "arg2": varg22} ...# we can write a custom ProcessCallback which overrides the to_cmd methodclassMyProcessCallBack(ProcessCallback):defto_cmd(param:Param):returnf"a.out --arg1 {param['arg1']}{param['arg2']}"m=minionizer(MyProcessCallback())m.run()
#   you want to minionize a python function `my_function`#   but the queue holds json like object:#   {"arg1": varg11, "arg2": varg21}, {"arg1": varg12, "arg2": varg22} ...# we can use the FuncCallback for this purposedefmyfunc(...)# this is your functiondef_myfunc(param:Param)# this is the wrapper which invokes myfunc based on the paramsreturnmyfunc(param["arg1"],param["arg2"])m=minionizer(FuncCallback(_myfunc))m.run()

环境变量

minionie是使用环境变量配置的。 默认情况下,它读取当前目录中的.env文件,但不读取 重写现有的系统环境变量。在

默认值

--------------------------------------------

# which engine (queue implementation) to use
MINION_ENGINE=execo # google, pulsar
# Execo
EXECO_PERSISTENCE_DIR=sweeps

# Google
GOOGLE_PROJECT_ID=/mandatory/
GOOGLE_TOPIC_ID=/mandatory/
GOOGLE_SUBSCRIPTIOn=/mandatory/
GOOGLE_APPLICATION_CREDENTIALS=/mandatory/
GOOGLE_DECODER=identity


# Pulsar
PULSAR_CONNECTION=pulsar://localhost:6650
PULSAR_TOPIC=/mandatory/
PULSAR_DECODER=identity

---------------------------------------------

# Stat reporter
MINION_REPORTER=null # json, stdout
# Json
REPORTER_JSON_DIRECTORY=minion-report

路线图

  • 作为docker入口点易于集成
  • minionie python函数(例如@minionie decorator)
  • 支持新队列(Apache pulsar、Redis stream、RabbitMQ、Kakfa…)
  • 支持新的抽象来运行基于容器的应用程序(docker,singularity…)
  • 自动封装使用。最小值.yml在
  • 仆从统计
  • 保持联系(matthieu dot simonin at inria dot fr)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
字符串Java字母替换无效   java Spring Roo JPA MS SQL Server无法打开JPA EntityManager组织。冬眠例外GenericJDBCException:无法打开连接   在scala中使用JavaWS对大型数据文件进行java流式处理   Java编译器是否将字节和短字符识别为文本?   java无法查找符号错误,空指针   mongodb在Java中重用数据库连接   java将多个StringArray从字符串文件获取到活动中   java是一个变量,它只保存最后一次鼠标单击的坐标   c#尺寸有限;添加、删除和洗牌   java如何在Android中显示来自资产文件夹的文本文件中的文本   Android应用程序中的java Tensorflow Lite自定义对象检测模型错误   java如何在foreachloop中使用scanner将来自命令行的输入存储到数组中   java如何定义一个好的存储库接口   Android中的java解析动态json对象