如何使用pyspark snowflake connector将数据加载到snowflake时获取所有不良记录和所有错误?

2024-04-26 00:01:44 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试使用Pyspark雪花连接器将数据从S3加载到雪花表。我已将连接器选项配置如下-

      df.write.mode('overwrite') \
        .format(SNOWFLAKE_SOURCE_NAME) \
        .options(sfURL=[url],
                 sfUser=[user],
                 sfDatabase=[database],
                 sfSchema=[schema],
                 sfWarehouse=[warehouse],
                 sfRole=[role],
                 pem_private_key=pkb) \
        .option('truncate_table', 'ON') \
        .option('usestagingtable', 'OFF') \
        .option('continue_on_error', 'on') \
        .option('dbtable', [dbtable]) \
        .save()

根据配置,当它出错时,它会跳过坏记录部分加载文件。但是我只在第一条记录中得到错误。在我的例子中,我希望看到发生错误的所有记录以及每个错误的详细信息。 使用VALIDATE函数本来是可能的,但由于spark snowflake connector使用临时外部阶段来执行COPY INTO命令,因此在执行VALIDATE函数时无法获取它

由于在pyspark write/save方法之后阶段和文件不可用,我们可以在该方法本身中使用postactions选项吗? 在postactions选项中,我们可以提供select * from table(validate(MY_TABLE, job_id => LAST_QUERY_ID(-1)));

在这种情况下,如何获取所有不良记录以及与之相关的错误


Tags: 文件方法函数onsave选项错误记录