Singer.io target for loading data to Snowflake - PipelineWise compatible



pipelinewise-target-snowflake


Singer target that loads data into Snowflake following the Singer spec.

This is a PipelineWise compatible target connector.

How to use it

The recommended way of running this target is to use it from PipelineWise. When running it from PipelineWise you don't need to configure this target with JSON files, and most things are automated. Please check the related documentation at Target Snowflake.

If you want to run this Singer Target independently, please read further.

Install

First, make sure Python 3 is installed on your system, or follow the installation instructions for Mac or Ubuntu.

It's recommended to use a virtualenv. To install from PyPI:

  python3 -m venv venv
  . venv/bin/activate
  pip install pipelinewise-target-snowflake

or, to install from source, from inside the cloned repository directory:

  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .
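
As a quick sanity check, you can confirm that the target-snowflake console script (the command used in the run examples below) ended up on the virtualenv's PATH. This is just a convenience step, not part of the official install instructions:

  . venv/bin/activate
  which target-snowflake    # should point into venv/bin/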

Flow Diagram


To run

Like any other target that follows the Singer specification:

some-singer-tap | target-snowflake --config [config.json]

It reads incoming messages from stdin and loads the data into Snowflake using the properties in config.json.

Note: To avoid version conflicts, run the tap and the target in separate virtual environments.
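
For illustration only, a complete invocation might look like the sketch below. It assumes a hypothetical tap installed in its own virtualenv at ~/venvs/tap-example and the target installed at ~/venvs/target-snowflake, and it appends the state messages the target emits on stdout to a file, following the usual Singer pattern; adjust the paths and names to your setup.

  # hypothetical paths: tap and target live in separate virtualenvs
  ~/venvs/tap-example/bin/tap-example --config tap_config.json \
    | ~/venvs/target-snowflake/bin/target-snowflake --config config.json \
    >> state.json

  # keep only the most recent state message for the next run (common Singer convention)
  tail -1 state.json > state.json.tmp && mv state.json.tmp state.json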

Pre-requirements

Before you start using this target, you need to create two objects in Snowflake, both in one schema:

  1. A named external stage object on S3. This will be used to upload the CSV files to S3 and to MERGE data into Snowflake tables.
  CREATE STAGE {schema}.{stage_name}
  url='s3://{s3_bucket}'
  credentials=(AWS_KEY_ID='{aws_key_id}' AWS_SECRET_KEY='{aws_secret_key}')
  encryption=(MASTER_KEY='{client_side_encryption_master_key}');

The encryption option is optional and is used for client-side encryption. If you want client-side encryption enabled, you'll need to define the same master key in the target's config.json; details are provided in the Configuration settings section below. (A sketch for generating such a key follows this list.)

  2. A named file format. This will be used by the MERGE/COPY commands to correctly parse the CSV files in S3:

  CREATE file format IF NOT EXISTS {schema}.{file_format_name} type = 'CSV' escape='\\' field_optionally_enclosed_by='"';
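
If you plan to enable client-side encryption, the master key referenced in the stage definition above must be a 256-bit key encoded as a base64 string (see client_side_encryption_master_key in the configuration settings below). As a sketch, one way to generate such a key is with OpenSSL:

  # generate 32 random bytes (256 bits) and print them base64-encoded
  openssl rand -base64 32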

Configuration settings

Running the target connector requires a config.json file. Example with the minimal settings:

{"account":"rtxxxxx.eu-central-1","dbname":"database_name","user":"my_user","password":"password","warehouse":"my_virtual_warehouse","aws_access_key_id":"secret","aws_secret_access_key":"secret","s3_bucket":"bucket_name","stage":"snowflake_external_stage_object_name","file_format":"snowflake_file_format_object_name","default_target_schema":"my_target_schema"}

Full list of options in config.json:
account (String, required): Snowflake account name (i.e. rtXXXXX.eu-central-1)
dbname (String, required): Snowflake database name
user (String, required): Snowflake user
password (String, required): Snowflake password
warehouse (String, required): Snowflake virtual warehouse name
aws_access_key_id (String, required): S3 Access Key Id
aws_secret_access_key (String, required): S3 Secret Access Key
s3_bucket (String, required): S3 bucket name
s3_key_prefix (String, optional): (Default: None) A static prefix before the generated S3 key names. Using prefixes you can upload files into specific directories in the S3 bucket.
stage (String, required): Named external stage name created in the pre-requirements section. Has to be a fully qualified name including the schema name.
file_format (String, required): Named file format name created in the pre-requirements section. Has to be a fully qualified name including the schema name.
batch_size (Integer, optional): (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake.
default_target_schema (String, optional): Name of the schema where the tables will be created. If schema_mapping is not defined, then every stream sent by the tap is loaded into this schema.
default_target_schema_select_permission (String, optional): Grant USAGE privilege on newly created schemas and grant SELECT privilege on newly created tables to a specific role or a list of roles. If schema_mapping is not defined, then every stream sent by the tap is granted accordingly.
schema_mapping (Object, optional): Useful if you want to load multiple streams from one tap into multiple Snowflake schemas. If the tap sends the stream_id in <schema_name>-<table_name> format, then this option overwrites the default_target_schema value. Note that using schema_mapping you can overwrite the default_target_schema_select_permission value to grant SELECT permissions to different roles per schema, or optionally create indices automatically for the replicated tables. Note: This is an experimental feature; it is recommended to use it via PipelineWise YAML files, which generate the object mapping in the right JSON format. For further info check a PipelineWise YAML Example.
disable_table_cache (Boolean, optional): (Default: False) By default the connector caches the available table structures in Snowflake at startup. This way it doesn't need to run additional queries while ingesting data to check whether altering the target tables is required. With the disable_table_cache option you can turn off this caching. You will always see the most recent table structures, but it will cause extra query runtime.
client_side_encryption_master_key (String, optional): (Default: None) When this is defined, client-side encryption is enabled. The data in S3 will be encrypted; no third parties, including Amazon AWS and any ISPs, can see the data in the clear. The Snowflake COPY command will decrypt the data once it's in Snowflake. The master key must be 256 bits long and must be encoded as a base64 string.
client_side_encryption_stage_object (String, optional): (Default: None) Required when client_side_encryption_master_key is defined. The name of the encrypted stage object in Snowflake that was created separately, using the same encryption master key.
add_metadata_columns (Boolean, optional): (Default: False) Metadata columns add extra row-level information about data ingestion (i.e. when the row was read from the source, when it was inserted or deleted in Snowflake, etc.). Metadata columns are created automatically by adding extra columns to the tables with the column prefix _SDC_. The column names follow the Stitch naming conventions documented at https://www.stitchdata.com/docs/data-structure/integration-schemas#sdc-columns. Enabling metadata columns will flag deleted rows by setting the _SDC_DELETED_AT metadata column. Without the add_metadata_columns option, rows deleted by singer taps will not be recognisable in Snowflake.
hard_delete (Boolean, optional): (Default: False) When the hard_delete option is true, DELETE SQL commands will be performed in Snowflake to delete rows from tables. This is achieved by continuously checking the _SDC_DELETED_AT metadata column sent by the singer tap. Since deleting rows requires metadata columns, the hard_delete option automatically enables the add_metadata_columns option as well.
data_flattening_max_level (Integer, optional): (Default: 0) Object type RECORD items from taps can be loaded into VARIANT columns as JSON (default), or the schema can be flattened by creating columns automatically. When the value is 0 (default), the flattening functionality is turned off.
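
For illustration only, below is a sketch of a config.json that combines the required settings with a few of the optional ones listed above. Every value is a placeholder, and the optional keys shown (s3_key_prefix, batch_size, add_metadata_columns, hard_delete, data_flattening_max_level) are just one plausible combination, not a recommendation:

  {
    "account": "rtxxxxx.eu-central-1",
    "dbname": "database_name",
    "user": "my_user",
    "password": "password",
    "warehouse": "my_virtual_warehouse",
    "aws_access_key_id": "secret",
    "aws_secret_access_key": "secret",
    "s3_bucket": "bucket_name",
    "s3_key_prefix": "snowflake-imports/",
    "stage": "my_target_schema.my_external_stage",
    "file_format": "my_target_schema.my_csv_file_format",
    "default_target_schema": "my_target_schema",
    "batch_size": 100000,
    "add_metadata_columns": true,
    "hard_delete": true,
    "data_flattening_max_level": 0
  }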

To run tests:

  1. Define the environment variables that are required to run the tests:
  export TARGET_SNOWFLAKE_ACCOUNT=<snowflake-account-name>
  export TARGET_SNOWFLAKE_DBNAME=<snowflake-database-name>
  export TARGET_SNOWFLAKE_USER=<snowflake-user>
  export TARGET_SNOWFLAKE_PASSWORD=<snowflake-password>
  export TARGET_SNOWFLAKE_WAREHOUSE=<snowflake-warehouse>
  export TARGET_SNOWFLAKE_SCHEMA=<snowflake-schema>
  export TARGET_SNOWFLAKE_AWS_ACCESS_KEY=<aws-access-key-id>
  export TARGET_SNOWFLAKE_AWS_SECRET_ACCESS_KEY=<aws-secret-access-key>
  export TARGET_SNOWFLAKE_S3_BUCKET=<s3-external-bucket>
  export TARGET_SNOWFLAKE_S3_KEY_PREFIX=<bucket-directory>
  export TARGET_SNOWFLAKE_STAGE=<stage-object-with-schema-name>
  export TARGET_SNOWFLAKE_FILE_FORMAT=<file-format-object-with-schema-name>
  export CLIENT_SIDE_ENCRYPTION_MASTER_KEY=<client_side_encryption_master_key>
  export CLIENT_SIDE_ENCRYPTION_STAGE_OBJECT=<client_side_encryption_stage_object>
  2. Install the Python dependencies in a virtual env and run the nose unit and integration tests:
  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .
  pip install nose
  3. To run unit tests:
  nosetests --where=tests/unit
  4. To run integration tests:
  nosetests --where=tests/integration

To run pylint:

  1. Install the Python dependencies and run the Python linter:
  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .
  pip install pylint
  pylint target_snowflake -d C,W,unexpected-keyword-arg,duplicate-code

License

Apache License Version 2.0

See LICENSE for the full text.
