Singer.io target for loading data to Snowflake - PipelineWise compatible
pipelinewise-target-snowflake
Singer target that loads data into Snowflake following the Singer spec.
This is a PipelineWise compatible target connector.
How to use it
The recommended method of running this target is to use it from PipelineWise. When running it from PipelineWise you don't need to configure this tap with JSON files and most of the settings are automated. Please check the related documentation at Target Snowflake.
If you want to run this Singer Target independently, please read further.
Install
First, make sure Python 3 is installed on your system, or follow the installation instructions for Mac or Ubuntu.
It's recommended to use a virtualenv:
python3 -m venv venv
pip install pipelinewise-target-snowflake
or
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .
Flow diagram (image)
Run
Like any other target that's following the Singer specification:
some-singer-tap | target-snowflake --config [config.json]
It reads the incoming messages from stdin and uploads the data into Snowflake using the properties in config.json.
Note: To avoid version conflicts, run taps and targets in separate virtual environments.
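For example, a tap and this target could be wired together like this (a sketch only; the tap name, virtualenv paths and config file names are placeholders):
# Hypothetical setup: tap and target live in separate virtualenvs to avoid
# dependency conflicts (paths and the tap name are placeholders).
python3 -m venv ~/venvs/tap-mysql
~/venvs/tap-mysql/bin/pip install tap-mysql
python3 -m venv ~/venvs/target-snowflake
~/venvs/target-snowflake/bin/pip install pipelinewise-target-snowflake
# Pipe the tap's stdout into the target's stdin.
~/venvs/tap-mysql/bin/tap-mysql --config tap_config.json --catalog catalog.json \
  | ~/venvs/target-snowflake/bin/target-snowflake --config config.json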
Pre-requirements
You need to create two objects in a Snowflake schema before using this target:
- A named external stage object on S3. This will be used to upload the CSV files to S3 and to MERGE data into the Snowflake tables.
CREATE STAGE {schema}.{stage_name}
url='s3://{s3_bucket}'
credentials=(AWS_KEY_ID='{aws_key_id}' AWS_SECRET_KEY='{aws_secret_key}')
encryption=(MASTER_KEY='{client_side_encryption_master_key}');
The encryption option is optional and is used for client-side encryption. If you want to enable client-side encryption, you'll need to define the same master key in the target config.json; details are in the Configuration settings section below, and a key-generation sketch follows this list.
- A named file format. This will be used by the MERGE/COPY commands to parse the CSV files in S3 correctly:
CREATE FILE FORMAT IF NOT EXISTS {schema}.{file_format_name} TYPE = 'CSV' ESCAPE='\\' FIELD_OPTIONALLY_ENCLOSED_BY='"';
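One way to produce a suitable client-side encryption master key (a sketch; any tool that emits a base64-encoded 256-bit value works):
# Generate a random 256-bit key, base64 encoded. The same value goes into the
# CREATE STAGE statement above and into client_side_encryption_master_key in config.json.
openssl rand -base64 32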
Configuration settings
Running the target connector requires a config.json file. Example with the minimal settings:
{
  "account": "rtxxxxx.eu-central-1",
  "dbname": "database_name",
  "user": "my_user",
  "password": "password",
  "warehouse": "my_virtual_warehouse",
  "aws_access_key_id": "secret",
  "aws_secret_access_key": "secret",
  "s3_bucket": "bucket_name",
  "stage": "snowflake_external_stage_object_name",
  "file_format": "snowflake_file_format_object_name",
  "default_target_schema": "my_target_schema"
}
Full list of options in config.json:
Property | Type | Required? | Description |
---|---|---|---|
account | String | Yes | Snowflake account name (i.e. rtXXXXX.eu-central-1) |
dbname | String | Yes | Snowflake Database name |
user | String | Yes | Snowflake User |
password | String | Yes | Snowflake Password |
warehouse | String | Yes | Snowflake virtual warehouse name |
aws_access_key_id | String | Yes | S3 Access Key Id |
aws_secret_access_key | String | Yes | S3 Secret Access Key |
s3_bucket | String | Yes | S3 Bucket name |
s3_key_prefix | String | No | (Default: None) A static prefix before the generated S3 key names. Using prefixes you can upload files into specific directories in the S3 bucket. |
stage | String | Yes | Named external stage name created at the pre-requirements section. Has to be a fully qualified name including the schema name. |
file_format | String | Yes | Named file format name created at the pre-requirements section. Has to be a fully qualified name including the schema name. |
batch_size | Integer | No | (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake. |
default_target_schema | String | No | Name of the schema where the tables will be created. If `schema_mapping` is not defined then every stream sent by the tap is loaded into this schema. |
default_target_schema_select_permission | String | No | Grant USAGE privilege on newly created schemas and grant SELECT privilege on newly created tables to a specific role or a list of roles. If `schema_mapping` is not defined then every stream sent by the tap is granted accordingly. |
schema_mapping | Object | No | Useful if you want to load multiple streams from one tap to multiple Snowflake schemas. If the tap sends the `stream_id` in `<schema_name>-<table_name>` format then this option overwrites the `default_target_schema` value. Note: This is an experimental feature and it's recommended to use it via PipelineWise YAML files that generate the object mapping in the right JSON format. For further info check a [PipelineWise YAML Example] and the illustrative snippet below this table. |
disable_table_cache | Boolean | No | (Default: False) By default the connector caches the available table structures in Snowflake at startup. In this way it doesn't need to run additional queries when ingesting data to check if altering the target tables is required. With the `disable_table_cache` option you can turn off this caching. You will always see the most recent table structures, but it will cause extra query runtime. |
client_side_encryption_master_key | String | No | (Default: None) When this is defined, Client-Side Encryption is enabled. The data in S3 will be encrypted; no third parties, including Amazon AWS and any ISPs, can see data in the clear. The Snowflake COPY command will decrypt the data once it's in Snowflake. The master key must be 256-bit length and must be encoded as a base64 string. |
client_side_encryption_stage_object | String | No | (Default: None) Required when `client_side_encryption_master_key` is defined. The name of the encrypted stage object in Snowflake that was created separately with the same encryption master key. |
add_metadata_columns | Boolean | No | (Default: False) Metadata columns add extra row level information about data ingestions (i.e. when was the row read in source, when was it inserted or deleted in Snowflake, etc.). Metadata columns are created automatically by adding extra columns to the tables with a column prefix `_SDC_`. Enabling metadata columns will flag the deleted rows by setting the `_SDC_DELETED_AT` metadata column. Without the `add_metadata_columns` option the deleted rows from singer taps will not be recognisable in Snowflake. |
hard_delete | Boolean | No | (Default: False) When the `hard_delete` option is true, DELETE SQL commands will be performed in Snowflake to delete rows in tables. It's achieved by continuously checking the `_SDC_DELETED_AT` metadata column sent by the singer tap. Since deleting rows requires metadata columns, the `hard_delete` option automatically enables the `add_metadata_columns` option as well. |
data_flattening_max_level | Integer | No | (Default: 0) Object type RECORD items from taps can be loaded into VARIANT columns as JSON (default), or the schema can be flattened by creating columns automatically. When the value is 0 (default), the flattening functionality is turned off. |
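As an illustration of the `schema_mapping` option, an entry could look like this (a sketch only; the stream, schema and role names are placeholders, and in practice the mapping should be generated from PipelineWise YAML as noted above):
{
  "schema_mapping": {
    "my_tap_stream_id": {
      "target_schema": "my_snowflake_schema",
      "target_schema_select_permissions": ["my_read_only_role"]
    }
  }
}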
To run tests:
- Define the environment variables that are required to run the tests
export TARGET_SNOWFLAKE_ACCOUNT=<snowflake-account-name>
export TARGET_SNOWFLAKE_DBNAME=<snowflake-database-name>
export TARGET_SNOWFLAKE_USER=<snowflake-user>
export TARGET_SNOWFLAKE_PASSWORD=<snowflake-password>
export TARGET_SNOWFLAKE_WAREHOUSE=<snowflake-warehouse>
export TARGET_SNOWFLAKE_SCHEMA=<snowflake-schema>
export TARGET_SNOWFLAKE_AWS_ACCESS_KEY=<aws-access-key-id>
export TARGET_SNOWFLAKE_AWS_SECRET_ACCESS_KEY=<aws-secret-access-key>
export TARGET_SNOWFLAKE_S3_BUCKET=<s3-external-bucket>
export TARGET_SNOWFLAKE_S3_KEY_PREFIX=<bucket-directory>
export TARGET_SNOWFLAKE_STAGE=<stage-object-with-schema-name>
export TARGET_SNOWFLAKE_FILE_FORMAT=<file-format-object-with-schema-name>
export CLIENT_SIDE_ENCRYPTION_MASTER_KEY=<client_side_encryption_master_key>
export CLIENT_SIDE_ENCRYPTION_STAGE_OBJECT=<client_side_encryption_stage_object>
- Install python dependencies in a virtual env and run nose unit and integration tests
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .
pip install nose
- To run unit tests:
nosetests --where=tests/unit
- To run integration tests:
nosetests --where=tests/integration
To run pylint:
- Install python dependencies and run the python linter
python3 -m venv venv
. venv/bin/activate
pip install --upgrade pip
pip install .
pip install pylint
pylint target_snowflake -d C,W,unexpected-keyword-arg,duplicate-code
License
Apache License Version 2.0
See LICENSE to see the full text.