PySpark: replace DataFrame values when the value is in a list

Posted 2024-04-29 12:23:24


I'm trying to write a PySpark script that removes personal information from a PySpark DataFrame. My DataFrame looks like this:

  hashed_customer     firstname    lastname    email   order_id    status          timestamp
      eater 1_uuid  1_firstname  1_lastname  1_email    12345    OPTED_IN     2020-05-14 20:45:15
      eater 2_uuid  2_firstname  2_lastname  2_email    23456    OPTED_IN     2020-05-14 20:29:22
      eater 3_uuid  3_firstname  3_lastname  3_email    34567    OPTED_IN     2020-05-14 19:31:55
      eater 4_uuid  4_firstname  4_lastname  4_email    45678    OPTED_IN     2020-05-14 17:49:27

I have a second PySpark DataFrame listing the customers whose information must be removed from the customer_temp_tb table:

hashed_customer    eaterstatus
   eater 1_uuid      OPTED_OUT
   eater 3_uuid      OPTED_OUT

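If a customer appears in the second DataFrame, I want to remove their firstname, lastname and email from the first DataFrame. So far I have built a list of the hashed_customer values in the second DataFrame with: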

cust_opt_out_id = [row.hashed_customer for row in df_out.collect()]

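Now I need a way to blank out firstname, lastname and email in the first DataFrame for every row whose hashed customer ID is in the second DataFrame, so that the final result looks like this: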

hashed_customer     firstname    lastname    email   order_id    status          timestamp
   eater 1_uuid           NaN         NaN      NaN    12345    OPTED_IN     2020-05-14 20:45:15
   eater 2_uuid   2_firstname  2_lastname  2_email    23456    OPTED_IN     2020-05-14 20:29:22
   eater 3_uuid           NaN         NaN      NaN    34567    OPTED_IN     2020-05-14 19:31:55
   eater 4_uuid   4_firstname  4_lastname  4_email    45678    OPTED_IN     2020-05-14 17:49:27

How can I write a function to do this? I know that in pandas this would simply be:

df_in.loc[df_in['hashed_customer'].isin(cust_opt_out_id), ['firstname', 'lastname', 'email']] = np.nan

but this does not work in PySpark.
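For reference, here is the pandas one-liner from the question made self-contained. The sample data is illustrative only (modelled on the tables above, with the status and timestamp columns omitted for brevity), and it assumes pandas and NumPy are installed. It shows the behaviour the PySpark version has to reproduce: a boolean row mask from `isin` combined with a column list in `.loc`, assigned in place.

```python
import numpy as np
import pandas as pd

# Illustrative sample data modelled on the question's tables
df_in = pd.DataFrame({
    "hashed_customer": ["eater 1_uuid", "eater 2_uuid", "eater 3_uuid", "eater 4_uuid"],
    "firstname": ["1_firstname", "2_firstname", "3_firstname", "4_firstname"],
    "lastname": ["1_lastname", "2_lastname", "3_lastname", "4_lastname"],
    "email": ["1_email", "2_email", "3_email", "4_email"],
    "order_id": [12345, 23456, 34567, 45678],
})
df_out = pd.DataFrame({
    "hashed_customer": ["eater 1_uuid", "eater 3_uuid"],
    "eaterstatus": ["OPTED_OUT", "OPTED_OUT"],
})

# Plain Python list of the opted-out IDs
cust_opt_out_id = df_out["hashed_customer"].tolist()

# Row mask (isin) + column list; the assignment modifies df_in in place
df_in.loc[df_in["hashed_customer"].isin(cust_opt_out_id),
          ["firstname", "lastname", "email"]] = np.nan
print(df_in)
```

PySpark DataFrames are immutable and have no `.loc` assignment, which is why the Spark equivalent has to build new columns with `when`/`otherwise` instead.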


1 answer
User
Answer 1 · posted 2024-04-29 12:23:24

To replicate your exact logic, we can do the following (comments inline):

from pyspark.sql import functions as F

# collect the opted-out IDs from df2 into a plain Python list (once)
l = [row[0] for row in df2.select("hashed_customer").collect()]
cols_to_update = ['firstname', 'lastname', 'email']  # list of cols to update
# use when + otherwise in a loop over cols_to_update
cond = [F.when(F.col('hashed_customer').isin(l), F.lit(None))
         .otherwise(F.col(col)).alias(col)
        for col in cols_to_update]
# select the changed columns and the other columns
final = df1.select(*cond, *[a for a in df1.columns if a not in cols_to_update])
# restore the column order of the original DataFrame
final.select(*df1.columns).show()

+---------------+-----------+----------+-------+--------+--------+-------------------+
|hashed_customer|  firstname|  lastname|  email|order_id|  status|          timestamp|
+---------------+-----------+----------+-------+--------+--------+-------------------+
|   eater 1_uuid|       null|      null|   null|   12345|OPTED_IN|2020-05-14 20:45:15|
|   eater 2_uuid|2_firstname|2_lastname|2_email|   23456|OPTED_IN|2020-05-14 20:29:22|
|   eater 3_uuid|       null|      null|   null|   34567|OPTED_IN|2020-05-14 19:31:55|
|   eater 4_uuid|4_firstname|4_lastname|4_email|   45678|OPTED_IN|2020-05-14 17:49:27|
+---------------+-----------+----------+-------+--------+--------+-------------------+
