{{description}}

datapackage-pipelines的Python项目详细描述


数据包管道

特拉维斯工作服pypi-python version

基本知识

这是什么?

datapackage pipelines是用于表格式数据的声明性流处理的框架。它建立在无摩擦数据项目的概念和工具之上。

管道

这个框架的基本概念是管道。

管道有一个处理步骤列表,它生成一个数据包作为其输出。每个步骤在处理器中执行,包括以下阶段:

  • 修改数据包描述符-例如:添加元数据、添加或删除资源、更改资源的数据架构等。
  • 处理资源-按顺序处理每个资源的每一行。处理器可以删除行、添加新行或修改其内容。
  • 返回统计信息-如有必要,处理器可以报告一个数据字典,当管道执行终止时,该字典将返回给用户。例如,这可用于计算处理数据的质量度量。

并非每个处理器都需要完成所有这些任务。事实上,您经常会发现每个处理步骤只执行其中的一个。

pipeline-spec.yaml文件

管道是以声明的方式定义的,而不是在代码中。可以在pipeline-spec.yaml文件中定义一个或多个管道。此文件指定每个处理器的处理器列表(按名称引用)和执行参数。

下面是一个pipeline-spec.yaml文件的示例:

worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip

在本例中,我们看到一条名为worldbank-co2-emissions的管道。它的管道由4个步骤组成:

  • 元数据:这是一个库处理器(见下文),它修改数据包的描述符(在本例中为初始的空描述符)-向数据包添加名称标题和其他属性。
  • 加载:这是另一个库处理器,它将数据加载到数据包中。 此资源有一个名称和一个from属性,指向数据的远程位置。
  • 设置类型:此处理器将数据类型分配给数据中的字段。在本例中,看起来像年份的字段标题将被指定为数字类型。
  • dump_to_zip:使用提供的文件名创建一个经过压缩和验证的数据包。

力学

管道如何运行的一个重要方面是数据以流的形式从一个处理器传递到另一个处理器。如果我们在这里得到"technical",那么每个处理器都在其自己的专用进程中运行,数据包从其stdin中读取并输出到其stdout。这里要注意的重要一点是,没有处理器在任何一点保存整个数据集。

限制是通过设计来实现的—使每个处理器的内存和磁盘需求受到限制,并且与数据集大小无关。

快速启动

首先,在当前目录中创建一个pipeline-spec.yaml文件。如果您只是想试用一下,您可以使用上面的文件。

然后,您可以在本地安装数据包管道注意,由于使用了类型提示和高级异步使用:

$ pip install datapackage-pipelines

现在您应该可以使用dpp命令:

$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}

或者,您可以使用我们的Docker图像:

$ docker run -it -v `pwd`:/pipelines:rw \
        frictionlessdata/datapackage-pipelines
<available-pipelines>

$ docker run -it -v `pwd`:/pipelines:rw \
       frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>

命令行界面-dpp

使用dpp工具从命令行运行管道。

在不带任何参数的情况下运行dpp,将显示可用管道的列表。这是通过扫描当前目录及其子目录、搜索pipeline-spec.yaml文件并提取其中描述的管道规范列表来完成的。

每个管道都有一个标识符,由pipeline-spec.yaml文件的路径和该描述文件中定义的管道名称组成。

要运行管道,请使用dpp run<;pipeline id>;

您还可以使用dpp run all来运行所有管道,并使用dpp run dirty来运行仅dirty管道(稍后详细说明)。

深入了解管道

处理器分辨率

如前所述,处理器是按名称引用的。

实际上,这个名称是包含处理代码的python脚本的名称(减去.py扩展名)。当试图找到需要执行的实际代码的位置时,处理器解析器将在这些预定义的位置进行搜索:

  • 首先,它将尝试在pipeline-spec.yaml文件的目录中找到具有该名称的自定义处理器。 处理器名称支持点表示法,因此您可以编写mycode.custom_processor,它将尝试在mycode目录中找到名为custom_processor.py的处理器,路径与管道规范文件相同。 对于这个特定的解析阶段,如果您要编写。custom_processor它将尝试在管道规范文件的父目录中找到该处理器。 (继续阅读有关如何编写自定义处理器的说明)
  • 如果处理器名称看起来像myplugin.somename,它将尝试在myplugin插件中找到名为somename的处理器。也就是说,它将看到是否有一个名为myplugin的插件,如果有,该插件是否发布了一个名为somename的处理器(更多关于下面的插件)。
  • 如果在此之前未找到处理器,则它将尝试在处理器搜索路径中搜索此处理器。处理器搜索路径取自环境变量dpp_processor_path。路径中的每个分离路径都被视为解析处理器的可能起点。
  • 最后,它将尝试在与此软件包捆绑在一起的标准处理器库中找到该处理器。

从扫描管道规格中排除目录

默认情况下,*目录不包括在扫描范围内,您可以为 通过在项目根目录中创建.dpp_spec_ignore文件排除。这个文件有类似的语法 到.gitignore并将根据全局模式匹配从扫描中排除目录。

例如,下面的文件将忽略test*目录,包括inside子目录 并且/docs目录将只在项目根目录中被忽略

test*
/docs

缓存

通过将特定管道步骤上的cached属性设置为true,此步骤的输出将存储在磁盘上(在.cache目录中,与pipeline-spec.yaml文件位于同一位置)。

重新运行管道将利用该缓存,从而避免执行缓存步骤及其前身。

在内部,为管道中的每一步计算一个散列,这是基于处理器的代码、它的参数和它的前一步的散列。如果存在与特定步骤具有完全相同哈希值的缓存文件,则可以将其删除(及其前置任务),并将该缓存文件用作管道的输入

这样,当代码或执行参数更改时(对于缓存的处理器或之前的任何处理器),缓存将变为无效。

脏任务和保持状态

缓存哈希还用于查看管道是否"脏"。当管道成功完成执行时,dpp将缓存哈希与管道id一起存储。如果存储的哈希与当前计算的哈希不同,则表示代码或执行参数已被修改,管道需要重新运行。

dpp可用于两个存储后端。对于本地运行,它使用pythonsqlite db来存储每个运行任务的当前状态,包括最后的结果和缓存散列。state db文件存储在名为.dpp.db的文件中,该文件位于运行dpp的同一目录中。

对于其他安装,特别是使用任务调度程序的安装,建议使用redis后端。要启用redis连接,只需将dpp_redis_host环境变量设置为指向正在运行的redis实例即可。

管道依赖项

您可以声明一个管道依赖于另一个管道或数据包。计算管道的缓存哈希值时会考虑此依赖关系,这反过来会影响缓存文件的有效性和"脏"状态:

  • 对于管道依赖项,在计算中使用该管道的哈希值
  • 对于数据包依赖项,在计算中使用数据包中的hash属性

如果缺少依赖项,则管道将标记为"无法执行"。

通过pipeline-spec.yaml文件中的管道定义的dependencies属性来声明依赖项。 此属性应包含依赖项列表,每个依赖项都是具有以下格式的对象:

  • 一个名为pipeline的键,其值是要依赖的管道id
  • 名为datapackage的单个键,其值是要依赖的数据包的标识符(或url)。

示例:

cat-vs-dog-populations:dependencies:-pipeline:./geo/region-areal-datapackage:http://pets.net/data/dogs-per-region/datapackage.json-datapackage:http://pets.net/data/dogs-per-region...

正在验证

自动验证每个处理器的输入是否正确:

  • 数据包在传递给处理器之前总是经过验证,因此处理器不可能以使其无效的方式修改数据包。

  • 除非通过在步骤信息中将validate标志设置为true显式请求,否则不会根据相应的json表模式验证数据。 这样做有两个主要原因:

    • 从性能上看,验证每个步骤中的数据都非常占用CPU
    • 在某些情况下,您在一个步骤中修改模式,在另一个步骤中修改数据,因此您只想在所有更改完成后验证数据

    在任何情况下,当使用set_types标准处理器时,它将使用新类型验证和转换输入数据。

数据流集成

数据流是数据包管道的继承者,提供了更多 与正在运行的管道的pythonic接口。您可以使用flow属性在管道规范中集成数据流 而不是运行。例如,给定以下流文件,保存在my flow.py下:

from dataflows import Flow, dump_to_path, load, update_package

def flow(parameters, datapackage, resources, stats):
    stats['multiplied_fields'] = 0

    def multiply(field, n):
        def step(row):
            row[field] = row[field] * n
            stats['multiplied_fields'] += 1
        return step

    return Flow(update_package(name='my-datapackage'),
                multiply('my-field', 2))

以及同一目录下的pipeline-spec.yaml

my-flow:
  pipeline:
  - run: load_resource
    parameters:
      url: http://example.com/my-datapackage/datapackage.json
      resource: my-resource
  - flow: my-flow
  - run: dump_to_path

您可以使用dpp run my flow来运行管道

标准处理器库

库中提供了一些内置处理器。

更新软件包

将元数据添加到数据包。

参数

可以在此处提供任何允许的属性(根据规范)。

示例

-run:update_packageparameters:name:routes-to-mordorlicense:CC-BY-SA-4author:Frodo Baggins <frodo@shire.me>contributors:-samwise gamgee <samwise1992@yahoo.com>

更新资源

将元数据添加到资源中。

参数

示例

worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip
0

加载

将数据加载到包中,推断架构并可选地强制转换值。

参数

  • 来自要加载的数据的位置。这可以是:
    • 本地路径(例如/path/to/the/data.csv)
    • 一个远程url(例如,https://path.to/the/data.csv" rel="nofollow">https://path.to/the/data.csv)
    • 其他支持的链接,基于当前对制表器中的方案和格式的支持
    • data package.json文件的本地路径或远程url(例如https://path.to/data-package/datapackage.json" rel="nofollow">https://path.to/data-package/datapackage.json)
    • 对包含源位置的环境变量的引用,格式为
    • 元组包含(数据包描述符、资源迭代器)
  • 资源-可选,仅当源指向datapackage.json文件或datapackage/resource元组时才相关。值应为以下值之一:
    • 要加载的单个资源的名称
    • 匹配要加载的资源名称的正则表达式
    • 要加载的资源名称列表
    • "none"表示加载所有资源
    • 包中资源的索引
  • validate-是否应将数据强制转换为推断的数据类型。仅当不从数据包加载数据时才相关。
  • 其他选项-基于加载的文件,附加选项(例如,Excel文件的工作表等,请参见上面表格的链接)

打印机

只要把看到的都打印出来。有利于调试。

参数

  • 行数-修改要预览的行数,打印机将从流中的不同位置打印此行数的多个示例
  • 最后一行-流中要打印的最后几行。可选,默认为num_rows的值
  • 字段-可选,要预览的字段名列表
  • 资源-可选,允许限制打印的资源,语义与加载处理器资源参数

设置类型

将数据类型和类型选项设置为流式资源中的字段,并确保数据仍然使用新类型进行验证。

这允许对现有的表模式进行修改,通常也可以从stream\u remote\u resources

参数

  • 资源-要修改哪些资源。可以是:

    • 字符串列表,解释为要传输的资源名称
    • 字符串,解释为用于匹配资源名称的正则表达式

    如果省略,则数据包中的所有资源都将流式传输。

  • regex-如果设置为false字段名将被解释为字符串而不是正则表达式(默认情况下true

  • 类型-字段名和字段定义之间的映射。

    • 字段名可以是字段名,也可以是匹配多个字段的正则表达式。
    • 字段定义是一个遵循json表模式规范的对象。您可以使用null而不是对象从架构中删除字段。

示例

worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip
1

加载元数据

从现有数据包加载元数据。

参数

从位于url的数据包加载元数据

将复制加载的数据包的所有属性(除了资源

示例

worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip
2

加载资源

从现有数据包加载表格资源。

参数

从位于url的数据包加载resource参数中指定的资源。 将复制加载资源的所有属性-路径架构包括在内。

  • url-指向所需资源所在的数据包的url

  • 资源-可以是

    • 字符串列表,解释为要加载的资源名
    • 字符串,解释为用于匹配资源名称的正则表达式
    • 一个整数,表示数据包中资源的索引(基于0)
  • 限制行-如果提供,将限制从源中获取的行数。获取一个整数值,该整数值指定要传输的源的行数。

  • 记录进度行-如果提供,将记录加载进度。取一个整数值,指定记录进度的行数间隔。

  • -如果提供并设置为false,则资源将添加到数据包中,但不会流化。

  • 资源-可以使用而不是资源属性来支持加载资源和修改输出资源元数据

    • 值是一个dict,它包含要加载的源资源名称和要应用于已加载资源的dict包含描述符更新之间的映射
  • 必需-如果提供并设置为false,则在数据包不可用或缺少资源时不会失败

示例

worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip
3

连接

连接多个流式资源并将其转换为单个资源。

参数

  • -要连接哪些资源。语义与stream\u remote\u resources中的resources相同

    如果省略,数据包中的所有资源都将连接起来。

    <资源>连接必须在数据包中以连续顺序出现。

  • target-保存连接数据的目标资源。应至少定义以下属性:

    • 名称-资源的名称
    • 路径-此文件的数据包中的路径。

    如果省略,目标资源将接收名称concat,并将保存在数据包中的data/concat.csv

  • 字段-源和目标之间的字段映射,以便键是目标字段名,值是字段名的列表。

    此映射用于创建目标资源架构。

    请注意,目标字段名总是假定映射到自身。

示例

worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip
4

在本例中,我们将所有看起来像report year-<;year>;的资源连接起来,并将它们输出到多年报告资源中。

输出包含两个字段:

  • 活动,在所有源中称为活动
  • 金额,在不同的资源中有不同的名称(例如金额2009年金额金额等)

加入

加入两个流式资源。

"加入"在我们的例子中意味着获取目标资源,并通过在源资源中查找数据向其每一行添加字段。

join操作的一个特殊情况是没有目标流,并且源中的所有唯一行都用于创建它。 此模式称为"重复数据消除"模式-将创建目标资源,并将源中的重复数据消除行添加到其中。

参数

  • source-有关源的信息资源

    • 名称-资源的名称
      • 应用作查找键的字段名列表
      • 字符串,它将被解释为python格式的字符串,用于形成键(例如{<;field_name_1>;}:{field_name_2}
    • 删除-加入后从数据包中删除(false默认)
  • target-保存关联数据的目标资源。应至少定义以下属性:

    • 名称-如
    • -如中所示,或用于创建目标资源并执行重复数据消除
  • 字段-从源资源到目标资源的字段映射。 键应该是目标资源中的字段名。 值可以定义两个属性:

    • 名称-源中的字段名(默认情况下与目标字段名相同)

    • 聚合-聚合策略(如何使用同一个键处理多个行)。可以选择以下选项:

      • 求和-汇总聚合值。 对于数值,它是算术和;对于字符串,字符串和其他类型的连接将出错。

      • 平均值-计算聚合值的平均值。

        对于数值,它是算术平均值,对于其他类型,它将出错。

      • max-计算聚合值的最大值。

        对于数值,它是算术最大值;对于字符串,它是字典最大值;对于其他类型,它将出错。

      • min-计算聚合值的最小值。

        对于数值,它是算术最小值;对于字符串,它是字典最小值;对于其他类型,它将出错。

      • 第一个-获取遇到的第一个值

      • 最后一个-获取遇到的最后一个值

      • 计数-计数特定键的出现次数 对于此方法,不需要指定名称。如果指定了,则count将计算该源字段的非空值数。

      • 计数器-计算不同值的出现次数 将返回一个2元组数组,格式为[值,值的计数]

      • 设置-收集聚集字段的所有不同值,无序

      • 数组-按外观顺序收集聚合字段的所有值

      • 任意-选择任意值。

      默认情况下,聚合接受任意值。

    如果不需要指定名称聚合,则映射可以映射到空对象{}null

  • 完整-布尔型,

    • 如果true(默认值),在源中查找失败将在源中产生"空"值。
    • 如果false,则在源中查找失败将导致从目标中删除行。

重要提示:"源"资源必须出现在数据包中"目标"资源之前。

示例

worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip
5

上面的例子旨在创建一个包含世界上每个国家的GDP和人口的包。

我们有一个资源(世界人口)数据如下:

<表><广告>国家代码 国家名称 2000年人口普查 2015年人口普查 < /广告><正文>英国英国5885700464715810…

另一种资源(Country_GDP_2015)的数据如下:

<表><广告> cc国内生产总值(百万英镑)净债务(百万英镑)< /广告><正文>英国18323181606600…

join命令将基于国家代码/cc字段匹配两个数据集中的行,然后将2015年人口普查字段中的值复制到新的人口字段中。

结果数据包将删除世界人口资源和2015年国家GDP资源,如下所示:

<表><广告> cc国内生产总值(百万英镑)净债务(百万英镑)人口 < /广告><正文>英国1832318160660064715810…

一个更复杂的例子:

worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip
6

本例旨在分析米高梅电影公司的银幕演员的工资。

再一次,我们有一个资源(screen\u actor\u salaries)的数据看起来像:

<表><广告>年 生产 演员 工资 < /广告><正文>2016年眩晕2T先生150000002016年眩晕2小罗伯特·唐尼700万2015年秋天-复活詹尼弗·劳伦斯1800万2015年alf-返回melmack岩石12000000…

另一个资源(米高梅电影)的数据如下:

<表><广告>标题 导演 制作人 < /广告><正文>眩晕2(2016年)林赛·罗汉李嘉诚iRobot-电影(2018年)T先生T先生…

join命令将根据电影名称和制作年份匹配两个数据集中的行。注意我们如何通过使用不同的键模式来克服不兼容的字段。

生成的数据集可能如下所示:

<表><广告>标题 导演 制作人 演员数量 平均工资 工资总额 < /广告><正文>眩晕2(2016年)林赛·罗汉李嘉诚1100000022000000…

过滤器

过滤流式资源。

筛选器接受相等和不相等条件,并测试选定资源中的每一行。如果所有条件都无效,则该行将被丢弃。

参数

  • 资源-要对哪些资源应用筛选器。与stream\u remote\u资源中的resources的语义相同
  • 中-键到值的映射,这些值转换为行[键]==值条件
  • out-键到值的映射,这些值转换为行[key]!=值条件

in和out都应该是一个对象列表。但是,out应该只有一个元素。

示例

仅过滤美国和欧洲国家,排除主要语言为英语的国家:

worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip
7

要按多个值筛选出out,需要多个筛选处理器,而不是多个out元素。否则,某些条件将始终有效,并且不会对任何行进行讨论:

worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip
8

排序

按键对流式资源进行排序。

sort接受资源列表和键(作为行字段上的python格式字符串)。 它将输出每个资源的行,按键排序(默认按升序)。

参数

  • 资源-要排序的资源。与stream\u remote\u资源中的resources的语义相同
  • -string排序,它将被解释为用于形成键的python格式字符串(例如,{<;field_name_1>;}:{field_name_2}
  • 反转-可选布尔值,如果设置为true-按反转顺序排序

示例

仅过滤美国和欧洲国家,排除主要语言为英语的国家:

worldbank-co2-emissions:title:CO2 emission data from the World Bankdescription:Data per year, provided in metric tons per capita.pipeline:-run:update_packageparameters:name:'co2-emissions'title:'CO2emissions(metrictonspercapita)'homepage:'http://worldbank.org/'-run:loadparameters:from:"http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel"name:'global-data'format:xlsheaders:4-run:set_typesparameters:resources:global-datatypes:"[12][0-9]{3}":type:number-run:dump_to_zipparameters:out-file:co2-emissions-wb.zip
9

重复

复制资源。

duplicate接受数据包中单个资源的名称。 然后它将在输出数据包中复制它,并使用不同的名称和路径。 复制的资源将立即出现在其原始资源之后。

参数

  • 源代码-要复制哪些资源。资源的名称。
  • 目标名称-新的重复资源的名称。
  • 目标路径-新的重复资源的路径。

示例

仅过滤美国和欧洲国家,排除主要语言为英语的国家:

$ pip install datapackage-pipelines
0

删除字段

从流式资源中删除字段(列)

删除字段接受要删除的资源列表和字段列表

注意:如果提供了多个资源,则所有资源都应包含要删除的所有字段

参数

  • 资源-要从中删除列的资源。与资源的语义相同stream_remote_资源中
  • 字段-要删除的字段(列)名称列表(用于匹配字段名称的精确名称或正则表达式)
  • regex-如果设置为false字段名将被解释为字符串而不是正则表达式(默认情况下true

示例

世界人口资源中删除国家名称人口普查列:

$ pip install datapackage-pipelines
1

添加计算字段

向流式资源添加字段

添加计算字段接受要添加到现有资源的资源和字段列表。它将为每个资源输出包含新字段(列)的行。添加计算字段允许在将值插入目标字段之前执行各种操作。

参数

  • 资源-要添加的资源字段。与stream\u remote\u资源中的resources的语义相同
  • 字段-要对目标字段执行的操作列表。
    • 操作:对同一行的预定义列的值执行的操作。可用操作:
      • 常量-添加常量值
      • 求和-行中给定列的求和值。
      • 平均值-一行中给定列的平均值。
      • min-行中给定列的最小值。
      • max-行中给定列的最大值。
      • 相乘-一行中给定列的乘积。
      • 连接-将两个或多个列值连接到一行中。
      • 格式-用于形成值的python格式字符串,例如:我的名字是{名字}
    • 目标-新字段的名称。
    • source-应该对其执行操作的列的列表(在formatconstant的情况下不需要)。
    • -字符串传递给常量格式连接操作
      • in常量-用作常量值
      • 格式中-用作具有现有列值的python格式字符串,例如:{名字}{姓氏}
      • 连接中-用作分隔符

示例

下面的示例将4个新字段添加到salariesresource

$ pip install datapackage-pipelines
2

我们有一个资源(工资)的数据如下:

<表><广告>名字 姓氏 < < < < / > > FEB < < < / > >< /广告><正文>约翰 DOE

生成的数据集可能如下所示:

<表><广告>名字 姓氏 姓氏 < < < < / > > FEB < < < / > >平均值 总计 状态< /广告><正文>约翰 DOE 无名氏 单曲…

查找并替换

从字段值中查找并替换字符串或模式

参数

  • 资源-用于清除字段值的资源。与stream\u remote\u资源中的资源的语义相同

  • 字段-要替换值的字段列表

    • 名称-要替换值的字段的名称
    • 模式-要从字段中查找和替换的模式列表
      • 查找-字符串,i作为正则表达式尝试匹配字段值
      • 替换-字符串,解释为替换匹配模式的正则表达式

示例

下面的示例使用正则表达式和精确的字符串模式替换字段值

$ pip install datapackage-pipelines
3

我们有一个资源(日期)的数据如下:

<表><广告>年 四分之一 < /广告><正文>2000(1)2000年第一季度…

生成的数据集可能如下所示:

<表><广告>年 四分之一 < /广告><正文>2000年2000年3月31日 …

取消激活

取消ivots,转换表格数据,使每行只有一条记录。

参数

  • 资源-要取消激活的资源。与stream\u remote\u资源中的resources的语义相同
  • extrakeyfields-目标字段定义列表,每个定义是一个至少包含这些属性的对象(此处将显示未激活的列值)
    • 名称-目标字段的名称
    • 键入-目标字段的类型
  • extravaluefield-目标字段定义-至少包含这些属性的对象(此处将显示未激活的单元格值)
    • 名称-目标字段的名称
    • 键入-目标字段的类型
  • unpivot-源字段定义列表,每个定义是一个至少包含这些属性的对象
    • 名称-要么是简单的名称,要么是一个正则表达式,将原始字段的名称与unpivot匹配。
    • -目标字段名和原始字段值之间的映射
      • 键应该是extrakeyfields
      • 中的目标字段名
      • 值可以只是要插入的常量值,也可以是匹配名称的正则表达式

示例

下面的示例将数据取消激活为3个新字段:yeardirectionamount

$ pip install datapackage-pipelines
4

我们有一个资源(balance)包含如下数据:

<表><广告>公司 2015年收入 2015年费用 2016年收入 2016年费用 < /广告><正文> Inc/10002000年1700 > org < /td>2000年30002000年…

生成的数据集可能如下所示:

<表><广告>公司 年 方向 金额< /广告><正文> Inc/2015年< < > >1000 Inc/2015年外 Inc/2016年2000年 Inc/2016年外1700 > org < /td>2015年2000年 > org < /td>2015年外 > org < /td>2016年3000 > org < /td>2016年外2000年…

类似的结果可以通过定义正则表达式而不是常量来实现

$ pip install datapackage-pipelines
5

转储到SQL

将数据包保存到SQL数据库。

参数

  • 引擎-用于连接到SQL数据库的连接字符串(URL语法) 还支持env://<;环境变量>;,它表示应该从指定的环境变量中获取连接字符串。 如果未指定,则假定默认值为env://dpp_db_engine
  • -资源与数据库表之间的映射。键是表名,值是具有以下属性的对象:
    • 资源名称-应转储到表中的资源的名称
    • 模式如何将数据写入数据库。 可能值:
      • 重写(默认值)-重写表,将删除所有以前的数据(如果有)。
      • 追加-在不更改现有数据的情况下写入新行。
      • 更新-根据一组"更新键"更新表。 对于每个新行,查看数据库中是否已经存在可以更新的现有行(即现有行 所有更新键中的值都相同)。 如果是,则更新现有行中的其余列。否则-在数据库中插入新行。
    • 更新键-仅适用于更新模式。用于检查行是否存在的字段名列表。 如果未指定,则将使用架构的primarykey作为默认值。
    • 索引-tbd
  • updated_column-用布尔值添加到spewed数据的列的可选名称
    • true-行已更新
    • false-已插入行
  • updated_id_column-将添加到spewed数据并包含数据库中已更新行的id的列的可选名称。

转储到路径

将数据包保存到文件系统路径。

参数

  • 输出路径-存储datapackage.json的输出路径的名称。

    如果该路径不存在,则将创建该路径以及内部数据包路径。

    如果省略,则假定为(当前目录)。

  • 强制格式-指定是否强制以相同格式生成所有输出文件

    • 如果true(默认值),则所有资源都将使用相同的格式
    • 如果false,则将从文件扩展名推导格式。具有未知扩展名的资源将被丢弃。
  • format-指定要生成的输出文件类型(如果force format为true):csv(默认值)或json

  • 将文件哈希添加到路径:指定是否将文件md5哈希包含到资源路径中。默认为false。如果true将散列嵌入如下路径:

    • 如果原始路径是路径/to/the/file.ext
    • 修改后的路径将是path/to/the/hash/file.ext
  • 计数器-指定是对数据的行、字节还是MD5哈希进行计数,以及数据应存储在哪里。具有以下属性的对象:

    • datapackage row count:数据包的总行计数应存储在何处(默认值:count/u rows
    • 数据包字节:数据包的总字节计数应存储在哪里(默认值:字节
    • 数据包哈希:数据包的md5哈希应存储在哪里(默认值:哈希
    • 资源行数:每个资源的总行数应存储在哪里(默认值:行数
    • 资源字节:每个资源的总字节计数应存储在哪里(默认值:字节
    • 资源哈希:每个资源的md5哈希应存储在哪里(默认值:哈希) 每个属性都可以ld设置为空以防止计数。 每个属性都可以是点分隔的字符串,用于将数据存储在嵌套对象中(例如stats.rowcount
  • pretty descriptor:指定数据包描述符(datapackage.json)文件的外观:

    • false(默认)-描述符将写入一行。
    • true-描述符对每个键都有缩进和新行,因此它变得更易于阅读。

转储到邮政编码

将数据包保存到压缩存档。

参数

  • 输出文件-压缩数据将存储的输出文件的名称
  • 强制格式格式-与转储到路径中相同
  • 将文件哈希添加到路径-与dump\u to\u path中的相同
  • 计数器-与转储到路径中的相同
  • 漂亮的描述符-与"转储到"路径中的相同

不推荐使用的处理器

这些处理器将在下一个主要版本中删除。

添加元数据

由于向后兼容的原因,保留了update_package的别名。

添加资源

将新的外部表格资源添加到数据包。

参数

您应该提供a名称URL属性,以及其他可选属性,这些属性是在规范中定义的。

url指示此资源的数据所在的位置。稍后,当stream_remote_resources运行时,它将使用url(存储在dpp:streamedfrom属性的资源中)读取数据行并将其推入管道。

请注意,url还支持env://<;环境变量>;,这表示应该从指定的环境变量获取资源url。这在为字符串提供敏感信息(例如用于从数据库表流式处理的SQL连接字符串)时非常有用。

参数基本上是传递给atabulator.stream实例的参数(请参见api引用rel="nofollow">api)。 除此之外,还可以传递一个常量参数,该参数应该是头到字符串值的映射。 当与stream_remote_resources一起使用时,这些常量值将添加到每个生成的行中 (以及默认模式)。

您也可以在这里提供一个模式,或者使用由stream_remote_resources处理器生成的默认模式。 如果指定了路径,则将使用该路径。否则,stream_remote_resources处理器将为您分配路径csv扩展名。

示例

$ pip install datapackage-pipelines
6

流远程资源

将外部资源转换为流式资源。

外部资源是指链接到远程数据源(URL或文件路径)的资源,但不由管道处理并保持原样。

流式资源是可以由管道处理的资源,其输出作为结果数据包的一部分保存。

如果某个资源没有模式,则通过从数据源中的每一列创建字符串字段,在此处自动生成默认模式。

参数

  • 资源-要传输哪些资源。可以是:

    • 字符串列表,解释为资源名to流
    • 字符串,解释为用于匹配资源名称的正则表达式

    如果省略,则数据包中的所有资源都将流式传输。

  • 忽略丢失-如果为true,则丢失的资源不会引发错误,而是将被视为"空"(即零行)。 具有空URL的资源将被视为相同的资源(即将生成"空"资源)。

  • 限制行-如果提供,将限制从源中获取的行数。获取一个整数值,该整数值指定要传输的源的行数。

示例

$ pip install datapackage-pipelines
7

此处理器还支持加载纯文本资源(例如HTML页面)并将其作为表格数据处理-使用单个"数据"列拆分成行。 要启用此行为,请将以下属性添加到资源:"format":"txt"

转储到SQL

出于向后兼容的原因,保留了"转储到SQL"的别名。

转储到路径

将数据包保存到文件系统路径。

参数

  • 输出路径-存储datapackage.json的输出路径的名称。

    如果该路径不存在,则将创建该路径以及内部数据包路径。

    如果省略,则假定为(当前目录)。

  • 强制格式-指定是否强制以相同格式生成所有输出文件

    • 如果true(默认值),则所有资源都将使用相同的格式
    • 如果false,则将从文件扩展名推导格式。具有未知扩展名的资源将被丢弃。
  • format-指定要生成的输出文件类型(如果force format为true):csv(默认值)或json

  • handle non-tabular-指定是否应将非表格式资源(即没有架构的资源)转储到结果数据包。 (有关详细信息,请参见下面的注释)

  • 将文件哈希添加到路径:指定是否将文件md5哈希包含到资源路径中。默认为false。如果true将散列嵌入如下路径:

    • 如果原始路径是路径/to/the/file.ext
    • 修改后的路径将是path/to/the/hash/file.ext
  • 计数器-指定是对数据的行、字节还是MD5哈希进行计数,以及数据应存储在哪里。具有以下属性的对象:

    • datapackage row count:数据包的总行计数应存储在何处(默认值:count/u rows
    • 数据包字节:数据包的总字节计数应存储在哪里(默认值:字节
    • 数据包哈希:数据包的md5哈希应存储在哪里(默认值:哈希
    • 资源行数:每个资源的总行数应存储在哪里(默认值:行数
    • 资源字节:每个资源的总字节计数应存储在哪里(默认值:字节
    • 资源哈希:每个资源的md5哈希应存储在哪里(默认值:哈希) 为了防止计数,这些属性中的每一个都可以设置为空。 每个属性都可以是点分隔的字符串,用于将数据存储在嵌套对象中(例如stats.rowcount
  • pretty descriptor:指定数据包描述符(datapackage.json)文件的外观:

    • false(默认)-描述符将写入一行。
    • true-描述符对每个键都有缩进和新行,因此它变得更易于阅读。
  • 文件格式化程序:指定自定义文件格式处理程序。一个具有格式名到python模块和类名映射的对象。

    • 允许重写现有的csvjson格式处理程序或添加对新格式的支持。
    • 请注意,这些更改可能会使生成的数据包与FrictionLessData规范不兼容,并可能导致互操作性问题。
    • 示例用法:pipeline-spec.yaml(在自定义格式化程序管道下),xlsxformat类

转储.to_zip

将数据包保存到压缩存档。

参数

  • 输出文件-压缩数据将存储的输出文件的名称
  • 强制格式格式-与转储到路径中相同
  • 处理非表格格式-与转储到路径中的相同
  • 将文件哈希添加到路径-与dump\u to\u path中的相同
  • 计数器-与转储到路径中的相同
  • 漂亮的描述符-与"转储到"路径中的相同
  • 文件格式化程序-与"转储到"路径中的相同

注意

dump.to_pathdump.to_zip处理器也将处理非表格资源。 这些资源必须同时具有urlpath属性,并且不能包含schema属性。 在这种情况下,将从url下载文件,并将其放置在提供的路径

自定义处理器

对于任何非琐碎的处理任务,您可能会遇到使用标准库处理器无法解决的问题,这是非常合理的。

为此,您可能需要编写自己的处理器-以下是处理方法。

有两种用于编写处理器的API—高级API和低级API。

高级处理器API

高级api对大多数处理器类型都非常有用:

$ pip install datapackage-pipelines
8

高级api由一个方法process组成,它有两个函数:

  • 修改数据包-对数据包描述符进行更改(如有必要),例如添加元数据、添加资源、修改资源的架构等。

    必要时也可用于初始化代码。

    它有以下参数:

    • data package是需要修改的当前数据包描述符。 需要返回修改后的数据包描述符。
    • parameters是包含处理器参数的dict,如pipeline-spec.yaml文件中提供的。
    • stats是一个dict,为了收集过程中的度量和测量(例如验证检查、行计数等),应该对其进行修改。
  • 处理行-修改流中的一行。它接收这些参数:

    • 是包含要处理的行的字典
    • 行索引是资源中该行的索引
    • 资源描述符是当前正在处理的资源的描述符对象
    • 资源索引是数据包中资源的索引
    • parameters是包含处理器参数的dict,如pipeline-spec.yaml文件中提供的。
    • stats是一个dict,为了在过程(例如验证检查、行计数等)

    并生成零个或多个已处理行。

几个例子
$ pip install datapackage-pipelines
9
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
0
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
1

低级处理器API

在某些情况下,高级api可能过于受限。在这种情况下,您应该考虑使用低级api。

$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
2

上面的代码片段显示了大多数低级处理器的结构。

我们总是从调用insert()开始-此方法为我们提供上下文,其中包含执行参数、数据包描述符(从上一步输出)和所有流式资源行上的迭代器。

我们通过调用spew()来完成处理,后者将处理后的数据发送到管道中的下一个处理器。spew接收:

  • 修改的数据包描述符;
  • 资源上的(可能是新的)迭代器;
  • 一个stats对象,它将被添加到前面步骤中的stats中,并在管道完成时返回给用户,并且;
  • (可选)一个finalizer函数,它在完成对资源的迭代之后,但在向其他处理器发送完成的信号之前将被调用。例如,您可以使用它关闭任何打开的文件。

更深入的解释

spew按以下顺序写入接收到的数据:

  • 首先,将datapackage参数写入流。 这意味着对数据包描述符的所有修改都必须在调用spew之前完成。 一个常见的陷阱是修改资源迭代器中的数据包描述符-尽量避免这样做,因为下一个处理器将接收到的描述符将是错误的。
  • 然后它开始迭代资源。对于每个资源,它在其行上迭代并将每一行写入流。 这个迭代过程最终导致对原始资源迭代器(从摄取返回的资源迭代器)的迭代。反过来,这会导致读取进程的输入流。由于操作系统中缓冲的工作方式,"慢"处理器将缓慢读取其输入,导致之前的处理器在其CPU密集型处理器完成处理时在IO上休眠。"快速"处理器不会漫无目的地工作,而是在等待输入数据或等待输出缓冲区耗尽时休眠。 这里所得到的结果是,数据中的所有行都或多或少地同时被处理,并且没有一个处理器在可能在后续处理步骤中失败的行上工作得太"超前"。
  • 然后将统计数据写入流。这意味着可以在迭代期间修改统计信息,并且只使用迭代完成后的值。
  • 最后,调用finalizer方法(如果我们收到一个)。

几个例子

我们将从上面的相同处理器开始,现在用低级API实现。

$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
3
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
4
$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
5

下一个例子展示了如何实现一个简单的web scraper。虽然不是严格要求,但web刮板通常是管道中的第一个处理器。因此,它们可以忽略传入的数据包和资源迭代器,因为没有以前的处理器生成数据:

$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
6

在这个例子中,我们可以看到初始的数据包是从头开始生成的,而资源迭代器实际上是一个scraper,在从ckan实例api接收行时生成它们。

插件和源描述符

在特定问题域中编写管道时,可能会发现开发的处理管道遵循某种模式。抓取或获取源数据往往是相似的。处理、数据清理和验证通常是相同的。

为了便于维护和避免样板,可以编写数据包管道插件。

插件是名为datapackage\u pipelines\lt;plugin name>;的python模块。插件可以提供两种功能:

  • 处理器包-您可以在插件中打包围绕某个主题或特定用途的处理器。任何位于数据包管道plugin name>;下的处理器foo都可以在管道内用作<;plugin name>;.foo模块。
  • 管道模板-如果类generator存在于datapackage\u pipelines\lt;plugin name>;模块中,则它将用于基于模板生成管道-我们称之为"源描述符"。

源描述符

源描述符是包含用于创建完整管道的信息的yaml文件。

dpp将查找名为<;plugin name>;.source-spec.yaml的文件,并将它们视为管道生成代码的输入-应在datapackage\u pipelines\lt;plugin name>;中名为generator的类中实现;模块。

这个类应该继承自generatorbase并且应该实现两个方法:

  • 生成管道- 它接收源描述并返回形式为(id,details)的元组迭代器。 id可能是管道id,在这种情况下,详细信息将是包含管道定义的对象。 如果id的格式为:module:,则详细信息将被视为来自指定模块的源规范。这样,生成器可能会生成其他源规范。
  • 获取模式-它应该返回一个json模式,用于验证源描述的结构

示例

假设我们编写了一个数据包管道插件,用于从ckan实例中提取数据。

下面是这样一个假设生成器的外观:

$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
7

在这种情况下,如果我们存储一个类似这样的文件:

$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
8

然后当运行dpp时,我们将看到一个名为/example com的可用管道包列表

这个管道在内部由3个步骤组成:ckan.scraper元数据转储到压缩文件

验证源描述符

源描述符可以具有与输出管道的参数域最匹配的任何结构。但是,它必须有一个一致的结构,由json模式文件支持。在我们的例子中,模式可能如下:

$ dpp
Available Pipelines:
- ./worldbank-co2-emissions (*)

$ $ dpp run --verbose ./worldbank-co2-emissions
RUNNING ./worldbank-co2-emissions
Collecting dependencies
Running async task
Waiting for completion
Async task starting
Searching for existing caches
Building process chain:
- update_package
- load
- set_types
- dump_to_zip
- (sink)
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :Starting new HTTP connection (1): api.worldbank.org:80
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
load: DEBUG   :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1"200308736
set_types: INFO    :(<dataflows.processors.set_type.set_type object at 0x10a5c79b0>,)
load: INFO    :Processed 264 rows
set_types: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py
dump_to_zip: INFO    :Processed 264 rows
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py
DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py
DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
INFO    :RESULTS:
INFO    :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'}
9

dpp将确保源描述符文件在尝试使用generator类将其转换为管道之前符合该架构。

提供处理器代码

在某些情况下,生成器也希望提供处理器代码(与管道定义一起)。 为此,生成器可以向包含处理器代码的任何步骤添加code属性。执行此步骤时,不会像往常一样尝试解析处理器,而是会改为解析提供的代码。

按计划运行

datapackage pipelines带有芹菜式集成,允许通过类似于crontab的语法在特定时间运行管道。

为了实现这一点,您只需将schedule部分添加到pipeline-spec.yaml文件中(或者从生成器类返回一个schedule,见上文),如下所示:

$ docker run -it -v `pwd`:/pipelines:rw \
        frictionlessdata/datapackage-pipelines
<available-pipelines>

$ docker run -it -v `pwd`:/pipelines:rw \
       frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>
0

在本例中,此管道设置为每小时运行一次,每小时运行一次。

要运行芹菜守护进程,请使用celeri的命令行界面运行datapackage\u pipelines.app。有一种方法

$ docker run -it -v `pwd`:/pipelines:rw \
        frictionlessdata/datapackage-pipelines
<available-pipelines>

$ docker run -it -v `pwd`:/pipelines:rw \
       frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>
1

运行此服务器将从执行所有"脏"任务开始,并根据任务的计划继续执行任务。

作为启动调度程序和仪表板的快捷方式(见下文),您可以使用预先构建的Docker图像:

$ docker run -it -v `pwd`:/pipelines:rw \
        frictionlessdata/datapackage-pipelines
<available-pipelines>

$ docker run -it -v `pwd`:/pipelines:rw \
       frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>
2

然后浏览到http://<;docker machine的ip address>;:5000/查看当前执行状态仪表板。

管道仪表板和状态标牌

当安装在服务器上或使用任务调度程序运行时,通常很难确切地知道每个管道的运行情况和状态。

为了简化操作,您可以旋转Web仪表板,以提供每个管道的状态、基本信息和最新执行结果的概述。

要启动web服务器,请从命令行运行dpp serve并浏览到http://localhost:5000

环境变量dpp_base_path将确定仪表板是从根路径还是从另一个基本路径(示例值:/pipelines/)提供服务。

仪表板端点可以通过添加带有环境变量dpp_basic_auth_usernamedpp_basic_auth_password的用户名和密码来要求验证。

对于单个管道和管道集合,甚至更简单的管道状态都可以使用状态标记。对于单个管道,将完整的管道ID添加到徽标端点:

$ docker run -it -v `pwd`:/pipelines:rw \
        frictionlessdata/datapackage-pipelines
<available-pipelines>

$ docker run -it -v `pwd`:/pipelines:rw \
       frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>
3

 src=

 src=

 src=

 src=

或者对于管道集合:

$ docker run -it -v `pwd`:/pipelines:rw \
        frictionlessdata/datapackage-pipelines
<available-pipelines>

$ docker run -it -v `pwd`:/pipelines:rw \
       frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>
4

 src=

 src=

 src=

请注意,无论dpp_basic_auth_passworddpp_basic_auth_username设置如何,这些徽章端点都将始终公开。

与其他服务集成

数据包管道可以对任何管道事件调用预定义的webhook。这可能允许潜在的集成与其他应用程序一起使用。

要在特定管道中添加Webhook,请在管道定义中添加hooks属性,该属性应该是URL列表。 每当该管道排队、开始运行或结束运行时,所有URL都将使用此负载进行发布:

$ docker run -it -v `pwd`:/pipelines:rw \
        frictionlessdata/datapackage-pipelines
<available-pipelines>

$ docker run -it -v `pwd`:/pipelines:rw \
       frictionlessdata/datapackage-pipelines run ./worldbank-co2-emissions
<execution-logs>
5

已知问题

  • 加载在单个单元格中包含大量数据的资源时会引发异常(\112

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java在未知属性上的PUT和POST失败会引发不同的行为   java无法使GWTRPC正常工作   java如何在安卓中更改一个特定视图的主题?   机器学习为什么改变了java中等式的两面?   java继承和重定向标准输出   java为什么Clojure中嵌套循环/重复速度慢?   使用JavaParser解析Java代码并查找父节点的语句类型   java读取类的方法并在arraylist中存储Web服务的路径名   java模板聚合匹配和投影一个没有id的字段   java为什么给定数组不返回false   java如何链接JLabel和JSpinner以调整大小   在java中,当过滤器只返回一个对象时,如何使用流和过滤器将值填充到对象中   java为什么使用getInstance   如何得到我的。运行java命令的bat文件