Pyspark数据帧转义&

2024-05-15 00:31:19 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一些大的(~150GB)csv文件,使用分号作为分隔符。我发现一些字段包含html编码的符号和&分号被用作列分隔符,因此我需要找到一种方法来转义它,或者在加载数据帧时用&替换&

例如,我有以下csv文件:

ID;FirstName;LastName
1;Chandler;Bing
2;Ross & Monica;Geller

我使用以下笔记本加载它:

df = spark.read.option("delimiter", ";").option("header","true").csv('/mnt/input/AMP test.csv')
df.show()

我得到的结果是:

+---+---------+--------+
| ID|FirstName|LastName|
+---+---------+--------+
|  1| Chandler|    Bing|
|  2|Ross &amp|  Monica|
+---+---------+--------+

而我要找的是:

+---+-------------+--------+
| ID|    FirstName|LastName|
+---+-------------+--------+
|  1|     Chandler|    Bing|
|  2|Ross & Monica|  Geller|
+---+-------------+--------+

我尝试过使用.option("escape", "&"),但这种转义只对单个字符有效

更新

我有一个使用RDD的黑客解决方案,它至少适用于小测试文件,但我仍在寻找一个合适的解决方案,在加载数据帧时转义字符串

rdd = sc.textFile('/mnt/input/AMP test.csv')
rdd = rdd.map(lambda x: x.replace('&', '&'))

rdd.coalesce(1).saveAsTextFile("/mnt/input/AMP test escaped.csv")

df = spark.read.option("delimiter", ";").option("header","true").csv('/mnt/input/AMP test escaped.csv')
df.show()


Tags: 文件csvtestiddfinputfirstnameamp
2条回答

我认为没有办法仅使用spark.read.csv来逃避这个复杂的字符&,解决方案就像你做了“变通”一样:

  • rdd.map:此函数已将所有列中的值&替换为&
  • 无需将rdd保存在临时路径中,只需将其作为csv参数传递:
rdd = sc.textFile("your_path").map(lambda x: x.replace("&", "&"))

df = spark.read.csv(rdd, header=True, sep=";")
df.show()

+ -+      -+    +
| ID|    FirstName|LastName|
+ -+      -+    +
|  1|     Chandler|    Bing|
|  2|Ross & Monica|  Geller|
+ -+      -+    +

您可以直接使用数据帧来实现这一点。如果您知道至少有一个文件不包含任何&来检索架构,那么它会有所帮助

假设存在这样一个文件,并且其路径为“valid.csv”

from pyspark.sql import functions as F

# I acquire a valid file without the & wrong data to get a nice schema
schm = spark.read.csv("valid.csv", header=True, inferSchema=True, sep=";").schema


df = spark.read.text("/mnt/input/AMP test.csv")

# I assume you have several files, so I remove all the headers.
# I do not need them as I already have my schema in schm.
header = df.first().value
df = df.where(F.col("value") != header)


# I replace "&" with "&", and split the column
df = df.withColumn(
    "value", F.regexp_replace(F.col("value"), "&", "&")
).withColumn(
    "value", F.split("value", ";")
)

# I explode the array in several columns and add types based on schm defined previously
df = df.select(
    *(
        F.col("value").getItem(i).cast(col.dataType).alias(col.name)
        for i, col in enumerate(schm)
    )
)

结果如下:

df.show()
+ -+      -+    +
| ID|    FirstName|LastName|
+ -+      -+    +
|  1|     Chandler|    Bing|
|  2|Ross & Monica|  Geller|
+ -+      -+    +

df.printSchema()
root
 |  ID: integer (nullable = true)
 |  FirstName: string (nullable = true)
 |  LastName: string (nullable = true)

相关问题 更多 >

    热门问题