Take=Luigi+时间

luiti的Python项目详细描述


![luiti](https://raw.githubusercontent.com/luiti/luiti.github.io/master/images/luiti/luiti_rectangle_logo.png)
============
[![构建状态](https://travis-ci.org/luiti/luiti.svg?branch=master)(https://travis ci.org/luiti/luiti)
[![覆盖状态](https://coveralls.io/repos/luiti/luiti/badge.svg?branch=master&service=github)(https://coveralls.io/github/luiti/luiti?分支=主)
[![健康](https://landscape.io/github/luiti/luiti/master/landscape.svg?style=flat)(https://landscape.io/github/luiti/luiti/master)
[![下载](https://img.shields.io/pypi/dm/luiti.svg?(https://pypi.python.org/pypi/luiti)
[![许可证](https://img.shields.io/pypi/l/luiti.svg?style=flat)"(https://pypi.python.org/pypi/luiti)

a s[luigi](https://github.com/spotify/luigi)的主页上说,是"一个
python模块,帮助您构建批处理作业的复杂管道。它还内置了hadoop支持,"

包**,并强制**每个python文件执行一个任务**。luiti任务类
可以通过"luiti"命令进行管理,支持的操作有ls、new、
generate、info、clean、run和webui。

luiti天生就是要构建**一个分层的数据库仓库**,对应于我们刚才提到的不同包。数据仓库由同步数据源、事实数据表、维度表、常规或临时业务报表组成。


Kly Report是必需的,所以这里有taskday、taskweek、
等等。任务类也有一个hadoop版本,比如taskdayhadoop,
taskweekhadoop等等。

因此,您可以定期运行luiti任务,例如每小时、每天、每周等。**luiti=luigi+time**。




[简介](Luiti)
3.[luiti命令工具](luiti命令工具)
2.[luiti webui屏幕截图](luiti webui屏幕截图)
4.[基于时间管理的核心概念](基于时间管理的核心概念)
5。[内置属性](任务规范和内置属性及建议)
6.[管理Luiti中的多个项目](管理Luiti中的多个项目)
7。[路易吉简易指南](路易吉简易指南)
8.[路易提的一个简单例子](路易提的一个简单例子)
9。[安装和开发](安装和开发需求)
10。[任务建议](任务建议)
11.[任务装饰器](任务装饰器)
12。[与mapreduce相关](与mapreduce相关)
13.[扩展luiti](扩展luiti)
14.[常见问题](常见问题)
15.[运行测试](运行测试)
16。[谁用路易提?](谁使用luiti)

keynote[luiti-脱机任务管理框架](https://speakerdeck.com/mvj3/luiti an offline task management framework)


luiti command tool
--------
安装包后,可以使用e包。

```文本
$luiti
用法:luiti[-h]{ls,new,generate,info,clean,run,webui}…


luiti任务管理器。

可选参数:
-h,--帮助显示此帮助消息并退出

子命令:
有效子命令

{ls,new,generate,info,clean,run,webui}
ls列出所有当前luiti任务。
新建创建新的luiti项目。
生成新的luiti任务python文件。
info显示详细的任务。
清除由luiti任务计算。
运行运行一个luiti任务。
webui启动一个luiti dag可视化程序。
````

luiti webui屏幕截图example_webui_run.py,让您了解luiti的多个python包是如何工作的。luiti webui列表
![luiti webui list](https://raw.githubusercontent.com/luiti/luiti/master/screenshots/luiti-webui-list.png)


2.luiti webui秀
![luiti webui show](https://raw.githubusercontent.com/luiti/luiti/master/screenshots/luiti-webui-show.png)


3.luiti代码显示
![luiti代码显示](https://raw.githubusercontent.com/luiti/luiti/master/screenshots/luiti_code_show.png)




基于时间管理的核心概念
-
日期类型


基本继承任务类:
0。任务库(luigi.task)
1.任务小时(taskbase)
2.任务日(taskbase)
3.任务周(任务库)
4.任务月(taskbase)
5。taskrange(taskbase)

您可以按子类"taskbase"扩展更多日期类型,并确保
日期类型也添加到"taskbase.date types"中。

taskday hadoop(luigi.hadoop.hadoopext,taskday)
2.taskweek hadoop(luigi.hadoop.hadoopext,任务周)
3.taskrange hadoop(luigi.hadoop.hadoopext,taskrange)

根任务(luigi.task)
2.静态文件(luigi.task)
3.mongoimporttask(taskbase)将json文件从hdfs导出到mongodb。





每个文件一个任务类。
2.任务类应为camel-case(例如"englishstudentallexamweek"),文件名应为小写下划线(例如"english-student"all-exam-week.py")。
3.任务文件应位于"luiti_tasks"目录下。luiti使用这个转换来连接pacakges内部和外部的任务。
4.任务类名应以日期类型结尾,例如天、周等。请参阅"taskbase.datetypes"。



\`日期值。必需,即使是范围类型的任务。这将确保"output"将写入day目录。
2.`数据文件`。绝对输出文件路径,它是字符串格式。
3.‘DATAYDIR’。绝对输出文件路径的目录,它是字符串格式。
4。'Roothdidi'。此包的根路径。`数据文件和数据目录都在下面。
5."输出"。基本任务的输出类是localtarget,hadoop任务的输出类是hdfs.hdfstarget。
6。"DATEYSTR"。日期时间字符串,如"20140901"。
7。`日期类型`。从任务类名生成的字符串,例如day、week等
8。`日期值。如果当前日期类型是week,则返回前一周的"date_value"。
8。`按"开始"中的"类型"排列的日期值。如果当前日期类型为周,则返回当前周的星期一零时钟。
9。`按"结束"中的"类型"排列的日期值。如果当前日期类型为"周",则返回当前周的星期日11:59:59时钟。
10。`一个接一个的任务。通常它返回当前日期类型中的上一个任务。如果达到当前日期类型的时间边界,则返回roottask。
11。`是不是到了边缘。这学期是在17左业商业学院。`实例按日期范围排列。类函数,返回属于当前日期范围的所有任务导入列表。
13。`任务类。返回当前任务类。




低格式,这意味着它也是一个普通的python包,例如:

``text
project_a---project directory
setup.py---python package install script
readme.markdown---项目自述
project-a/---python包安装目录---一个目录名,表示它包含多个luiti任务
–-uu init_uu.py---将磁盘上的当前目录标记为python包目录
-uu init_uiti.py---initialize luiti环境变量r/>——初始化py
使用这个包,您需要安装这个包,以确保luiti可以在python模块的搜索路径(`sys.path`)中找到它们。




每个luiti项目都有相同的结构,例如
"project-a/luiti-u tasks/another-u-day.py"。在
`luigi.plug_packages("project_b","project_c==0.0.2"])`in
``uu init_luiti.py`)中进行配置后,可以使用'@luigi.ref_tasks("artiststreamday")`to
指示当前任务以查找当前包中的'artiststreamday'任务
`project_a`,或相关的'project_b`,`project_c`包。



luigi的一个简单指南
路易吉主要由四部分组成:


1。**输出**。它必须在"output"函数中实现,如"localtarget"和"hdfs.hdfstarget"。
2.**输入**。它必须在'requires'函数中实现,
函数应该返回一些或没有任务实例。
3.**参数**。参数应继承自"luigi.parameter"、
例如"dateparameter"等
4。**执行逻辑**。如果在本地运行,请使用"run"函数;如果在分布式mapreduce纱线上运行,请使用"mapper"和"reducer"。

完成业务逻辑实现和测试用例后,可以将任务提交给"luigid"后台守护程序。` luigid`将自动处理任务依赖项,这是通过检查
` output`是否已存在(这是目标类的函数)来完成的。并且
luigi将通过任务类名和参数保证任务实例在当前的
`luigid`后台进程中是uniq。


luiti中的一个简单示例
.org/en/latest/example_top_artists.html

``python
import luigi
from collections import defaultdict


class aggregateartists(luigi.task):
date interval=luigi.dateintervalparameter()

def output(self):
返回luigi.localtarget("/data/artist streams_%s.tsv"%self.date_interval"

def需要(self):
返回[self.date_interval中日期的流(date)]


def run(self):
artist_count=defaultdict(int)

使用inpu输入self.input():
t.open('r')如在文件中:
如在文件中的行:
时间戳,艺术家,track=line.strip().split()
艺术家计数[artist]+=1

ritems():
打印出>;>;输出文件,艺术家,计数
```

``br/>````````````艺术家项目/luiti任务/artist流日.py`

``python
``python从luiti导入*

class artist stream day(静态文件):

@缓存的属性
def filepath(self):
返回targetutils.hdfs("/tmp/streams\u%s.tsv"%self.date\u str
````

*第二个文件:`艺术家项目/luiti任务/aggregate\u艺术家周.py`

``python
来自luiti import*

@luigi.ref\u任务("artiststreamday")
class aggregateArtistsWeek(taskWeek):

def需要(self):
return[self.artiststreamDay(d1)for d1 in self.days_in_week]


def output(self):
return targetutils.hdfs("/data/artist_streams.tsv"%self.date_str

artist_count=defaultdict(int)

_文件:
对于Artist,在Artist_count.iteritems()中计数:
打印>;>输出文件,Artist,计数:
`````

优化注释:

1。luiti的task类内置了"date\u value"属性,并将
转换为"arrow"数据类型。
2。在artiststreamday中,"date_str"从"date_value"转换而来,并在第一次调用后将
从函数转换为实例属性。
3。`@luigi.ref_task s`将artiststreamday绑定为aggregateArtistsWeek的
实例属性,因此我们可以使用'self.artiststreamday(d1)`表单
实例化一些任务实例。
4.aggregateArtistsWeek从"taskWeek"继承后,它将自动具有
"self.days"属性。` targetutils.line_read`替换了需要两行代码才能完成此功能的原始函数,并直接返回生成器。



导入*

@luigi.ref_tasks("artiststreamday")
class aggregateartistsweek(taskweekhadoop):

def requires(self):
return[self.artiststreamday(d1)for d1 in self.days_in_week]

def output(self):
return targetutils.hdfs("/data/weeks/artist_strEAMS_%s.tsv"%self.date_str

def mapper(self,line1):
timestamp,artist,track=line.strip().split()
yield artist,1

def reducer(self,artist,counts):
yield artist,len(counts)
`````

是的,与Luigi几乎没有区别,除了每周的"self.days"属性和"@luigi.ref"任务"decorator"之外。




com/luiti/luiti.git
cd luiti
python setup.py install
````

[node.js](https://nodejs.org/download/)&;[鲍尔](http://bower.io/install bower)
2.来自setup.py
3的PIP要求。[tox](http s://testrun.org/tox/latest/)&;[nose](https://nose.readthedocs.org/)



time library
----


time library是[箭头](http://crsmithdev.com/arrow/),每个任务的
实例的"日期值"属性都是一个箭头。箭头类型。

luiti转换日期p参数自动进入本地时区。如果要自定义时间,请使用
`arrowparameter.get(*strs)`和'arrowparameter.now()`以确保使用本地时区。






`,比如
[werkzeug](http://werkzeug.pocoo.org/docs/0.10/utils/)说,"一个
将函数转换为惰性属性的装饰器。包装的函数在第一次检索结果时称为
,然后在下次访问该值时使用该计算结果。

17zuoye每天都大量使用该函数,我们使用它来缓存
很多东西,例如大数据指令。

`` python
另一类业务日(taskdayhadoop):

def需要(self):
return[task1,task2,…]

def mapper(self,line1):
k1,v1=进程(line1)
yield k1,v1

def reducer(self,k1,vs1):
v2=func2(v1,self.another_dict)
产生k1,v2

@cached_property
def another_dict(self):
基本实用程序,如os、re、json、defaultdict等
2。日期处理实用程序,它们是箭头、箭头参数。
3.缓存实用程序,`cached_property`.
4.其他实用程序,如ioutils、dateutils、targetutils、hdfsutils、mrutils、mathutils、
commandutils、compressutils。



task decorators
=--
``python
1。延迟绑定相关任务,并可直接用作实例属性。
@luigi.ref_tasks(*tasks)

在mapreduce
@luigi.multiple_text_files()

3中支持多文件输出。在本地模式下运行mapreduce,只需添加一个decorator。
@luigi.mr_local()

4。check current task'满足数据源的日期范围。
@luigi.check_date_range()

5.检查当前任务是否可以在当前日期范围内运行。
@luigi.check_runtime_range(hour_num=[4,5,6],weekday_num=[1])

让[luigi.contrib]下的任务模板(https://github.com/spotify/luigi/tree/master/luigi/contrib)跟随luiti的任务转换。
@luigi.a s_a_luiti_task()

——
任务失败时清除临时文件。
执行mr作业时,luigi会立即将结果写入时间戳为
的文件。如果任务成功,则重命名为
任务原始输出文件路径的名称。如果任务失败,yarn将自动删除临时文件。


原汁原味。`对于targetutils.line read(hdfs1)中的line1,`line1'是一种
unicode类型。
2。由json读取。`对于targetutils.json_read(hdfs1)中的json1,`json1`是一个有效的python对象。
3。以k-v格式阅读。`对于targetutils.mr read(hdfs1)`,`k1`
是unicode类型,`v1`是python对象。

此函数压缩由"part-00000"文件块组成的mr文件结果数据格式。将mapreduce输入和输出添加到"mrtest_input"和"mrtest_output"中,
它们模拟mapreduce处理。
2.在测试文件中,使用"@mrtestcase"来修饰测试类,
,并将任务类添加到"mr_task_names"列表中。
3。(可选)在"mrtest_attrs"中添加一些配置dict,以模拟在生产模式下生成的属性。运行你的测试用例!

`购买水果日.py`

`` python
来自luiti import*


类购买水果日(taskdayhadoop):

def requiries(self):



def output(self):




yield uid,fruit

def reducer(self,uid,fruits):
price=sum([self.price_dict[fruit]for fruits in fruits])
yield",mrutils.str dump({"uid":uid,"price":price})

@cache\u property
def price\u dict(self):
result=dict()
对于targetutils中的json1.json读取(一个水果价格列表文件):
result[json1["name"]]=json1["price"]
return result

def mrtest放置(self):
返回"
{"uid":3,"水果":"apple"}
{"uid":3,"水果":"apple"}
{"uid":3,"水果":"香蕉"}
"

返回"
{"uid":3,"价格":7}
""

返回{
"price-dict":{
"apple":3,
"banana":1,
}
}

````

"测试文件"

``python
"python
mes=[
‘classenglishhallexamweek’,

]


ault属性或函数,例如:

``python
taskweek.extend({
'property_1':lambda self:"property_2",
})
````

`extend`类函数与'function`,`property`,`cached_property`,
或任何其他属性同时压缩。如果需要要覆盖
"property"和"cached_property",您只需要一个函数值,并且
"extend"将自动转换为"property"和
"cached_property"类型。



-
q:如何支持原子文件?

a:正如luigi的文档所提到的,"写入临时文件并在close时移动它的简单类"
如果未调用close,也会清理临时文件,因此请使用"self.input().open("r")或"self.output().open("w")",而不是"open"("some_file","w")。

q:c一个路易吉探测到相互依赖的任务?

a:这不是Luigi内部的问题,而是关于[拓扑排序](https://en.wikipedia.org/wiki/topology\u sorting)的问题
作为一个通用的计算机科学主题。任务调度程序在"luigi/scheduler.py"中实现。


a:您可以创建一个键值dict,`date_value`是键,您的
自定义参数是值。





如果您有其他未解决的问题,请随时在[问题](https://github.com/luiti/luiti/issues)上询问


运行测试
-——

``bash
noestests

----
*[17zuoye](http://www.17zuoye.com/)Luiti出生于本公司。

如果贵公司希望在本列表中出现,请告知我们!



许可证

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何向xsi:nil元素添加另一个属性?   Java抽象泛型方法,使用具体类型实现通配符   java使用pcap4j截断pcap文件   当我放置字母a、b和c时,java中的异常预期会下降   java设置活动对话框不可取消   接口类型变量上的Java克隆   使用Java或BouncyCastle对CSR(证书签名请求)进行安全解码/读取   java调用SavingsAccount对象上的函数并打印结果   java如何在Android应用程序上显示地图上的兴趣点(POI)并与之交互?   如果在JavaFX中的ResultSet中未找到任何内容,则显示java警报   java我将springboot和@component与@scheduled一起使用,它每12小时锁定一次   ApachePOI如何使用java删除包含字符串的word表的行   java如果对象(x,y)靠近其他对象(x,y)   从未对JMSException调用java JMS CachingConnectionFactory OneException方法   javascript使用java将HTML页面转换为MS word