Sequent is a programming interface built on Eventor that provides a simple way to write program flows.



Overview

Sequent provides programmers with an interface to create flows. Sequent is useful when there are multiple batch processes that should be linked together in a processing flow.

For example, programs A1 and A2 extract data from a database, program B processes the extracted files, and programs C1 and C2 transmit the processed information to a remote client.

Sequent can be used to create the flow where A1 and A2 run concurrently, then program B runs. When B is successful, programs C1 and C2 run.

Sequent uses Eventor (https://github.com/Acrisel/eventor) as its underlying infrastructure. With that, it gains Eventor’s recovery capabilities. When running in recovery, successful steps can be skipped.

Considering the above example, this can be useful when the C2 transmission program fails. A recovery run would execute only C2, without redoing potentially expensive work that was already completed (A1, A2, B, and C1).
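The idea can be sketched in plain Python. This is a toy runner written for illustration, not Sequent's or Eventor's implementation: the names run_flow, make and fail_c2 are hypothetical, steps run one after another rather than concurrently, and the set of succeeded steps stands in for the persistent store.

```python
# Illustrative only: a toy runner, not Sequent's or Eventor's implementation.
def run_flow(steps, requires, done=None):
    """Run steps in dependency order, skipping those already in `done`.

    steps:    dict of step name -> callable
    requires: dict of step name -> list of step names that must succeed first
    done:     set of step names that succeeded in a previous run
    Returns (succeeded, failed_step_or_None).
    """
    succeeded = set(done or ())
    pending = [name for name in steps if name not in succeeded]
    while pending:
        ready = [n for n in pending
                 if all(r in succeeded for r in requires.get(n, []))]
        if not ready:
            break  # nothing can run: some requirement was never met
        for name in ready:
            try:
                steps[name]()
            except Exception:
                return succeeded, name  # stop the flow on first failure
            succeeded.add(name)
            pending.remove(name)
    return succeeded, None


calls = []
fail_c2 = {"on": True}

def make(name):
    def step():
        if name == "C2" and fail_c2["on"]:
            raise RuntimeError("C2 failed")
        calls.append(name)
    return step

steps = {n: make(n) for n in ["A1", "A2", "B", "C1", "C2"]}
requires = {"B": ["A1", "A2"], "C1": ["B"], "C2": ["B"]}

done, failed = run_flow(steps, requires)          # first run: C2 fails
fail_c2["on"] = False
done2, failed2 = run_flow(steps, requires, done)  # recovery: only C2 runs
```

In the second call only C2 is executed, because the other four steps are already recorded as successful.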

It is easier to show an example. In the example below, container step s11 loops twice, running s111 and then s112 on each pass. When s11 is done, s12 runs. This whole s1 flow is repeated twice, after which s2 runs.

Simple Example

     1  import sequent as seq
     2  import logging
     3  import os
     4  def prog(progname, success=True):
     5      logger = logging.getLogger(os.getenv("SEQUENT_LOGGER_NAME"))
     6      logger.info("doing what {} is doing".format(progname))
     7      if not success:
     8          raise Exception("{} failed".format(progname))
     9      return progname
    10
    11  myflow = seq.Sequent(config={'sleep_between_loops': 0.05,
    12                               'LOGGING': {'logging_level': logging.DEBUG}})
    13
    14  s1 = myflow.add_step('s1', repeats=[1, 2])
    15
    16  s11 = s1.add_step('s11', repeats=[1, 2,])
    17
    18  s111 = s11.add_step('s111', func=prog, kwargs={'progname': 'prog1'})
    19  s112 = s11.add_step('s112', func=prog, kwargs={'progname': 'prog2'},
    20                      requires=((s111, seq.STEP_SUCCESS),))
    21
    22  s12 = s1.add_step('s12', func=prog, kwargs={'progname': 'prog3'},
    23                    requires=((s11, seq.STEP_SUCCESS),))
    24
    25  s2 = myflow.add_step('s2', func=prog, kwargs={'progname': 'prog4'},
    26                       requires=((s1, seq.STEP_SUCCESS),))
    27
    28  myflow.run()

Example Output

The above example will provide the following log output. Note that more detailed logging activity was stripped off; only actual program activity is shown.

    [2016-12-07 11:14:29,761][INFO][Eventor store file: /sequent/example/runly00.run.db]
    ...
    [2016-12-07 11:14:29,962][INFO][doing what prog1 is doing]
    ...
    [2016-12-07 11:14:30,124][INFO][doing what prog2 is doing]
    ...
    [2016-12-07 11:14:30,358][INFO][doing what prog1 is doing]
    ...
    [2016-12-07 11:14:30,587][INFO][doing what prog2 is doing]
    ...
    [2016-12-07 11:14:30,908][INFO][doing what prog3 is doing]
    ...
    [2016-12-07 11:14:31,234][INFO][doing what prog1 is doing]
    ...
    [2016-12-07 11:14:31,407][INFO][doing what prog2 is doing]
    ...
    [2016-12-07 11:14:31,657][INFO][doing what prog1 is doing]
    ...
    [2016-12-07 11:14:31,894][INFO][doing what prog2 is doing]
    ...
    [2016-12-07 11:14:32,240][INFO][doing what prog3 is doing]
    ...
    [2016-12-07 11:14:32,565][INFO][doing what prog4 is doing]
    ...
    [2016-12-07 11:14:32,713][INFO][Processing finished with: success]

Code Highlights

Flow diagram:

    +--S1----------------------+
    |                          |
    | +--S11----------+        |
    | |               |        |
    | | S111 -> S112  | -> S12 | -> S2
    | +---------------+        |
    +--------------------------+

For simplicity, the code definition of prog (line 4) serves as a reusable activity for all the steps in this example.

A Sequent object is defined (line 11) to host myflow. By default, Sequent’s Eventor loops on events and steps, sleeping one second between loops. Here ‘sleep_between_loops’ changes this setting to 0.05 seconds.

myflow contains two steps, s1 and s2. s1 is a container step that repeats twice (defined on line 14). s2 is a processing step (defined on line 25).

s1 contains two steps. s11 (line 16) is a container step and s12 (line 22) is a processing step.

s11 contains two processing steps, s111 and s112 (lines 18-20).

Finally, on line 28 the flow is executed using myflow.run().

The logger is set within the step program (line 5) to direct step logging into its dedicated log.
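The execution order implied by this nesting can be written out as plain loops. The sketch below does not use Sequent; expected_trace is a hypothetical helper, and the name/sequence labels (such as s1_s11_s111 and 1.2.2) are an assumption inferred from the log format shown in the recovery example later in this document.

```python
def expected_trace(s1_repeats=(1, 2), s11_repeats=(1, 2)):
    """Return (step_name, sequence) pairs in the order the flow would run them."""
    trace = []
    for i in s1_repeats:                                  # s1 container repeats
        for j in s11_repeats:                             # s11 container repeats
            trace.append(("s1_s11_s111", f"1.{i}.{j}"))   # prog1
            trace.append(("s1_s11_s112", f"1.{i}.{j}"))   # prog2, after s111 succeeds
        trace.append(("s1_s12", f"1.{i}"))                # prog3, after s11 succeeds
    trace.append(("s2", "1"))                             # prog4, after s1 succeeds
    return trace


trace = expected_trace()
```

With the default repeats this yields eleven entries, and the fourth run of s111 carries sequence 1.2.2.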

Sequent Interface

Sequent Class Initiator

Sequent signature in its simplest form:

Sequent(name='', store='', run_mode=SEQ.SEQUENT_RESTART, recovery_run=None, config={}, config_tag='')

Description

Sequent, when instantiated, provides an interface to build a program flow. When called upon, Sequent steps are translated to Eventor steps, and each step’s requires are translated to Eventor’s events and steps’ triggers.

Sequent instantiation arguments are the same as Eventor’s.

Args

name: string id for the Sequent object initiated.

store: path to the file that would store runnable (sqlite) information; if ‘:memory:’ is used, in-memory temporary storage will be created. If not provided, the calling module's path and name will be used, with a db extension instead of ‘py’.

run_mode: can be either RUN_RESTART (default) or RUN_RECOVER; in restart, a new instance of the run will be created. In recovery, a previous instance of the run is executed again.

recovery_run: if RUN_RECOVER is used, recovery_run indicates the specific instance of a previous run that would be executed. If not provided, the latest run would be used.

config: keyword dictionary of default configurations. Available keywords and their default values:

Name                 Default Value     Description
workdir              /tmp              place to create necessary artifacts (not in use)
logdir               /var/log/eventor  place to create debug and error log files
task_construct       ‘process’         method to use for execution of steps
max_concurrent       1                 maximum concurrent processing; if value < 1, no limit will be imposed
stop_on_exception    True              if an exception occurs in a step, stop all processes. If True, new processes will not start, but running processes will be permitted to finish
sleep_between_loops  1                 seconds to sleep between iterations of checking triggers and tasks

config_tag: key within config under which the Sequent configuration starts.

Sequent add_event method

add_event(require=None)

Args

requires: logical expression, in ‘sqlalchemy’ style, that automatically raises this event when satisfied.

syntax:

requires : (requires, requires, ...) | or_(requires, requires, ...) | event
  • if the expression is of the first style, logical and will apply.
  • the second style applies logical or.
  • the basic atom in an expression is an event, which is the product of add_event.

Returns

Event object to use as a require in add_step.
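The and/or semantics of the syntax above can be sketched as a small recursive evaluator. This is an illustration only: the or_ class here is a hypothetical stand-in defined for the sketch, events are represented by plain strings, and satisfied is not a Sequent or Eventor function.

```python
class or_:
    """Stand-in for an or_(...) expression: satisfied if any item is satisfied."""
    def __init__(self, *items):
        self.items = items


def satisfied(expr, raised):
    """Evaluate a requires expression against the set of raised event names.

    expr may be an event name (atom), a tuple (logical and),
    or an or_ instance (logical or).
    """
    if isinstance(expr, or_):
        return any(satisfied(item, raised) for item in expr.items)
    if isinstance(expr, tuple):
        return all(satisfied(item, raised) for item in expr)
    return expr in raised


both = satisfied(("a", "b"), {"a"})                   # and: b is missing
mixed = satisfied(("a", or_("b", "c")), {"a", "c"})   # a and (b or c)
```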

Sequent add_step method

add_step(name, func, args=(), kwargs={}, hosts=[], requires={}, delay=0, acquires=[], releases=None, recovery={}, config={})

Args

name: unique string id for the step

func: callable object that would be called at the time of step execution

args: tuple of values that will be passed to func when it is called

kwargs: keyword arguments that will be passed to func when it is called

hosts: list of hosts the step should run on. If not provided, localhost will be used.

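requires: mapping of step statuses such that when the set of events occurs, the added step will be launched:

status         description
STEP_READY     set when task is ready to run (triggered)
STEP_ACTIVE    set when task is running
STEP_SUCCESS   set when task is successful
STEP_FAILURE   set when task fails
STEP_COMPLETE  stands for success or failure of task

delay: number of seconds to wait before the step is executed. Actual execution may be delayed further if resources need to be acquired.

acquires: list of tuples of resource pool and amount of resources to acquire before starting.

releases: list of tuples of resource pool and amount of resources to release once completed. If None, defaults to acquires. If set to an empty list, none of the acquired resources will be released.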

recovery: mapping of step status to how the step should be handled in recovery:

status        default    description
STEP_READY    STP_RERUN  if in recovery and previous status is ready, rerun
STEP_ACTIVE   STP_RERUN  if in recovery and previous status is active, rerun
STEP_FAILURE  STP_RERUN  if in recovery and previous status is failure, rerun
STEP_SUCCESS  STP_SKIP   if in recovery and previous status is success, skip

config: keyword mapping that overrides step configuration.

name               default  description
stop_on_exception  True     stop flow if step ends with Exception

Returns

Step object to use in add_assoc method.

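Sequent run method

run(max_loops=-1)

When the run method is called, information is built and a loop that evaluates events and task starts is executed. In each loop, events are raised and tasks are performed. The max_loops parameter allows control of how many loops to execute.

In the simple example, myflow.run() engages Sequent’s run() method.

Args

max_loops: number of loops to run. If positive, limits the number of loops. Defaults to negative, which runs loops until there are no events to raise and no tasks to run.

Returns

If there was a failure that was not followed by a triggered event, the result will be False.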
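A minimal sketch of the max_loops semantics described above, assuming a simplified model in which each loop runs exactly one queued task. The function run_loops is hypothetical, and its return value is a simplification: it reports False on any failed task, whereas the real result also depends on whether a failure was followed by a triggered event.

```python
def run_loops(tasks, max_loops=-1):
    """Run queued tasks one per loop; each task returns True on success.

    A negative max_loops runs until the queue is empty;
    a positive value caps the number of loops.
    Returns (all_succeeded, loops_executed).
    """
    queue = list(tasks)
    loops = 0
    ok = True
    while queue and (max_loops < 0 or loops < max_loops):
        task = queue.pop(0)
        ok = task() and ok
        loops += 1
    return ok, loops


unbounded = run_loops([lambda: True] * 3)             # runs all three tasks
capped = run_loops([lambda: True] * 3, max_loops=2)   # stops after two loops
```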

Distributed Operation

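Sequent can operate steps in a distributed environment. Steps can be associated with hosts using the hosts argument of add_step. Sequent uses SSH to submit steps to remote hosts. This means the cluster needs to be configured with SSH keys. To set up the environment for Sequent distributed operation:

  1. The host launching the Sequent program should be able to connect to the participating hosts via SSH using keys only, without a password.

  2. The SSH authorized keys on each target host should have an appropriate command to launch the correct operating environment. This may include activating the correct virtualenv.

  3. Optionally, set up an SSH backdoor to the originating host. In the future, Sequent may use this backdoor for callbacks.

  4. Software needs to be installed uniformly on all participating machines.

  5. Sequent must be launched with a database configuration that is accessible from all participating hosts. Sequent and its remote agents will use that database to share operational information. The database user needs permission to create the schema (if the associated schema has not been created) and permission to create tables.

  6. Anything passed to Sequent, mainly in add_step, needs to be importable. For example, in the simple example:

    import example_progs as example
    s111 = s11.add_step('s111', func=example.prog, kwargs={'progname': 'prog1'})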
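The importability requirement in item 6 can be checked before a flow is submitted. The helper below is a hypothetical sketch using only the standard library; it assumes that a function defined in the launching script itself (module __main__), a lambda, or a nested function cannot be re-imported on a remote host.

```python
import importlib
import json


def is_importable(func):
    """Return True if func can be found again by importing its module."""
    module_name = getattr(func, "__module__", None)
    qualname = getattr(func, "__qualname__", "")
    # Lambdas and nested functions carry "<lambda>" or "<locals>" in their qualname.
    if not module_name or module_name == "__main__" or "<" in qualname:
        return False
    try:
        obj = importlib.import_module(module_name)
    except ImportError:
        return False
    for part in qualname.split("."):
        obj = getattr(obj, part, None)
        if obj is None:
            return False
    return obj is func


checks = (is_importable(json.dumps), is_importable(lambda: None))
```

Here json.dumps passes because it lives in an importable module, while the lambda does not.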

Recovery

Recovery allows a rerun of a program in a way that skips successful steps. To use recovery, the store must be physical (an in-memory store cannot be used).

According to step recovery setup, when in recovery, step may be skipped or rerun. By default, only success statuses are skipped.
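The default directives can be pictured as a lookup from a step's previous status to an action. In the sketch below, plain strings stand in for the status and directive names listed in the add_step recovery table; plan_recovery and the sample statuses are hypothetical and are not part of Sequent.

```python
# Stand-in strings for the names in the add_step recovery table;
# this is not the library's own lookup code.
DEFAULT_RECOVERY = {
    "STEP_READY": "STP_RERUN",
    "STEP_ACTIVE": "STP_RERUN",
    "STEP_FAILURE": "STP_RERUN",
    "STEP_SUCCESS": "STP_SKIP",
}


def plan_recovery(previous_statuses, overrides=None):
    """Return the steps that would be rerun, given each step's previous status."""
    policy = dict(DEFAULT_RECOVERY)
    policy.update(overrides or {})
    return [step for step, status in previous_statuses.items()
            if policy[status] == "STP_RERUN"]


previous = {
    "s1_s11_s111": "STEP_FAILURE",
    "s1_s12": "STEP_SUCCESS",
    "s2": "STEP_READY",
}
to_rerun = plan_recovery(previous)
```

Passing overrides, for example {"STEP_FAILURE": "STP_SKIP"}, mirrors what a per-step recovery argument would change.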

Here is an example for recovery program and run.

Recovery Example

     1  import sequent as sqnt
     2  import logging
     3  import os
     4  appname = os.path.basename(__file__)
     5  logger = logging.getLogger(appname)
     6
     7  def prog(flow, progname, step_to_fail=None, iteration_to_fail=''):
     8      logger = logging.getLogger(os.getenv("SEQUENT_LOGGER_NAME"))
     9      step_name = flow.get_step_name()
    10      step_sequence = flow.get_step_sequence()
    11      logger.info("doing what {} is doing ({}/{})".format(progname, step_name, step_sequence))
    12      if step_to_fail == step_name and step_sequence == iteration_to_fail:
    13          raise Exception("{} failed ({}/{})".format(progname, step_name, step_sequence))
    14      return progname
    15
    16  def build_flow(run_mode=sqnt.RUN_RESTART, run_id=None, step_to_fail=None, iteration_to_fail=''):
    17      myflow = sqnt.Sequent(name=appname, run_mode=run_mode, run_id=run_id, config={'sleep_between_loops': 0.05,},)
    18
    19      s1 = myflow.add_step('s1', repeats=[1, 2])
    20
    21      s11 = s1.add_step('s11', repeats=[1, 2,])
    22
    23      s111 = s11.add_step('s111', func=prog, kwargs={'flow': myflow, 'progname': 'prog1',
    24                                                     'step_to_fail': step_to_fail,
    25                                                     'iteration_to_fail': iteration_to_fail,})
    26      s112 = s11.add_step('s112', func=prog, kwargs={'flow': myflow, 'progname': 'prog2',
    27                                                     'step_to_fail': step_to_fail,
    28                                                     'iteration_to_fail': iteration_to_fail,},
    29                          requires=((s111, sqnt.STEP_SUCCESS),))
    30
    31      s12 = s1.add_step('s12', func=prog, kwargs={'flow': myflow, 'progname': 'prog3',
    32                                                  'step_to_fail': step_to_fail,
    33                                                  'iteration_to_fail': iteration_to_fail,},
    34                        requires=((s11, sqnt.STEP_SUCCESS),))
    35
    36      s2 = myflow.add_step('s2', func=prog, kwargs={'flow': myflow, 'progname': 'prog4',
    37                                                    'step_to_fail': step_to_fail,
    38                                                    'iteration_to_fail': iteration_to_fail,},
    39                           requires=((s1, sqnt.STEP_SUCCESS),))
    40      return myflow
    41
    42  # creating flow simulating failure
    43  myflow = build_flow(step_to_fail='s1_s11_s111', iteration_to_fail='1.2.2')
    44  myflow.run()
    45
    46  run_id = myflow.run_id
    47
    48  # creating recovery flow
    49  myflow = build_flow(run_mode=sqnt.RUN_RECOVER, run_id=run_id)
    50  myflow.run()

Example Output

     1  [2016-12-07 14:49:24,437][INFO][Eventor store file: /sequent/example/runly04.run.db]
     2  ...
     3  [2016-12-07 14:49:24,645][INFO][doing what prog1 is doing (s1_s11_s111/1.1.1)]
     4  ...
     5  [2016-12-07 14:49:24,805][INFO][doing what prog2 is doing (s1_s11_s112/1.1.1)]
     6  ...
     7  [2016-12-07 14:49:25,047][INFO][doing what prog1 is doing (s1_s11_s111/1.1.2)]
     8  ...
     9  [2016-12-07 14:49:25,272][INFO][doing what prog2 is doing (s1_s11_s112/1.1.2)]
    10  ...
    11  [2016-12-07 14:49:25,587][INFO][doing what prog3 is doing (s1_s12/1.1)]
    12  ...
    13  [2016-12-07 14:49:25,909][INFO][doing what prog1 is doing (s1_s11_s111/1.2.1)]
    14  ...
    15  [2016-12-07 14:49:26,073][INFO][doing what prog2 is doing (s1_s11_s112/1.2.1)]
    16  ...
    17  [2016-12-07 14:49:26,321][INFO][doing what prog1 is doing (s1_s11_s111/1.2.2)]
    18  [2016-12-07 14:49:26,323][INFO][[Step s1_s11_s111/1.2.2] Completed, status: TaskStatus.failure]
    19  [2016-12-07 14:49:26,397][ERROR][Exception in run_action:
    20      <Task(id='15',step_id='s1_s11_s111',sequence='1.2.2',recovery='0',pid='10276',status='TaskStatus.failure',created='2016-12-07 20:49:26.300030',updated='2016-12-07 20:49:26.311884')>]
    21  [2016-12-07 14:49:26,397][ERROR][Exception('prog1 failed (s1_s11_s111/1.2.2)',)]
    22  [2016-12-07 14:49:26,397][ERROR][File "/eventor/eventor/main.py", line 63, in task_wrapper
    23      result=step(seq_path=task.sequence)
    24      File "/eventor/eventor/step.py", line 82, in __call__
    25      result=func(*func_args,**func_kwargs)
    26      File "/sequent/example/runly04.py", line 34, in prog
    27      raise Exception("%s failed (%s/%s)"%(progname,step_name,step_sequence))]
    28  [2016-12-07 14:49:26,397][INFO][Stopping running processes]
    29  [2016-12-07 14:49:26,401][INFO][Processing finished with: failure]
    30  [2016-12-07 14:49:26,404][INFO][Eventor store file: /sequent/example/runly04.run.db]
    31  ...
    32  [2016-12-07 14:49:27,921][INFO][doing what prog1 is doing (s1_s11_s111/1.2.2)]
    33  ...
    34  [2016-12-07 14:49:28,159][INFO][doing what prog2 is doing (s1_s11_s112/1.2.2)]
    35  ...
    36  [2016-12-07 14:49:28,494][INFO][doing what prog3 is doing (s1_s12/1.2)]
    37  ...
    38  [2016-12-07 14:49:28,844][INFO][doing what prog4 is doing (s2/1)]
    39  [2016-12-07 14:49:28,845][INFO][[Step s2/1] Completed, status: TaskStatus.success]
    40  [2016-12-07 14:49:29,002][INFO][Processing finished with: success]

Example Highlights

The function build_flow (code line 16) builds a Sequent flow similar to the simple example above. Since no specific store is provided in the Sequent instantiation, a default runner store is assigned (code line 17). In this build, steps use the default recovery directives, whereby successful steps are skipped.

The first build and run is done in code lines 43-44. In this run, a parameter is passed to cause step s111 to fail in its fourth iteration. As a result, the flow fails. Output lines 1-29 are associated with the first run.

The second build and run is then initiated. In this run, the parameter is set to a value that lets step s111 pass, and the run mode is set to recovery (code lines 49-50). Eventor skips successful steps and starts executing from the failed steps onwards. Output lines 30-40 reflect the successful second run.

Note that the second run requires the run_id of the run that is reactivated. run_id is fetched from the corresponding attribute of the Sequent object.

For prog to know when to fail, it uses the methods flow.get_step_name() and flow.get_step_sequence() (code lines 9-10). Those Sequent methods allow access to Eventor step attributes. Another way to access these attributes is via os.environ:

    name = os.getenv('SEQUENT_STEP_NAME')
    sequence = os.getenv('SEQUENT_STEP_SEQUENCE')
    recovery = os.getenv('SEQUENT_STEP_RECOVERY')
    logger_name = os.getenv('SEQUENT_LOGGER_NAME')
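A step function that relies on these variables may also be run outside a flow, for example in a unit test, where they are not set. The sketch below gathers them in one place and falls back gracefully; step_context and step_logger are hypothetical helpers, and only the four variable names come from this document.

```python
import logging
import os


def step_context(environ=None):
    """Collect the step attributes from the environment; missing ones are None."""
    env = os.environ if environ is None else environ
    return {
        "name": env.get("SEQUENT_STEP_NAME"),
        "sequence": env.get("SEQUENT_STEP_SEQUENCE"),
        "recovery": env.get("SEQUENT_STEP_RECOVERY"),
        "logger_name": env.get("SEQUENT_LOGGER_NAME"),
    }


def step_logger(environ=None):
    """Return the step's logger, or the root logger when no name is set."""
    # logging.getLogger(None) returns the root logger.
    return logging.getLogger(step_context(environ)["logger_name"])


ctx = step_context({"SEQUENT_STEP_NAME": "s1_s11_s111",
                    "SEQUENT_STEP_SEQUENCE": "1.2.2"})
```

Passing a plain dict instead of os.environ, as in the last statement, keeps the helper easy to test.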

Distributed Example

Resources

add_step allows association of step with resources. If acquires argument is provided, before step starts, Eventor will attempt to reserve resources. Step will be executed only when resources are secured.

When the releases argument is provided, the resources listed as its value will be released when the step is done. If releases is None, whatever resources are stated by acquires will be released. If an empty list is set as the value, no resources will be released.
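The acquire and release rules can be modeled with a toy pool. This sketch is not the acris virtual_resource_pool API: ToyPool and run_step are hypothetical names, and a step that cannot reserve its resources is simply reported as not run rather than queued.

```python
class ToyPool:
    """A counter-based pool with a fixed limit (illustration only)."""
    def __init__(self, name, limit):
        self.name = name
        self.limit = limit
        self.in_use = 0

    def acquire(self, amount):
        if self.in_use + amount > self.limit:
            return False
        self.in_use += amount
        return True

    def release(self, amount):
        self.in_use = max(0, self.in_use - amount)


def run_step(func, acquires, releases=None):
    """Run func only if all resources in acquires can be reserved.

    releases=None releases whatever was acquired; an empty list releases nothing.
    Returns True if func ran, False if resources were unavailable.
    """
    taken = []
    for pool, amount in acquires:
        if not pool.acquire(amount):
            for held_pool, held_amount in taken:   # roll back partial reservation
                held_pool.release(held_amount)
            return False
        taken.append((pool, amount))
    try:
        func()
    finally:
        for pool, amount in (acquires if releases is None else releases):
            pool.release(amount)
    return True


rp1 = ToyPool("RP1", limit=2)
ran_default = run_step(lambda: None, [(rp1, 2)])            # released afterwards
after_default = rp1.in_use
ran_keep = run_step(lambda: None, [(rp1, 2)], releases=[])  # resources stay held
after_keep = rp1.in_use
ran_blocked = run_step(lambda: None, [(rp1, 1)])            # pool is exhausted
```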

To use resources, the program needs to use Resource and ResourcePool from acris.virtual_resource_pool. An example of such definitions is below.

Example for resources definitions

    import sequent as sqnt
    from acris import virtual_resource_pool as vrp

    class Resources1(vrp.Resource): pass
    class Resources2(vrp.Resource): pass

    rp1 = vrp.ResourcePool('RP1', resource_cls=Resources1, policy={'resource_limit': 2,}).load()
    rp2 = vrp.ResourcePool('RP2', resource_cls=Resources2, policy={'resource_limit': 2,}).load()

    myflow = sqnt.Sequent(config={'sleep_between_loops': 0.05,},)
    s1 = myflow.add_step('s1', repeats=[1, 2], acquires=[(rp1, 2),])

Additional Information

Sequent github project (https://github.com/Acrisel/sequent) has additional examples with more complicated flows.
