具有树状依赖项的ETL任务的作业管理器。
treetl的Python项目详细描述
treetl:运行具有树状依赖关系的etl任务
批处理作业的管道不需要是线性的。有时有共享的中间转换可以提供 今后的步骤。treetl通过存储和注册来管理和运行依赖etl作业的集合 作为一个polytree。
这个包与Spark作业放在一起,因此缓存中间和 把结果放在首位。因此,treetl的主要优点之一是 作业结果可以在内存中共享。
示例
以下作业集将只运行一次,并将其转换后的数据(或对其的某些引用)传递给 工作依赖于他们。
fromtreetlimport(Job,JobRunner,JOB_STATUS)classJobA(Job):deftransform(self,**kwargs):self.transformed_data=1returnself# JobB.transform can take a kwarg named# a_param that corresponds to JobA().transformed_data@Job.dependency(a_param=JobA)classJobB(Job):deftransform(self,a_param=None,**kwargs):self.transformed_data=a_param+1returnselfdefload(self,**kwargs):# could save intermediate result self.transformed_data herepass@Job.dependency(some_b_param=JobB)classJobC(Job):pass@Job.dependency(input_param=JobA)classJobD(Job):deftransform(self,input_param=None,**kwargs):self.transformed_data=input_param+1returnself@Job.dependency(in_one=JobB,in_two=JobD)classJobE(Job):deftransform(self,in_one=None,in_two=None,**kwargs):# do stuff with in_one.transformed_data and in_two.transformed_dataself.transformed_data=in_one+in_two# order submitted doesn't matterjobs=[JobD(),JobC(),JobA(),JobB(),JobE()]job_runner=JobRunner(jobs)ifjob_runner.run().status==JOB_STATUS.FAILED:# to see this section in action add the following to# def transform(self): raise ValueError()# to the definition of JobDprint('Jobs failed')print('Root jobs that caused the failure : {}'.format(job_runner.failed_job_roots()))print('Paths to sources of failure : {}'.format(job_runner.failed_job_root_paths()))else:print('Success!')print('JobE transformed data: {}'.format(jobs[4].transformed_data))
待办事项
- 通过顶级JobRunner设置多个作业通用的参数
- 将/传递状态参数设置为作业方法
- 支持将JobRunner作为嵌套作业依赖关系图的作业提交。
- 从树中的特定点运行。允许起始点的父级检索上次加载的数据,而不是重新计算整个依赖项集。
- 能够将作业属性传递给基于decorator的作业定义中使用的组件函数