{{description}}
datapackage-pipelines的Python项目详细描述
数据包管道
基本知识
这是什么?
datapackage pipelines
是用于表格式数据的声明性流处理的框架。它建立在无摩擦数据项目的概念和工具之上。
管道
这个框架的基本概念是管道。
管道有一个处理步骤列表,它生成一个数据包作为其输出。每个步骤在处理器中执行,包括以下阶段:
- 修改数据包描述符-例如:添加元数据、添加或删除资源、更改资源的数据架构等。
- 处理资源-按顺序处理每个资源的每一行。处理器可以删除行、添加新行或修改其内容。
- 返回统计信息-如有必要,处理器可以报告一个数据字典,当管道执行终止时,该字典将返回给用户。例如,这可用于计算处理数据的质量度量。
并非每个处理器都需要完成所有这些任务。事实上,您经常会发现每个处理步骤只执行其中的一个。
pipeline-spec.yaml
文件
管道是以声明的方式定义的,而不是在代码中。可以在pipeline-spec.yaml
文件中定义一个或多个管道。此文件指定每个处理器的处理器列表(按名称引用)和执行参数。
下面是一个pipeline-spec.yaml
文件的示例:
worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip
在本例中,我们看到一条名为worldbank-co2-emissions
的管道。它的管道由4个步骤组成:
元数据
:这是一个库处理器(见下文),它修改数据包的描述符(在本例中为初始的空描述符)-向数据包添加名称
,标题
和其他属性。加载
:这是另一个库处理器,它将数据加载到数据包中。 此资源有一个名称和一个
from
属性,指向数据的远程位置。设置类型
:此处理器将数据类型分配给数据中的字段。在本例中,看起来像年份的字段标题将被指定为数字
类型。dump_to_zip
:使用提供的文件名创建一个经过压缩和验证的数据包。
力学
管道如何运行的一个重要方面是数据以流的形式从一个处理器传递到另一个处理器。如果我们在这里得到"technical",那么每个处理器都在其自己的专用进程中运行,数据包从其stdin
中读取并输出到其stdout
。这里要注意的重要一点是,没有处理器在任何一点保存整个数据集。
限制是通过设计来实现的—使每个处理器的内存和磁盘需求受到限制,并且与数据集大小无关。
快速启动
首先,在当前目录中创建一个pipeline-spec.yaml
文件。如果您只是想试用一下,您可以使用上面的文件。
然后,您可以在本地安装数据包管道
注意,由于使用了类型提示和高级异步使用:
$ pip install datapackage-pipelines
现在您应该可以使用dpp
命令:
$ dpp Available Pipelines: - ./worldbank-co2-emissions (*) $ $ dpp run --verbose ./worldbank-co2-emissions RUNNING ./worldbank-co2-emissions Collecting dependencies Running async task Waiting for completion Async task starting Searching for existing caches Building process chain: - update_package - load - set_types - dump_to_zip - (sink) DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80 load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736 load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736 load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80 load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736 load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736 set_types: INFO :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,) load: INFO :Processed 264 rows set_types: INFO :Processed 264 rows DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py dump_to_zip: INFO :Processed 264 rows DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'} INFO :RESULTS: INFO :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
或者,您可以使用我们的Docker图像:
$ docker run -it -v `pwd`:/pipelines:rw \ frictionlessdata/datapackage-pipelines <available-pipelines> $ docker run -it -v `pwd`:/pipelines:rw \ frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions <execution-logs>
命令行界面-dpp
使用dpp
工具从命令行运行管道。
在不带任何参数的情况下运行dpp,将显示可用管道的列表。这是通过扫描当前目录及其子目录、搜索pipeline-spec.yaml
文件并提取其中描述的管道规范列表来完成的。
每个管道都有一个标识符,由pipeline-spec.yaml
文件的路径和该描述文件中定义的管道名称组成。
要运行管道,请使用dpp run<;pipeline id>;
您还可以使用dpp run all
来运行所有管道,并使用dpp run dirty
来运行仅dirty管道(稍后详细说明)。
深入了解管道
处理器分辨率
如前所述,处理器是按名称引用的。
实际上,这个名称是包含处理代码的python脚本的名称(减去.py
扩展名)。当试图找到需要执行的实际代码的位置时,处理器解析器将在这些预定义的位置进行搜索:
- 首先,它将尝试在
pipeline-spec.yaml
文件的目录中找到具有该名称的自定义处理器。 处理器名称支持点表示法,因此您可以编写mycode.custom_processor
,它将尝试在mycode
目录中找到名为custom_processor.py
的处理器,路径与管道规范文件相同。 对于这个特定的解析阶段,如果您要编写。custom_processor
它将尝试在管道规范文件的父目录中找到该处理器。 (继续阅读有关如何编写自定义处理器的说明) - 如果处理器名称看起来像
myplugin.somename
,它将尝试在myplugin
插件中找到名为somename
的处理器。也就是说,它将看到是否有一个名为myplugin
的插件,如果有,该插件是否发布了一个名为somename
的处理器(更多关于下面的插件)。 - 如果在此之前未找到处理器,则它将尝试在处理器搜索路径中搜索此处理器。处理器搜索路径取自环境变量
dpp_processor_path
。路径中的每个分离路径都被视为解析处理器的可能起点。 - 最后,它将尝试在与此软件包捆绑在一起的标准处理器库中找到该处理器。
从扫描管道规格中排除目录
默认情况下,*
目录不包括在扫描范围内,您可以为
通过在项目根目录中创建.dpp_spec_ignore
文件排除。这个文件有类似的语法
到.gitignore并将根据全局模式匹配从扫描中排除目录。
例如,下面的文件将忽略test*
目录,包括inside子目录
并且/docs
目录将只在项目根目录中被忽略
test*
/docs
缓存
通过将特定管道步骤上的cached
属性设置为true
,此步骤的输出将存储在磁盘上(在.cache
目录中,与pipeline-spec.yaml
文件位于同一位置)。
重新运行管道将利用该缓存,从而避免执行缓存步骤及其前身。
在内部,为管道中的每一步计算一个散列,这是基于处理器的代码、它的参数和它的前一步的散列。如果存在与特定步骤具有完全相同哈希值的缓存文件,则可以将其删除(及其前置任务),并将该缓存文件用作管道的输入
这样,当代码或执行参数更改时(对于缓存的处理器或之前的任何处理器),缓存将变为无效。
脏任务和保持状态
缓存哈希还用于查看管道是否"脏"。当管道成功完成执行时,dpp
将缓存哈希与管道id一起存储。如果存储的哈希与当前计算的哈希不同,则表示代码或执行参数已被修改,管道需要重新运行。
dpp
可用于两个存储后端。对于本地运行,它使用pythonsqlite db来存储每个运行任务的当前状态,包括最后的结果和缓存散列。state db文件存储在名为.dpp.db
的文件中,该文件位于运行dpp
的同一目录中。
对于其他安装,特别是使用任务调度程序的安装,建议使用redis后端。要启用redis连接,只需将dpp_redis_host
环境变量设置为指向正在运行的redis实例即可。
管道依赖项
您可以声明一个管道依赖于另一个管道或数据包。计算管道的缓存哈希值时会考虑此依赖关系,这反过来会影响缓存文件的有效性和"脏"状态:
- 对于管道依赖项,在计算中使用该管道的哈希值
- 对于数据包依赖项,在计算中使用数据包中的
hash
属性
如果缺少依赖项,则管道将标记为"无法执行"。
通过pipeline-spec.yaml
文件中的管道定义的dependencies
属性来声明依赖项。
此属性应包含依赖项列表,每个依赖项都是具有以下格式的对象:
- 一个名为
pipeline
的键,其值是要依赖的管道id - 名为
datapackage
的单个键,其值是要依赖的数据包的标识符(或url)。
示例:
cat-vs-dog-populations:dependencies:-pipeline:./geo/region-areal-datapackage:http://pets.net/data/dogs-per-region/datapackage.json-datapackage:http://pets.net/data/dogs-per-region...
正在验证
自动验证每个处理器的输入是否正确:
数据包在传递给处理器之前总是经过验证,因此处理器不可能以使其无效的方式修改数据包。
除非通过在步骤信息中将
validate
标志设置为true显式请求,否则不会根据相应的json表模式验证数据。 这样做有两个主要原因:- 从性能上看,验证每个步骤中的数据都非常占用CPU
- 在某些情况下,您在一个步骤中修改模式,在另一个步骤中修改数据,因此您只想在所有更改完成后验证数据
在任何情况下,当使用
set_types
标准处理器时,它将使用新类型验证和转换输入数据。
数据流集成
数据流是数据包管道的继承者,提供了更多
与正在运行的管道的pythonic接口。您可以使用flow
属性在管道规范中集成数据流
而不是运行
。例如,给定以下流文件,保存在my flow.py
下:
from dataflows import Flow, dump_to_path, load, update_package
def flow(parameters, datapackage, resources, stats):
stats['multiplied_fields'] = 0
def multiply(field, n):
def step(row):
row[field] = row[field] * n
stats['multiplied_fields'] += 1
return step
return Flow(update_package(name='my-datapackage'),
multiply('my-field', 2))
以及同一目录下的pipeline-spec.yaml
my-flow:
pipeline:
- run: load_resource
parameters:
url: http://example.com/my-datapackage/datapackage.json
resource: my-resource
- flow: my-flow
- run: dump_to_path
您可以使用dpp run my flow来运行管道
标准处理器库
库中提供了一些内置处理器。
更新软件包
将元数据添加到数据包。
参数:
示例:
worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip0
加载
将数据加载到包中,推断架构并可选地强制转换值。
参数:
来自
要加载的数据的位置。这可以是:- 本地路径(例如/path/to/the/data.csv)
- 一个远程url(例如,https://path.to/the/data.csv" rel="nofollow">https://path.to/the/data.csv)
- 其他支持的链接,基于当前对制表器中的方案和格式的支持
- data package.json文件的本地路径或远程url(例如https://path.to/data-package/datapackage.json" rel="nofollow">https://path.to/data-package/datapackage.json)
- 对包含源位置的环境变量的引用,格式为
- 元组包含(数据包描述符、资源迭代器)
资源
-可选,仅当源指向datapackage.json文件或datapackage/resource元组时才相关。值应为以下值之一:- 要加载的单个资源的名称
- 匹配要加载的资源名称的正则表达式
- 要加载的资源名称列表
- "none"表示加载所有资源
- 包中资源的索引
validate
-是否应将数据强制转换为推断的数据类型。仅当不从数据包加载数据时才相关。- 其他选项-基于加载的文件,附加选项(例如,Excel文件的工作表等,请参见上面表格的链接)
打印机
只要把看到的都打印出来。有利于调试。
参数:
行数
-修改要预览的行数,打印机将从流中的不同位置打印此行数的多个示例最后一行
-流中要打印的最后几行。可选,默认为num_rows的值字段
-可选,要预览的字段名列表资源
-可选,允许限制打印的资源,语义与加载处理器资源参数
设置类型
将数据类型和类型选项设置为流式资源中的字段,并确保数据仍然使用新类型进行验证。
这允许对现有的表模式进行修改,通常也可以从stream\u remote\u resources
参数:
资源
-要修改哪些资源。可以是:- 字符串列表,解释为要传输的资源名称
- 字符串,解释为用于匹配资源名称的正则表达式
如果省略,则数据包中的所有资源都将流式传输。
regex
-如果设置为false
字段名将被解释为字符串而不是正则表达式(默认情况下true
)类型
-字段名和字段定义之间的映射。- 字段名可以是字段名,也可以是匹配多个字段的正则表达式。
- 字段定义是一个遵循json表模式规范的对象。您可以使用
null而不是对象从架构中删除字段。
示例:
worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip1
加载元数据
从现有数据包加载元数据。
参数:
从位于url的数据包加载元数据
将复制加载的数据包的所有属性(除了资源
)
示例:
worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip2
加载资源
从现有数据包加载表格资源。
参数:
从位于url
的数据包加载resource
参数中指定的资源。
将复制加载资源的所有属性-路径
和架构
包括在内。
url
-指向所需资源所在的数据包的url资源
-可以是- 字符串列表,解释为要加载的资源名
- 字符串,解释为用于匹配资源名称的正则表达式
- 一个整数,表示数据包中资源的索引(基于0)
限制行
-如果提供,将限制从源中获取的行数。获取一个整数值,该整数值指定要传输的源的行数。记录进度行
-如果提供,将记录加载进度。取一个整数值,指定记录进度的行数间隔。流
-如果提供并设置为false,则资源将添加到数据包中,但不会流化。资源
-可以使用而不是资源
属性来支持加载资源和修改输出资源元数据- 值是一个dict,它包含要加载的源资源名称和要应用于已加载资源的dict包含描述符更新之间的映射
必需
-如果提供并设置为false,则在数据包不可用或缺少资源时不会失败
示例:
worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip3
连接
连接多个流式资源并将其转换为单个资源。
参数:
源
-要连接哪些资源。语义与stream\u remote\u resources中的
resources
相同如果省略,数据包中的所有资源都将连接起来。
<资源>连接必须在数据包中以连续顺序出现。target
-保存连接数据的目标资源。应至少定义以下属性:名称
-资源的名称路径
-此文件的数据包中的路径。
如果省略,目标资源将接收名称
concat
,并将保存在数据包中的data/concat.csv
。字段
-源和目标之间的字段映射,以便键是目标字段名,值是源字段名的列表。此映射用于创建目标资源架构。
请注意,目标字段名总是假定映射到自身。
示例:
worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip4
在本例中,我们将所有看起来像report year-<;year>;
的资源连接起来,并将它们输出到多年报告资源中。
输出包含两个字段:
活动
,在所有源中称为活动。金额
,在不同的资源中有不同的名称(例如金额
,2009年金额
,金额
等)
加入
加入两个流式资源。
"加入"在我们的例子中意味着获取目标资源,并通过在源资源中查找数据向其每一行添加字段。
join操作的一个特殊情况是没有目标流,并且源中的所有唯一行都用于创建它。 此模式称为"重复数据消除"模式-将创建目标资源,并将源中的重复数据消除行添加到其中。
参数:
source
-有关源的信息资源名称
-资源的名称键
- 应用作查找键的字段名列表
- 字符串,它将被解释为python格式的字符串,用于形成键(例如
{<;field_name_1>;}:{field_name_2}
)
删除
-加入后从数据包中删除(false
默认)
target
-保存关联数据的目标资源。应至少定义以下属性:名称
-如源
键
-如源
中所示,或空
用于创建目标资源并执行重复数据消除
字段
-从源资源到目标资源的字段映射。 键应该是目标资源中的字段名。 值可以定义两个属性:名称
-源中的字段名(默认情况下与目标字段名相同)聚合
-聚合策略(如何使用同一个键处理多个源行)。可以选择以下选项:求和
-汇总聚合值。 对于数值,它是算术和;对于字符串,字符串和其他类型的连接将出错。平均值
-计算聚合值的平均值。对于数值,它是算术平均值,对于其他类型,它将出错。
max
-计算聚合值的最大值。对于数值,它是算术最大值;对于字符串,它是字典最大值;对于其他类型,它将出错。
min
-计算聚合值的最小值。对于数值,它是算术最小值;对于字符串,它是字典最小值;对于其他类型,它将出错。
第一个
-获取遇到的第一个值最后一个
-获取遇到的最后一个值计数
-计数特定键的出现次数 对于此方法,不需要指定名称
。如果指定了,则count
将计算该源字段的非空值数。计数器
-计算不同值的出现次数 将返回一个2元组数组,格式为[值,值的计数]
设置
-收集聚集字段的所有不同值,无序数组
-按外观顺序收集聚合字段的所有值任意
-选择任意值。
默认情况下,
聚合
接受任意
值。
如果不需要指定
名称
或聚合
,则映射可以映射到空对象{}
或null
完整
-布尔型,- 如果
true
(默认值),在源中查找失败将在源中产生"空"值。 - 如果
false
,则在源中查找失败将导致从目标中删除行。
- 如果
重要提示:"源"资源必须出现在数据包中"目标"资源之前。
示例:
worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip5
上面的例子旨在创建一个包含世界上每个国家的GDP和人口的包。
我们有一个资源(世界人口
)数据如下:
另一种资源(Country_GDP_2015
)的数据如下:
join
命令将基于国家代码
/cc
字段匹配两个数据集中的行,然后将2015年人口普查
字段中的值复制到新的人口
字段中。
结果数据包将删除世界人口
资源和2015年国家GDP
资源,如下所示:
一个更复杂的例子:
worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip6
本例旨在分析米高梅电影公司的银幕演员的工资。
再一次,我们有一个资源(screen\u actor\u salaries
)的数据看起来像:
另一个资源(米高梅电影
)的数据如下:
join
命令将根据电影名称和制作年份匹配两个数据集中的行。注意我们如何通过使用不同的键模式来克服不兼容的字段。
生成的数据集可能如下所示:
<表><广告>过滤器
过滤流式资源。
筛选器
接受相等和不相等条件,并测试选定资源中的每一行。如果所有条件都无效,则该行将被丢弃。
参数:
资源
-要对哪些资源应用筛选器。与stream\u remote\u资源中的
resources
的语义相同在
中-键到值的映射,这些值转换为行[键]==值
条件out
-键到值的映射,这些值转换为行[key]!=值
条件
in和out
都应该是一个对象列表。但是,
out
应该只有一个元素。
示例:
仅过滤美国和欧洲国家,排除主要语言为英语的国家:
worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip7
要按多个值筛选出out
,需要多个筛选处理器,而不是多个out
元素。否则,某些条件将始终有效,并且不会对任何行进行讨论:
worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip8
排序
按键对流式资源进行排序。
sort
接受资源列表和键(作为行字段上的python格式字符串)。
它将输出每个资源的行,按键排序(默认按升序)。
参数:
资源
-要排序的资源。与stream\u remote\u资源中的
resources
的语义相同按
-string排序,它将被解释为用于形成键的python格式字符串(例如,{<;field_name_1>;}:{field_name_2}
)反转
-可选布尔值,如果设置为true-按反转顺序排序
示例:
仅过滤美国和欧洲国家,排除主要语言为英语的国家:
worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip9
重复
复制资源。
duplicate
接受数据包中单个资源的名称。
然后它将在输出数据包中复制它,并使用不同的名称和路径。
复制的资源将立即出现在其原始资源之后。
参数:
源代码
-要复制哪些资源。资源的名称。目标名称
-新的重复资源的名称。目标路径
-新的重复资源的路径。
示例:
仅过滤美国和欧洲国家,排除主要语言为英语的国家:
$ pip install datapackage-pipelines0
删除字段
从流式资源中删除字段(列)
删除字段
接受要删除的资源列表和字段列表
注意:如果提供了多个资源,则所有资源都应包含要删除的所有字段
参数: 示例: 从 向流式资源添加字段 参数: 示例: 下面的示例将4个新字段添加到 我们有一个资源( 生成的数据集可能如下所示: 从字段值中查找并替换字符串或模式 参数: 示例: 下面的示例使用正则表达式和精确的字符串模式替换字段值 我们有一个资源( 生成的数据集可能如下所示: 取消ivots,转换表格数据,使每行只有一条记录。 参数: 示例: 下面的示例将数据取消激活为3个新字段: 我们有一个资源( 生成的数据集可能如下所示: 类似的结果可以通过定义正则表达式而不是常量来实现 将数据包保存到SQL数据库。 参数: 将数据包保存到文件系统路径。 参数: 如果该路径不存在,则将创建该路径以及内部数据包路径。 如果省略,则假定为 将数据包保存到压缩存档。 参数: 这些处理器将在下一个主要版本中删除。 由于向后兼容的原因,保留了 将新的外部表格资源添加到数据包。 参数: 您应该提供a 请注意, 参数基本上是传递给a 您也可以在这里提供一个模式,或者使用由 示例: 将外部资源转换为流式资源。 外部资源是指链接到远程数据源(URL或文件路径)的资源,但不由管道处理并保持原样。 流式资源是可以由管道处理的资源,其输出作为结果数据包的一部分保存。 如果某个资源没有模式,则通过从数据源中的每一列创建 参数: 如果省略,则数据包中的所有资源都将流式传输。 示例: 此处理器还支持加载纯文本资源(例如HTML页面)并将其作为表格数据处理-使用单个"数据"列拆分成行。
要启用此行为,请将以下属性添加到资源: 出于向后兼容的原因,保留了"转储到SQL"的别名。 将数据包保存到文件系统路径。 参数: 如果该路径不存在,则将创建该路径以及内部数据包路径。 如果省略,则假定为 将数据包保存到压缩存档。 参数: 对于任何非琐碎的处理任务,您可能会遇到使用标准库处理器无法解决的问题,这是非常合理的。 为此,您可能需要编写自己的处理器-以下是处理方法。 有两种用于编写处理器的API—高级API和低级API。 高级api对大多数处理器类型都非常有用: 高级api由一个方法 必要时也可用于初始化代码。 它有以下参数: 并生成零个或多个已处理行。 在某些情况下,高级api可能过于受限。在这种情况下,您应该考虑使用低级api。 上面的代码片段显示了大多数低级处理器的结构。 我们总是从调用 我们通过调用 我们将从上面的相同处理器开始,现在用低级API实现。 下一个例子展示了如何实现一个简单的web scraper。虽然不是严格要求,但web刮板通常是管道中的第一个处理器。因此,它们可以忽略传入的数据包和资源迭代器,因为没有以前的处理器生成数据: 在这个例子中,我们可以看到初始的数据包是从头开始生成的,而资源迭代器实际上是一个scraper,在从ckan实例api接收行时生成它们。 在特定问题域中编写管道时,可能会发现开发的处理管道遵循某种模式。抓取或获取源数据往往是相似的。处理、数据清理和验证通常是相同的。 为了便于维护和避免样板,可以编写数据包管道插件。 插件是名为 源描述符是包含用于创建完整管道的信息的yaml文件。 这个类应该继承自 假设我们编写了一个数据包管道插件,用于从ckan实例中提取数据。 下面是这样一个假设生成器的外观: 在这种情况下,如果我们存储一个类似这样的文件: 然后当运行dpp时,我们将看到一个名为 这个管道在内部由3个步骤组成: 源描述符可以具有与输出管道的参数域最匹配的任何结构。但是,它必须有一个一致的结构,由json模式文件支持。在我们的例子中,模式可能如下: 在某些情况下,生成器也希望提供处理器代码(与管道定义一起)。
为此,生成器可以向包含处理器代码的任何步骤添加 为了实现这一点,您只需将 在本例中,此管道设置为每小时运行一次,每小时运行一次。 要运行芹菜守护进程,请使用 运行此服务器将从执行所有"脏"任务开始,并根据任务的计划继续执行任务。 作为启动调度程序和仪表板的快捷方式(见下文),您可以使用预先构建的Docker图像: 然后浏览到 当安装在服务器上或使用任务调度程序运行时,通常很难确切地知道每个管道的运行情况和状态。 为了简化操作,您可以旋转Web仪表板,以提供每个管道的状态、基本信息和最新执行结果的概述。 要启动web服务器,请从命令行运行dpp serve并浏览到http://localhost:5000
环境变量 仪表板端点可以通过添加带有环境变量 对于单个管道和管道集合,甚至更简单的管道状态都可以使用状态标记。对于单个管道,将完整的管道ID添加到徽标端点: 或者对于管道集合: 请注意,无论 数据包管道可以对任何管道事件调用预定义的webhook。这可能允许潜在的集成与其他应用程序一起使用。 要在特定管道中添加Webhook,请在管道定义中添加资源
-要从中删除列的资源。与资源的语义相同
在stream_remote_资源中
字段
-要删除的字段(列)名称列表(用于匹配字段名称的精确名称或正则表达式)regex
-如果设置为false
字段名将被解释为字符串而不是正则表达式(默认情况下true
)世界人口
资源中删除
国家名称
和
人口普查
列:
$ pip install datapackage-pipelines
1
添加计算字段
添加计算字段
接受要添加到现有资源的资源和字段列表。它将为每个资源输出包含新字段(列)的行。添加计算字段
允许在将值插入目标字段之前执行各种操作。资源
-要添加的资源字段。与stream\u remote\u资源中的
resources
的语义相同
字段
-要对目标字段执行的操作列表。
操作
:对同一行的预定义列的值执行的操作。可用操作:
常量
-添加常量值求和
-行中给定列的求和值。平均值
-一行中给定列的平均值。min
-行中给定列的最小值。max
-行中给定列的最大值。相乘
-一行中给定列的乘积。连接
-将两个或多个列值连接到一行中。格式
-用于形成值的python格式字符串,例如:我的名字是{名字}
目标
-新字段的名称。source
-应该对其执行操作的列的列表(在format
和constant
的情况下不需要)。带
-字符串传递给常量
,格式
或连接操作
常量
-用作常量值格式中-用作具有现有列值的python格式字符串,例如:
{名字}{姓氏}
连接中-用作分隔符
salaries
resource$ pip install datapackage-pipelines
2
工资
)的数据如下: < /广告><正文>名字
姓氏
< < < < / > > FEB < < < / > >约翰
DOE … < /广告><正文>名字
姓氏
姓氏
< < < < / > > FEB < < < / > >平均值
总计
状态 约翰
DOE 无名氏
单曲 … 查找并替换
资源
-用于清除字段值的资源。与stream\u remote\u资源中的
资源的语义相同
字段
-要替换值的字段列表名称
-要替换值的字段的名称模式
-要从字段中查找和替换的模式列表
查找
-字符串,i作为正则表达式尝试匹配字段值替换
-字符串,解释为替换匹配模式的正则表达式$ pip install datapackage-pipelines
3
日期
)的数据如下: < /广告><正文>年
四分之一
2000(1) 2000年第一季度 … < /广告><正文>年
四分之一
2000年 2000年3月31日
… 取消激活
资源
-要取消激活的资源。与stream\u remote\u资源中的
resources
的语义相同
extrakeyfields
-目标字段定义列表,每个定义是一个至少包含这些属性的对象(此处将显示未激活的列值)
名称
-目标字段的名称键入
-目标字段的类型extravaluefield
-目标字段定义-至少包含这些属性的对象(此处将显示未激活的单元格值)
名称
-目标字段的名称键入
-目标字段的类型unpivot
-源字段定义列表,每个定义是一个至少包含这些属性的对象
名称
-要么是简单的名称,要么是一个正则表达式,将原始字段的名称与unpivot匹配。键
-目标字段名和原始字段值之间的映射
extrakeyfields
名称的正则表达式
year
,direction
和amount
$ pip install datapackage-pipelines
4
balance
)包含如下数据: < /广告><正文>公司
2015年收入
2015年费用
2016年收入
2016年费用
Inc/ 1000 2000年 1700 2000年 3000 2000年 … < /广告><正文>公司
年
方向
金额 Inc/ 2015年 < < > >1000 Inc/ 2015年 外 Inc/ 2016年 2000年 Inc/ 2016年 外 1700 2015年 2000年 2015年 外 2016年 3000 2016年 外 2000年 … $ pip install datapackage-pipelines
5
转储到SQL
引擎
-用于连接到SQL数据库的连接字符串(URL语法)
还支持env://<;环境变量>;
,它表示应该从指定的环境变量中获取连接字符串。
如果未指定,则假定默认值为env://dpp_db_engine
表
-资源与数据库表之间的映射。键是表名,值是具有以下属性的对象:
资源名称
-应转储到表中的资源的名称模式
如何将数据写入数据库。
可能值:
重写
(默认值)-重写表,将删除所有以前的数据(如果有)。追加
-在不更改现有数据的情况下写入新行。更新
-根据一组"更新键"更新表。
对于每个新行,查看数据库中是否已经存在可以更新的现有行(即现有行
所有更新键中的值都相同)。
如果是,则更新现有行中的其余列。否则-在数据库中插入新行。更新键
-仅适用于更新
模式。用于检查行是否存在的字段名列表。
如果未指定,则将使用架构的primarykey
作为默认值。索引
-tbdupdated_column
-用布尔值添加到spewed数据的列的可选名称
true
-行已更新false
-已插入行updated_id_column
-将添加到spewed数据并包含数据库中已更新行的id的列的可选名称。转储到路径
输出路径
-存储datapackage.json的输出路径的名称。
(当前目录)。
强制格式
-指定是否强制以相同格式生成所有输出文件true
(默认值),则所有资源都将使用相同的格式
false
,则将从文件扩展名推导格式。具有未知扩展名的资源将被丢弃。format
-指定要生成的输出文件类型(如果force format
为true):csv
(默认值)或json
将文件哈希添加到路径
:指定是否将文件md5哈希包含到资源路径中。默认为false
。如果true
将散列嵌入如下路径:路径/to/the/file.ext
path/to/the/hash/file.ext
计数器
-指定是对数据的行、字节还是MD5哈希进行计数,以及数据应存储在哪里。具有以下属性的对象:datapackage row count
:数据包的总行计数应存储在何处(默认值:count/u rows
)数据包字节
:数据包的总字节计数应存储在哪里(默认值:字节
)数据包哈希
:数据包的md5哈希应存储在哪里(默认值:哈希
)资源行数
:每个资源的总行数应存储在哪里(默认值:行数
)资源字节
:每个资源的总字节计数应存储在哪里(默认值:字节
)资源哈希
:每个资源的md5哈希应存储在哪里(默认值:哈希
)
每个属性都可以ld设置为空以防止计数。
每个属性都可以是点分隔的字符串,用于将数据存储在嵌套对象中(例如stats.rowcount
)pretty descriptor
:指定数据包描述符(datapackage.json
)文件的外观:false
(默认)-描述符将写入一行。true
-描述符对每个键都有缩进和新行,因此它变得更易于阅读。转储到邮政编码
输出文件
-压缩数据将存储的输出文件的名称强制格式
和格式
-与转储到路径中相同
将文件哈希添加到路径
-与dump\u to\u path中的相同
计数器
-与转储到路径中的相同
漂亮的描述符
-与"转储到"路径中的相同不推荐使用的处理器
添加元数据
update_package
的别名。添加资源
名称
和URL
属性,以及其他可选属性,这些属性是在规范中定义的。
url
指示此资源的数据所在的位置。稍后,当stream_remote_resources
运行时,它将使用url
(存储在dpp:streamedfrom
属性的资源中)读取数据行并将其推入管道。url
还支持env://<;环境变量>;
,这表示应该从指定的环境变量获取资源url。这在为字符串提供敏感信息(例如用于从数据库表流式处理的SQL连接字符串)时非常有用。tabulator.stream
实例的参数(请参见api引用rel="nofollow">api)。
除此之外,还可以传递一个常量
参数,该参数应该是头到字符串值的映射。
当与stream_remote_resources一起使用时,这些常量值将添加到每个生成的行中
(以及默认模式)。stream_remote_resources
处理器生成的默认模式。
如果指定了路径,则将使用该路径。否则,stream_remote_resources
处理器将为您分配路径
和csv
扩展名。$ pip install datapackage-pipelines
6
流远程资源
字符串
字段,在此处自动生成默认模式。资源
-要传输哪些资源。可以是:忽略丢失
-如果为true,则丢失的资源不会引发错误,而是将被视为"空"(即零行)。
具有空URL的资源将被视为相同的资源(即将生成"空"资源)。限制行
-如果提供,将限制从源中获取的行数。获取一个整数值,该整数值指定要传输的源的行数。$ pip install datapackage-pipelines
7
"format":"txt"
转储到SQL
转储到路径
输出路径
-存储datapackage.json的输出路径的名称。
(当前目录)。
强制格式
-指定是否强制以相同格式生成所有输出文件true
(默认值),则所有资源都将使用相同的格式
false
,则将从文件扩展名推导格式。具有未知扩展名的资源将被丢弃。format
-指定要生成的输出文件类型(如果force format
为true):csv
(默认值)或json
handle non-tabular
-指定是否应将非表格式资源(即没有架构的资源
)转储到结果数据包。
(有关详细信息,请参见下面的注释)将文件哈希添加到路径
:指定是否将文件md5哈希包含到资源路径中。默认为false
。如果true
将散列嵌入如下路径:路径/to/the/file.ext
path/to/the/hash/file.ext
计数器
-指定是对数据的行、字节还是MD5哈希进行计数,以及数据应存储在哪里。具有以下属性的对象:datapackage row count
:数据包的总行计数应存储在何处(默认值:count/u rows
)数据包字节
:数据包的总字节计数应存储在哪里(默认值:字节
)数据包哈希
:数据包的md5哈希应存储在哪里(默认值:哈希
)资源行数
:每个资源的总行数应存储在哪里(默认值:行数
)资源字节
:每个资源的总字节计数应存储在哪里(默认值:字节
)资源哈希
:每个资源的md5哈希应存储在哪里(默认值:哈希
)
为了防止计数,这些属性中的每一个都可以设置为空。
每个属性都可以是点分隔的字符串,用于将数据存储在嵌套对象中(例如stats.rowcount
)pretty descriptor
:指定数据包描述符(datapackage.json
)文件的外观:false
(默认)-描述符将写入一行。true
-描述符对每个键都有缩进和新行,因此它变得更易于阅读。文件格式化程序
:指定自定义文件格式处理程序。一个具有格式名到python模块和类名映射的对象。csv
和json
格式处理程序或添加对新格式的支持。自定义格式化程序
管道下),xlsxformat类转储.to_zip
输出文件
-压缩数据将存储的输出文件的名称强制格式
和格式
-与转储到路径中相同
处理非表格格式
-与转储到路径中的相同
将文件哈希添加到路径
-与dump\u to\u path中的相同
计数器
-与转储到路径中的相同
漂亮的描述符
-与"转储到"路径中的相同文件格式化程序
-与"转储到"路径中的相同注意
dump.to_path
和dump.to_zip
处理器也将处理非表格资源。
这些资源必须同时具有url
和path
属性,并且不能包含schema
属性。
在这种情况下,将从url
下载文件,并将其放置在提供的路径
中自定义处理器
高级处理器API
$ pip install datapackage-pipelines
8
process
组成,它有两个函数:修改数据包
-对数据包描述符进行更改(如有必要),例如添加元数据、添加资源、修改资源的架构等。data package
是需要修改的当前数据包描述符。
需要返回修改后的数据包描述符。parameters
是包含处理器参数的dict,如pipeline-spec.yaml
文件中提供的。stats
是一个dict,为了收集过程中的度量和测量(例如验证检查、行计数等),应该对其进行修改。处理行
-修改流中的一行。它接收这些参数:行
是包含要处理的行的字典行索引
是资源中该行的索引资源描述符
是当前正在处理的资源的描述符对象资源索引
是数据包中资源的索引parameters
是包含处理器参数的dict,如pipeline-spec.yaml
文件中提供的。stats
是一个dict,为了在过程(例如验证检查、行计数等)几个例子
$ pip install datapackage-pipelines
9
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)
$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO :Processed 264 rows
set_types: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO :RESULTS:
INFO :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
0
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)
$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO :Processed 264 rows
set_types: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO :RESULTS:
INFO :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
1
低级处理器API
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)
$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO :Processed 264 rows
set_types: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO :RESULTS:
INFO :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
2
insert()
开始-此方法为我们提供上下文,其中包含执行参数、数据包描述符(从上一步输出)和所有流式资源行上的迭代器。spew()
来完成处理,后者将处理后的数据发送到管道中的下一个处理器。spew
接收:finalizer
函数,它在完成对资源的迭代之后,但在向其他处理器发送完成的信号之前将被调用。例如,您可以使用它关闭任何打开的文件。更深入的解释
spew
按以下顺序写入接收到的数据:datapackage
参数写入流。
这意味着对数据包描述符的所有修改都必须在调用spew之前完成。
一个常见的陷阱是修改资源迭代器中的数据包描述符-尽量避免这样做,因为下一个处理器将接收到的描述符将是错误的。摄取
返回的资源迭代器)的迭代。反过来,这会导致读取进程的输入流。由于操作系统中缓冲的工作方式,"慢"处理器将缓慢读取其输入,导致之前的处理器在其CPU密集型处理器完成处理时在IO上休眠。"快速"处理器不会漫无目的地工作,而是在等待输入数据或等待输出缓冲区耗尽时休眠。
这里所得到的结果是,数据中的所有行都或多或少地同时被处理,并且没有一个处理器在可能在后续处理步骤中失败的行上工作得太"超前"。
finalizer
方法(如果我们收到一个)。几个例子
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)
$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO :Processed 264 rows
set_types: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO :RESULTS:
INFO :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
3
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)
$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO :Processed 264 rows
set_types: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO :RESULTS:
INFO :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
4
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)
$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO :Processed 264 rows
set_types: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO :RESULTS:
INFO :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
5
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)
$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO :Processed 264 rows
set_types: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO :RESULTS:
INFO :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
6
插件和源描述符
datapackage\u pipelines\lt;plugin name>;
的python模块。插件可以提供两种功能:foo
都可以在管道内用作<;plugin name>;.foo
模块。
generator
存在于datapackage\u pipelines\lt;plugin name>;
模块中,则它将用于基于模板生成管道-我们称之为"源描述符"。源描述符
dpp
将查找名为<;plugin name>;.source-spec.yaml
的文件,并将它们视为管道生成代码的输入-应在datapackage\u pipelines\lt;plugin name>;中名为
模块。generator
的类中实现;generatorbase
并且应该实现两个方法:生成管道-
它接收源描述并返回形式为
(id,details)
的元组迭代器。
id
可能是管道id,在这种情况下,详细信息将是包含管道定义的对象。
如果id
的格式为:module:
,则详细信息将被视为来自指定模块的源规范。这样,生成器可能会生成其他源规范。获取模式
-它应该返回一个json模式,用于验证源描述的结构示例
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)
$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO :Processed 264 rows
set_types: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO :RESULTS:
INFO :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
7
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)
$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO :Processed 264 rows
set_types: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO :RESULTS:
INFO :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
8
/example com的可用管道包列表
ckan.scraper
,元数据
和转储到压缩文件
验证源描述符
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)
$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO :Processed 264 rows
set_types: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO :RESULTS:
INFO :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
9
dpp
将确保源描述符文件在尝试使用generator
类将其转换为管道之前符合该架构。提供处理器代码
code
属性。执行此步骤时,不会像往常一样尝试解析处理器,而是会改为解析提供的代码。按计划运行
datapackage pipelines
带有芹菜式集成,允许通过类似于crontab的语法在特定时间运行管道。
schedule
部分添加到pipeline-spec.yaml
文件中(或者从生成器类返回一个schedule,见上文),如下所示:$ docker run -it -v `pwd`:/pipelines:rw \
frictionlessdata/datapackage-pipelines
<available-pipelines>
$ docker run -it -v `pwd`:/pipelines:rw \
frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>
0
celeri
的命令行界面运行datapackage\u pipelines.app
。有一种方法$ docker run -it -v `pwd`:/pipelines:rw \
frictionlessdata/datapackage-pipelines
<available-pipelines>
$ docker run -it -v `pwd`:/pipelines:rw \
frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>
1
$ docker run -it -v `pwd`:/pipelines:rw \
frictionlessdata/datapackage-pipelines
<available-pipelines>
$ docker run -it -v `pwd`:/pipelines:rw \
frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>
2
http://<;docker machine的ip address>;:5000/
查看当前执行状态仪表板。管道仪表板和状态标牌
dpp_base_path
将确定仪表板是从根路径还是从另一个基本路径(示例值:/pipelines/
)提供服务。dpp_basic_auth_username
和dpp_basic_auth_password
的用户名和密码来要求验证。
$ docker run -it -v `pwd`:/pipelines:rw \
frictionlessdata/datapackage-pipelines
<available-pipelines>
$ docker run -it -v `pwd`:/pipelines:rw \
frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>
3
$ docker run -it -v `pwd`:/pipelines:rw \
frictionlessdata/datapackage-pipelines
<available-pipelines>
$ docker run -it -v `pwd`:/pipelines:rw \
frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>
4
dpp_basic_auth_password
和dpp_basic_auth_username
设置如何,这些徽章端点都将始终公开。与其他服务集成
hooks
属性,该属性应该是URL列表。
每当该管道排队、开始运行或结束运行时,所有URL都将使用此负载进行发布:$ docker run -it -v `pwd`:/pipelines:rw \
frictionlessdata/datapackage-pipelines
<available-pipelines>
$ docker run -it -v `pwd`:/pipelines:rw \
frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>
5
已知问题
推荐PyPI第三方库