Transpose a Spark DataFrame from rows to columns in PySpark and append it to another DataFrame

I have a Spark DataFrame avg_length_df in PySpark that looks like this:

+----------------+---------+----------+-----------+---------+-------------+----------+
|       id       |        x|         a|          b|        c|      country|     param|
+----------------+---------+----------+-----------+---------+-------------+----------+
|            40.0|      9.0|     5.284|      5.047|    6.405|         13.0|avg_length|
+----------------+---------+----------+-----------+---------+-------------+----------+

I want to transpose it from a single row into a single column, so that it becomes:

+----------+
|avg_length|
+----------+
|      40.0|
|       9.0|
|     5.284|
|     5.047|
|     6.405|
|      13.0|
+----------+

Next, I have a second DataFrame, df2:

+----------------+------+
|       col_names|dtypes|
+----------------+------+
|              id|string|
|               x|   int|
|               a|string|
|               b|string|
|               c|string|
|         country|string|
+----------------+------+

I want to add a column avg_length to df2 that holds the transposed values from above. The expected output looks like this:

+----------------+------+----------+
|       col_names|dtypes|avg_length|
+----------------+------+----------+
|              id|string|      40.0|
|               x|   int|       9.0|
|               a|string|     5.284|
|               b|string|     5.047|
|               c|string|     6.405|
|         country|string|      13.0|
+----------------+------+----------+

How can I accomplish these two operations?


2 Answers
>>> from pyspark.sql import *
>>> from pyspark.sql.functions import *
# Input DataFrame
>>> df.show()
+----+---+-----+-----+-----+-------+----------+
|  id|  x|    a|    b|    c|country|     param|
+----+---+-----+-----+-----+-------+----------+
|40.0|9.0|5.284|5.047|6.405|   13.0|avg_length|
+----+---+-----+-----+-----+-------+----------+

>>> # Group on every value column, pivot the single "param" value ("avg_length")
>>> # into a column, and pack each row into one JSON string
>>> avgDF = df.groupBy("id", "x", "a", "b", "c", "country") \
...           .pivot("param") \
...           .agg(concat_ws("", collect_list(to_json(struct("id", "x", "a", "b", "c", "country"))))) \
...           .drop("id", "x", "a", "b", "c", "country")
>>> avgDF.show(2,False)
+----------------------------------------------------------------------------+
|avg_length                                                                  |
+----------------------------------------------------------------------------+
|{"id":"40.0","x":"9.0","a":"5.284","b":"5.047","c":"6.405","country":"13.0"}|
+----------------------------------------------------------------------------+

>>> # Strip the JSON braces and quotes, split on "," and explode into one row
>>> # per "name:value" pair, then split each pair into col_names and avg_length
>>> finalDF = avgDF.withColumn("value", explode(split(regexp_replace(col("avg_length"), """[\\{ " \\}]""", ""), ","))) \
...                .withColumn("avg_length", split(col("value"), ":")[1]) \
...                .withColumn("col_names", split(col("value"), ":")[0]) \
...                .drop("value")
>>> finalDF.show(10,False)
+----------+---------+
|avg_length|col_names|
+----------+---------+
|40.0      |id       |
|9.0       |x        |
|5.284     |a        |
|5.047     |b        |
|6.405     |c        |
|13.0      |country  |
+----------+---------+

# The other DataFrame
>>> df2.show()
+---------+------+
|col_names|dtypes|
+---------+------+
|       id|string|
|        x|   int|
|        a|string|
|        b|string|
|        c|string|
|  country|string|
+---------+------+

>>> df2.join(finalDF,"col_names").show(10,False)
+---------+------+----------+
|col_names|dtypes|avg_length|
+---------+------+----------+
|id       |string|40.0      |
|x        |int   |9.0       |
|a        |string|5.284     |
|b        |string|5.047     |
|c        |string|6.405     |
|country  |string|13.0      |
+---------+------+----------+
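As a footnote to this answer, the JSON round-trip can also be avoided: Spark SQL's stack generator unpivots a row directly into (name, value) pairs. A minimal sketch, assuming the six value columns share a compatible type (cast them to string first if they do not):

>>> from pyspark.sql.functions import expr
>>> # stack(n, name1, val1, name2, val2, ...) emits one output row per pair
>>> unpivoted = df.select(expr(
...     "stack(6, 'id', id, 'x', x, 'a', a, 'b', b, 'c', c, 'country', country) "
...     "as (col_names, avg_length)"))
>>> df2.join(unpivoted, "col_names").show()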

Here is code to transpose a DataFrame (via RDDs) in PySpark.

# Assumes a running SparkSession, with the SparkContext available as `sc`

dt1 = {'avg_length': [40.0, 9.0, 5.284, 5.047, 6.405, 13.0]}
dt = sc.parallelize([(k,) + tuple(v) for k, v in dt1.items()]).toDF()
dt.show()


# ---- Transpose Code ----

# Grab the first column, since it will be transposed into the new column headers
new_header = [i[0] for i in dt.select("_1").rdd.map(tuple).collect()]

# Remove the first column from the DataFrame
dt2 = dt.select([c for c in dt.columns if c != '_1'])

# Convert the DataFrame to an RDD of tuples
rdd = dt2.rdd.map(tuple)

# Transpose: tag each cell with its (row, column) position, regroup by column
# index, sort each group by row index, and keep only the values
# (the original used Python 2 tuple-unpacking lambdas; rewritten for Python 3)
rddT1 = rdd.zipWithIndex().flatMap(lambda xi: [(xi[1], j, e) for j, e in enumerate(xi[0])])
rddT2 = rddT1.map(lambda ije: (ije[1], (ije[0], ije[2]))).groupByKey().sortByKey()
rddT3 = rddT2.map(lambda jx: sorted(jx[1], key=lambda ie: ie[0]))
rddT4 = rddT3.map(lambda x: [e for _, e in x])

# Convert back to a DataFrame (with the new header)
df = rddT4.toDF(new_header)

df.show()

After the transpose, you can simply merge the two DataFrames. I hope this helps.
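That final merge is left implicit above. Since Spark DataFrames have no inherent row order to join on, one way to sketch it, assuming both sides still reflect the original row order, is a hypothetical helper with_index that attaches a positional key via zipWithIndex:

from pyspark.sql import DataFrame

def with_index(sdf: DataFrame) -> DataFrame:
    # Hypothetical helper: number rows in their current RDD order
    return (sdf.rdd.zipWithIndex()
               .map(lambda pair: tuple(pair[0]) + (pair[1],))
               .toDF(sdf.columns + ["_idx"]))

# Join the transposed avg_length column onto df2 by row position
merged = with_index(df2).join(with_index(df), "_idx").drop("_idx")
merged.show()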
