Pyspark从现有数组列创建特定长度的数组列

2024-04-27 05:05:03 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个类似pyspark的数据帧

+-----+----+------------+------------+-------------+------------+
| Name| Age| P_Attribute|S_Attributes|P_Values     |S_values    | 
+-----+----+------------+------------+-------------+------------+
| Bob1| 16 |  [x1,x2]   |     [x1,x3]|["ab",1]     | [1,2]      |
| Bob2| 16 |[x1,x2,x3]  |     []     |["a","b","c"]| []         |
+-----+----+------------+------------+-------------+------------+

我想最终创建df,如下所示

+-----+----+------------+------------+
| Name| Age| Attribute  |      Values|
+-----+----+------------+------------+
| Bob1| 16 |  x1        |     ab     |
| Bob1| 16 |  x2        |     1      |
| Bob1| 16 |  x1        |     1      |
| Bob1| 16 |  x3        |     2      |
| Bob2| 16 |  x1        |     a      |
| Bob2| 16 |  x2        |     b      |
| Bob2| 16 |  x3        |     c      |
+-----+----+------------+------------+

基本上我想合并这两列并将它们分解成行。在pyspark数组函数的帮助下,我能够对数组进行concat和explode操作,但后来可以识别专业属性和运动属性之间的差异,因为它们可以有相同的名称。我还需要一个类型列

+-----+----+------------+------------+------------+
| Name| Age|   Attribute|       type |Value       |
+-----+----+------------+------------+------------+
| Bob1| 16 |  x1        |     1      | ab         |
| Bob1| 16 |  x2        |     1      | 1          |
| Bob1| 16 |  x1        |     2      | 1          |
| Bob1| 16 |  x3        |     2      | 2          |
| Bob2| 16 |  x1        |     1      | a          |
| Bob2| 16 |  x2        |     1      | b          |
| Bob2| 16 |  x3        |     1      | c          |
+-----+----+------------+------------+------------+  

所以我想先创建一个单独的数组列

+-----+----+------------+------------+------------+------------+
| Name| Age| P_Attribute|S_Attributes|P_type      |S_type      |
+-----+----+------------+------------+------------+------------+
| Bob1| 16 |  [x1,x2]   |     [x1,x3]|   [1,1]    | [2,2]      |
| Bob2| 16 |[x1,x2,x3]  |     []     |  [1,1,1]   |  []        |
+-----+----+------------+------------+------------+------------+

这样我就可以合并列并使用所需类型的列进行分解,如上面的df所示。 问题是我无法动态创建P_类型和S_类型列。 我试过下面的代码

new_df = df.withColumn("temp_P_type", F.lit(1))\
                .withColumn("P_type", F.array_repeat("temp_P_type",F.size("P_Attribute")))

这将抛出TypeError: Column is not iterable错误。 若列的长度已经被提取为另一列,那个么它也不起作用。 有人能帮我吗?有没有更好的解决办法?作为df级别,不使用RDD和python函数(不使用UDF)是否可以做到这一点

另外,我正在使用spark 2.4


Tags: name类型dfageabtypeattribute数组
2条回答

我建议使用高阶函数transform,使用structarray_union,然后使用explode once选择两者

df.show()
#+  + -+      +      +
#|Name|Age| P_Attribute|S_Attributes|
#+  + -+      +      +
#|Bob1| 16|    [x1, x2]|    [x1, x3]|
#|Bob2| 16|[x1, x2, x3]|          []|
#+  + -+      +      +

from pyspark.sql import functions as F
df.withColumn("Attributes", F.explode(F.array_union(F.expr("""transform(P_Attribute,x-> struct(x as Attribute,1 as Type))"""),\
              F.expr("""transform(S_Attributes,x-> struct(x as Attribute,2 as Type))"""))))\
   .select("Name", "Age", "Attributes.*").show()

#+  + -+    -+  +
#|Name|Age|Attribute|Type|
#+  + -+    -+  +
#|Bob1| 16|       x1|   1|
#|Bob1| 16|       x2|   1|
#|Bob1| 16|       x1|   2|
#|Bob1| 16|       x3|   2|
#|Bob2| 16|       x1|   1|
#|Bob2| 16|       x2|   1|
#|Bob2| 16|       x3|   1|
#+  + -+    -+  +

UPDATE:

df.show()

#+  + -+      +      +    -+    +
#|Name|Age| P_Attribute|S_Attributes| P_Values|S_values|
#+  + -+      +      +    -+    +
#|Bob1| 16|    [x1, x2]|    [x1, x3]|  [ab, 1]|  [1, 2]|
#|Bob2| 16|[x1, x2, x3]|          []|[a, b, c]|      []|
#+  + -+      +      +    -+    +

from pyspark.sql import functions as F
df.withColumn("Attributes", F.explode(F.array_union\
               (F.expr("""transform(arrays_zip(P_Attribute,P_Values),x->\
                          struct(x.P_Attribute as Attribute,1 as Type,string(x.P_Values) as Value))"""),\
                F.expr("""transform(arrays_zip(S_Attributes,S_Values),x->\
                          struct(x.S_Attributes as Attribute,2 as Type,string(x.S_Values) as Value))"""))))\
   .select("Name", "Age", "Attributes.*").show()

#+  + -+    -+  +  -+
#|Name|Age|Attribute|Type|Value|
#+  + -+    -+  +  -+
#|Bob1| 16|       x1|   1|   ab|
#|Bob1| 16|       x2|   1|    1|
#|Bob1| 16|       x1|   2|    1|
#|Bob1| 16|       x3|   2|    2|
#|Bob2| 16|       x1|   1|    a|
#|Bob2| 16|       x2|   1|    b|
#|Bob2| 16|       x3|   1|    c|
#+  + -+    -+  +  -+

你可以做如下的事情。首先将P_attributesS_attributes收集到单个Attributes列中,然后对其执行posexplode,这将根据需要提供引用属性源(PS)的type列。最后explode单击Attributes列以展平所有属性

import pyspark.sql.functions as f

df = spark.createDataFrame([
    ['Bob1', 16, ['x1', 'x2'], ['x1', 'x3']],
    ['Bob2', 16, ['x1', 'x2', 'x3'], []]],
    ['Name', 'Age', 'P_Attribute', 'S_Attributes'])

df.withColumn('Attributes', f.array('P_Attribute', 'S_Attributes'))\
  .select('Name', 'Age', f.posexplode('Attributes').alias('type', 'Attribute'))\
  .withColumn('Attribute', f.explode('Attribute'))\
  .show()

+  + -+  +    -+
|Name|Age|type|Attribute|
+  + -+  +    -+
|Bob1| 16|   0|       x1|
|Bob1| 16|   0|       x2|
|Bob1| 16|   1|       x1|
|Bob1| 16|   1|       x3|
|Bob2| 16|   0|       x1|
|Bob2| 16|   0|       x2|
|Bob2| 16|   0|       x3|
+  + -+  +    -+

相关问题 更多 >