我正在尝试使用Pyspark雪花连接器将数据从S3加载到雪花表。我已将连接器选项配置如下-
df.write.mode('overwrite') \
.format(SNOWFLAKE_SOURCE_NAME) \
.options(sfURL=[url],
sfUser=[user],
sfDatabase=[database],
sfSchema=[schema],
sfWarehouse=[warehouse],
sfRole=[role],
pem_private_key=pkb) \
.option('truncate_table', 'ON') \
.option('usestagingtable', 'OFF') \
.option('continue_on_error', 'on') \
.option('dbtable', [dbtable]) \
.save()
根据配置,当它出错时,它会跳过坏记录部分加载文件。但是我只在第一条记录中得到错误。在我的例子中,我希望看到发生错误的所有记录以及每个错误的详细信息。 使用VALIDATE函数本来是可能的,但由于spark snowflake connector使用临时外部阶段来执行COPY INTO命令,因此在执行VALIDATE函数时无法获取它
由于在pyspark write/save方法之后阶段和文件不可用,我们可以在该方法本身中使用postactions
选项吗?
在postactions
选项中,我们可以提供select * from table(validate(MY_TABLE, job_id => LAST_QUERY_ID(-1)));
在这种情况下,如何获取所有不良记录以及与之相关的错误
目前没有回答
相关问题 更多 >
编程相关推荐