Geolibs Dator-A Data Extractor

geolibs-dator的Python项目详细描述


Geolibs数据

dator是一个数据提取器(etl作为库),它使用pandas的数据帧作为内存中的临时存储。

功能

SourceExtractTransformLoad
BigQueryYY
CARTOYYY*
CSVYY
PandasY
PostgreSQLYYY

*注意:我们正在等待CARTOframes上的追加功能,因为我们使用的是niapa

配置

使用config.example.ymlone作为指南创建config.yml文件。你可以在其中找到所有可能的etl案例。

如果在etl过程中使用bigquery,则需要添加一个GOOGLE_APPLICATION_CREDENTIALS环境变量,其中包含指向google云的credentials.json文件的路径。

您可以使用example.py文件测试它们。

示例

dator_config.yml

datastorages:
  bigquery_input:
    type: bigquery
    data:
      query: SELECT * FROM `dataset.table` WHERE updated_at >= '2019-05-04T00:00:00Z' AND updated_at < '2019-06-01T00:00:00Z';

  carto_input:
    type: carto
    credentials:
      url: https://domain.com/user/user/
      api_key: api_key
    data:
      table: table

  postgresql_input:
    credentials:
      ...
    data:
      query: SELECT * FROM somewhere;
      types:
        - name: timeinstant
          type: datetime
        - name: fillinglevel
          type: float
        - name: temperature
          type: int
        - name: category
          type: str

  carto_output:
    type: carto
    credentials:
      url: https://domain.com/user/user/
      api_key: api_key
    data:
      table: table
      append: false

transformations:
  bigquery_agg:
    type: bigquery
    time:
      field: updated_at
      start: "2019-05-02T00:00:00Z"  # As string or YAML will parse them as DateTimes
      finish: "2019-05-03T00:00:00Z"
      step: 5 MINUTE
    aggregate:
      by:
        - container_id
        - updated_at
      fields:
        field_0: avg
        field_1: max

extract: bigquery_input
transform: bigquery_agg
load: carto_output

如何使用

此软件包旨在通过三个步骤完成etl操作:

提取

extract方法是默认方法,这意味着尽管此方法可以被覆盖,但默认情况下,它必须通过config工作。

(本节正在施工)

变换

(本节正在施工)

负载

load方法是一个默认方法,这意味着尽管这个方法可以被覆盖,但在默认情况下,它必须通过config来工作。它可以接收两个参数,pandas数据帧和一个带有额外信息的字典。

示例

app.py

fromdatorimportDatordator=Dator('/usr/src/app/dator_config.yml')df=dator.extract()df=dator.transform(df)dator.load(df)

app.py附加信息

fromdatorimportDatordefupsert_method:passdator=Dator('/usr/src/app/dator_config.yml')df=dator.extract()df=dator.transform(df)dator.load(df,{'method':upsert_method})

待办事项

  • 更好的医生。
  • 测试。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java生成随机浮点,包括两个边界   java三层体系结构风格是如何工作的?一些简单的例子   多线程可以使用线程。在Java中,在循环中使用sleep(),以便定期执行某些操作?   读取循环上的java HibernateMysql异常   java使用带有Apache Ivy的自定义存储库,未找到解析程序   filenotfoundexception在读取时出现问题。Java中的txt文件   嵌入式tomcat 8.0.21中的java Spring websocket   java为什么我需要创建一个类的引用,然后创建一个B类的对象   java Splashscreen动画在Mac OS中更新时闪烁   JavaSpring3。名为“zoneManagerDelegate”的x Bean必须是[com.ms.adsp.delegate.sapi.zoneManagerDelegate]类型,但实际上是[$Proxy20]类型   java SQLite:没有这样的专栏;不明错误   java将JTable定位到JFrame中JPanel中的(x,y)位置   java在导入组织方面面临挑战。知道。xchart*   xml读取Java中的SVG元素并跳过某些包含文本的元素   java Spring Redis问题:Redis缓存中的GetAllCacheNames不能与RedisCacheManager一起使用   java Vertex Hazelcast:集群问题   java如何编辑osgi托管服务实现使用的属性文件?   java Android活动并行启动?   java AWS Lambda用于将excel转储到数据库中