具有树状依赖项的ETL任务的作业管理器。

treetl的Python项目详细描述


treetl:运行具有树状依赖关系的etl任务

批处理作业的管道不需要是线性的。有时有共享的中间转换可以提供 今后的步骤。treetl通过存储和注册来管理和运行依赖etl作业的集合 作为一个polytree

这个包与Spark作业放在一起,因此缓存中间和 把结果放在首位。因此,treetl的主要优点之一是 作业结果可以在内存中共享。

示例

以下作业集将只运行一次,并将其转换后的数据(或对其的某些引用)传递给 工作依赖于他们。

fromtreetlimport(Job,JobRunner,JOB_STATUS)classJobA(Job):deftransform(self,**kwargs):self.transformed_data=1returnself# JobB.transform can take a kwarg named# a_param that corresponds to JobA().transformed_data@Job.dependency(a_param=JobA)classJobB(Job):deftransform(self,a_param=None,**kwargs):self.transformed_data=a_param+1returnselfdefload(self,**kwargs):# could save intermediate result self.transformed_data herepass@Job.dependency(some_b_param=JobB)classJobC(Job):pass@Job.dependency(input_param=JobA)classJobD(Job):deftransform(self,input_param=None,**kwargs):self.transformed_data=input_param+1returnself@Job.dependency(in_one=JobB,in_two=JobD)classJobE(Job):deftransform(self,in_one=None,in_two=None,**kwargs):# do stuff with in_one.transformed_data and in_two.transformed_dataself.transformed_data=in_one+in_two# order submitted doesn't matterjobs=[JobD(),JobC(),JobA(),JobB(),JobE()]job_runner=JobRunner(jobs)ifjob_runner.run().status==JOB_STATUS.FAILED:# to see this section in action add the following to# def transform(self): raise ValueError()# to the definition of JobDprint('Jobs failed')print('Root jobs that caused the failure : {}'.format(job_runner.failed_job_roots()))print('Paths to sources of failure       : {}'.format(job_runner.failed_job_root_paths()))else:print('Success!')print('JobE transformed data: {}'.format(jobs[4].transformed_data))

待办事项

  • 通过顶级JobRunner设置多个作业通用的参数
  • 将/传递状态参数设置为作业方法
  • 支持将JobRunner作为嵌套作业依赖关系图的作业提交。
  • 从树中的特定点运行。允许起始点的父级检索上次加载的数据,而不是重新计算整个依赖项集。
  • 能够将作业属性传递给基于decorator的作业定义中使用的组件函数

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java使用prepared语句在oracle中插入日期   对点具有双重值的java   使用多个通配符的java请求映射   java Springboot为什么要设置springbootstartertomcat   除了对JavaBean的请求之外,还使用servletContext的servlet   java如何清除OCSID。返回到池的连接时的CLIENTID JDBC客户端信息属性   将整型数组转换为光栅Java   java使用对象引用作为互斥锁   java为什么在编程语言中使用sin函数返回奇怪的sin值不像计算器   java如何将JButton链接到对象并调用相关方法   php用Java发送POST数据   导航属性的java模拟加载   java多个活动错误Android试图对空对象引用调用虚拟方法“”   java Android更改ActionBar文本颜色   如何使用带有java反射且不带开关的parant引用创建子类