基于两个spark数据列的两个pyframe更新

2024-06-02 05:40:11 发布

您现在位置:Python中文网/ 问答频道 /正文

我在pyspark中有一个类似下面的数据帧。在

+--------------------+--------------+------------+-----------+-----------+-----------+-----------+
|     serial_number  |     rest_id  |     value  |     body  |     legs  |     face  |     idle  |
+--------------------+--------------+------------+-----------+-----------+-----------+-----------+
| sn11               | rs1          | N          | Y         | N         | N         | acde      |
| sn1                | rs1          | N          | Y         | N         | N         | den       |
| sn1                | null         | Y          | N         | Y         | N         | can       |
| sn2                | rs2          | Y          | Y         | N         | N         | aeg       |
| null               | rs2          | N          | Y         | N         | Y         | ueg       |
+--------------------+--------------+------------+-----------+-----------+-----------+-----------+

现在我想在检查一些列值的同时update一些列。在

我想更新value当任何给定的serial_number或{}有值Y,那么该特定的serial_number或{}的所有{}都应该更新为Y。如果没有,那么它们有什么值。在

我已经做了如下。在

^{pr2}$

我得到了我想要的结果。在

现在我想对列bodylegs和{}重复相同的操作。在

我可以对所有列individually执行上述操作,我的意思是说3连接语句。但是我想在一个语句中更新所有4列。在

我怎么能做到呢?在

Expected result

+--------------------+--------------+------------+-----------+-----------+-----------+-----------+
|     serial_number  |     rest_id  |     value  |     body  |     legs  |     face  |     idle  |
+--------------------+--------------+------------+-----------+-----------+-----------+-----------+
| sn11               | rs1          | N          | Y         | N         | N         | acde      |
| sn1                | rs1          | Y          | Y         | Y         | N         | den       |
| sn1                | null         | Y          | Y         | Y         | N         | can       |
| sn2                | rs2          | Y          | Y         | N         | Y         | aeg       |
| null               | rs2          | Y          | Y         | N         | Y         | ueg       |
+--------------------+--------------+------------+-----------+-----------+-----------+-----------+

Tags: restidnumbervalueserialbodynullface
1条回答
网友
1楼 · 发布于 2024-06-02 05:40:11

您应该对serial_numberrest_id列使用window函数,以检查该组中的列中是否存在Y。(以下为解释意见)

#column names for looping for the updates
columns = ["value","body","legs","face"]
import sys
from pyspark.sql import window as w
#window for serial number grouping
windowSpec1 = w.Window.partitionBy('serial_number').rowsBetween(-sys.maxint, sys.maxint)
#window for rest id grouping
windowSpec2 = w.Window.partitionBy('rest_id').rowsBetween(-sys.maxint, sys.maxint)

from pyspark.sql import functions as f
from pyspark.sql import types as t
#udf function for checking if Y is in the collected list of windows defined above for the columns in the list defined for looping
def containsUdf(x):
    return "Y" in x

containsUdfCall = f.udf(containsUdf, t.BooleanType())

#looping the columns for checking the condition defined in udf function above by collecting the N and Y in each columns within windows defined
for column in columns:
    df = df.withColumn(column, f.when(containsUdfCall(f.collect_list(column).over(windowSpec1)) | containsUdfCall(f.collect_list(column).over(windowSpec2)), "Y").otherwise(df[column]))

df.show(truncate=False)

它应该给你

^{pr2}$

我建议在两个循环中分别使用window函数,因为这可能会给大数据带来内存异常,因为两个窗口函数同时用于每一行

相关问题 更多 >