我正在使用pyspark,将一个大型csv文件加载到带有spark csv的数据帧中,作为预处理步骤,我需要对其中一列(包含json字符串)中可用的数据应用各种操作。它将返回X值,每个值都需要存储在各自独立的列中。
该功能将在UDF中实现。但是,我不知道如何从那个UDF返回一个值列表并将这些值输入到各个列中。下面是一个简单的例子:
(...)
from pyspark.sql.functions import udf
def udf_test(n):
return [n/2, n%2]
test_udf=udf(udf_test)
df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4)
产生以下结果:
+------+----------+--------------------+
|amount|trans_date| test|
+------+----------+--------------------+
| 28.0|2016-02-07| [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows
将udf返回的两个值(在本例中)存储在单独的列上的最佳方法是什么?现在它们被输入为字符串:
df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()
root
|-- amount: float (nullable = true)
|-- trans_date: string (nullable = true)
|-- test: string (nullable = true)
不能从一个UDF调用创建多个顶级列,但可以创建一个新的
struct
。它需要具有指定returnType
的自定义项:使用简单的
select
进一步展平架构:另见Derive multiple columns from a single column in a Spark DataFrame
您可以使用flatMap一次性将列获取所需的数据帧
相关问题 更多 >
编程相关推荐