singer.io tap用于从红移中提取数据
tap-redshift的Python项目详细描述
Singer点击从Redshift数据库中提取数据,并根据singer规范生成json格式的数据。
用法
tap redshift假设您与redshift有连接,并且需要python 3.6+。
步骤1:创建配置文件
安装tap redshift时,需要为数据库连接创建一个config.json文件。
json文件需要以下属性;
- host
- port
- dbname
- user
- password
- start_date(表示法:yyyy-mm-ddthh:mm:ssz)
以及一个可选属性;
- schema
示例:
{"host":"REDSHIFT_HOST","port":"REDSHIFT_PORT","dbname":"REDSHIFT_DBNAME","user":"REDSHIFT_USER","password":"REDSHIFT_PASSWORD","start_date":"REDSHIFT_START_DATE","schema":"REDSHIFT_SCHEMA"}
步骤2:发现可以从redshift
中提取的内容可以在发现模式下调用tap以获取数据库中可用的表和列。 它指向为连接到redshift而创建的配置文件:
$ tap-redshift --config config.json -d
完整的catalog tap被写入stdout,每个表都有一个json模式描述。消息来源 表直接对应于歌手流。
将输出从tap的发现模式重定向到文件,以便在tap处于 以同步模式调用。
$ tap-redshift -c config.json -d > catalog.json
这将在发现模式下运行tap并将输出复制到catalog.json文件中。
目录包含流对象的列表,每个对象对应红移架构中可用的表。
示例:
{"streams":[{"tap_stream_id":"sample-dbname.public.sample-name","stream":"sample-stream","database_name":"sample-dbname","table_name":"public.sample-name""schema":{"properties":{"id":{"minimum":-2147483648,"inclusion":"automatic","maximum":2147483647,"type":["null","integer"]},"name":{"maxLength":255,"inclusion":"available","type":["null","string"]},"updated_at":{"inclusion":"available","type":["string"],"format":"date-time"},},"type":"object"},"metadata":[{"metadata":{"selected-by-default":false,"selected":true,"is-view":false,"table-key-properties":["id"],"schema-name":"sample-stream","valid-replication-keys":["updated_at"]},"breadcrumb":[],},{"metadata":{"selected-by-default":true,"sql-datatype":"int2","inclusion":"automatic"},"breadcrumb":["properties","id"]},{"metadata":{"selected-by-default":true,"sql-datatype":"varchar","inclusion":"available"},"breadcrumb":["properties","name"]},{"metadata":{"selected-by-default":true,"sql-datatype":"datetime","inclusion":"available",},"breadcrumb":["properties","updated_at"]}]}]}
步骤3:选择要同步的表
在同步模式下,tap-redshift需要提供一个目录文件,其中用户必须 已选择应传输哪些流(表)。默认情况下不选择流。
对于目录中的每个流,找到metadata部分。这是您要修改的部分 以选择流和(可选)单个属性。
流本身由一个空的breadcrumb表示。
示例:
"metadata":[{"breadcrumb":[],"metadata":{"selected-by-default":false,...}}]
您可以通过将"selected": true添加到其元数据中来选择它。
示例:
"metadata":[{"breadcrumb":[],"metadata":{"selected":true,"selected-by-default":false,...}}]
然后,可以使用properties catalog参数以同步模式调用tap:
示例(与target-datadotworld配对)
tap-redshift -c config.json --catalog catalog.json | target-datadotworld -c config-dw.json
第4步:同步数据
有两种方法可以复制给定的表。全表和增量。 默认情况下使用全表复制。
全桌
全表复制每次调用tap时都从源表中提取所有数据 国家档案。
增量
增量复制与状态文件一起工作,每个状态文件只提取新记录 调用tap的时间,即从上次同步的数据继续。
要使用增量复制,我们需要添加replication_method和replication_key 到catalog.json文件中的流(表)元数据。
示例:
"metadata":[{"breadcrumb":[],"metadata":{"selected":true,"selected-by-default":false,"replication-method":"INCREMENTAL","replication-key":"updated_at",...}}]
然后我们可以在同步模式下再次调用tap。这次输出将有STATE条消息 包含提取的数据的replication_key_value和bookmark。
将输出重定向到state.json文件。通常,目标将在 数据处理完毕。
运行下面的代码将状态传递到state.json文件中。
示例:
tap-redshift -c config.json --catalog catalog.json |\ target-datadotworld -c config-dw.json > state.json
state.json文件应该是;
{"currently_syncing":null,"bookmarks":{"sample-dbname.public.sample-name":{"replication_key":"updated_at","version":1516304171710,"replication_key_value":"2013-10-29T09:38:41.341Z"}}}
对于后续运行,可以调用传递最新状态的增量复制,以便将数据限制为自上次执行以来已修改的内容。
tail -1 state.json > latest-state.json;\ tap-redshift \ -c config-redshift.json \ --catalog catalog.json \ -s latest-state.json |\ target-datadotworld -c config-dw.json > state.json
一个生成文件中的所有步骤
为了您的方便,上面提到的所有步骤都在下面的Makefile中捕获。 此示例使用target-datadotworld,但可以修改为使用任何其他歌手目标。
# Requires python 3.6 install: pip3 install tap-redshift;\ pip3 install target-datadotworld # Catalog discovery discover: tap-redshift \ -c config-redshift.json -d > catalog.json # Full sync fullsync: tap-redshift \ -c config-redshift.json \ --catalog catalog.json |\ target-datadotworld -c config-dw.json > state.json # Incremental sync sync: tail -1 state.json > latest-state.json;\ tap-redshift \ -c config-redshift.json \ --catalog catalog.json \ -s latest-state.json |\ target-datadotworld -c config-dw.json > state.json