Spark SQL-加载包含一些错误记录的csv/psv文件

2024-04-27 13:43:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我们正在用Spark加载文件目录的层次结构,并将它们转换为Parquet。数百个管道分隔的文件中有数十GB。有些本身就很大。

比如说,每100个文件都有一两行有一个额外的分隔符,使整个进程(或文件)中止。

我们正在使用:

sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", format("header"))
        .option("delimiter", format("delimeter"))
        .option("quote", format("quote"))
        .option("escape", format("escape"))
        .option("charset", "UTF-8")
        // Column types are unnecessary for our current use cases.
        //.option("inferschema", "true")
        .load(glob)

是否有任何带有Spark的扩展或事件处理机制可以附加到读取行的逻辑上,即,如果遇到格式错误的行,只需跳过该行而不是使其上的进程失败?

(我们计划进行更多的预处理,但这将是最直接和最关键的修复。)


Tags: 文件format管道层次结构进程sparkquoteheader
1条回答
网友
1楼 · 发布于 2024-04-27 13:43:40

在您的情况下,失败的可能不是Spark解析部分,而是默认值实际上是PERMISSIVE,这样它会将最大努力解析为格式错误的记录,然后在处理逻辑的下游引发问题。

您应该能够简单地添加选项:

.option("mode", "DROPMALFORMED")

像这样:

sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", format("header"))
        .option("delimiter", format("delimeter"))
        .option("quote", format("quote"))
        .option("escape", format("escape"))
        .option("charset", "UTF-8")
        // Column types are unnecessary for our current use cases.
        //.option("inferschema", "true")
        .option("mode", "DROPMALFORMED")
        .load(glob)

它将跳过分隔符数目不正确或与模式不匹配的行,而不是让它们在以后的代码中导致错误。

相关问题 更多 >