Iterate over the rows of DataFrame df_a and update DataFrame df_b based on df_a's values in PySpark

Posted 2024-04-27 18:55:27


I have a DataFrame df_b that must be updated based on the values in DataFrame df_a.

df_a

+-----+-----+------------+---------+
| id_1| id_2| header_oper| head_seq|
+-----+-----+------------+---------+
|  boy|    3|      insert|        1|
|  bat|    4|      delete|        3|
|  cat|    2|      insert|        1|
|  bat|    4|      update|        2|
|  bat|    5|   beforeimg|        1|
+-----+-----+------------+---------+

df_b (before)

+-----+-----+
| id_1| id_2|
+-----+-----+
|  boy|    4|
|  bat|    5|
|  cat|    1|
+-----+-----+

The approach I came up with:

  1. Sort the data by head_seq
  2. Iterate over df_a
  3. If header_oper is 'insert' or 'update', append the row to df_b
  4. If header_oper is 'delete' or 'beforeimg', subtract the row from df_b
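The four steps above can be sketched in plain Python on small in-memory lists. This is a minimal illustration, assuming the change log is small enough to bring to the driver (for example with `df_a.collect()`); `replay_changes` is a hypothetical helper name. PySpark `Row` objects are tuples, so collected rows unpack the same way as the literal tuples used here.

```python
# Sketch: replay df_a's change log against df_b, one operation at a time.
# replay_changes is a hypothetical helper, not a PySpark API.

ADD_OPS = {"insert", "update"}
REMOVE_OPS = {"delete", "beforeimg"}


def replay_changes(base_rows, change_rows):
    """Apply (id_1, id_2, header_oper, head_seq) changes to (id_1, id_2) rows."""
    current = [tuple(r) for r in base_rows]
    # Step 1: sort by head_seq; sorted() is stable, so ties keep their input order
    for id_1, id_2, oper, _seq in sorted(change_rows, key=lambda r: r[3]):
        key = (id_1, id_2)
        if oper in ADD_OPS:
            # Step 3: append inserted/updated rows
            current.append(key)
        elif oper in REMOVE_OPS:
            # Step 4: remove every copy of the matching row
            current = [r for r in current if r != key]
        else:
            raise ValueError(f"unknown header_oper: {oper!r}")
    return current


df_a_rows = [
    ("boy", 3, "insert", 1),
    ("bat", 4, "delete", 3),
    ("cat", 2, "insert", 1),
    ("bat", 4, "update", 2),
    ("bat", 5, "beforeimg", 1),
]
df_b_rows = [("boy", 4), ("bat", 5), ("cat", 1)]

print(replay_changes(df_b_rows, df_a_rows))
# [('boy', 4), ('cat', 1), ('boy', 3), ('cat', 2)]
```

Collecting df_a to the driver and looping row by row gives up Spark's parallelism, so this shape only suits small change logs.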

Expected df_b (after):

+-----+-----+
| id_1| id_2|
+-----+-----+
|  boy|    4|
|  boy|    3|
|  cat|    2|
|  cat|    1|
+-----+-----+



I need help with how to iterate over df_a and perform operations on df_b based on df_a's values.


2 Answers

OK, I figured it out. Since every update is preceded by a beforeimg, the order of operations doesn't matter.

I just need to add all the inserts and updates, then remove the deletes and beforeimgs.
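The batch idea can be checked on the sample data with plain Python sets. This is a minimal sketch, assuming rows are compared as whole (id_1, id_2) pairs and duplicates don't matter (DataFrame.subtract drops duplicates anyway); `apply_in_batch` is a hypothetical helper name.

```python
# Sketch: add all inserts/updates first, then remove all deletes/beforeimgs.
# apply_in_batch is a hypothetical helper; Python sets stand in for union + subtract.


def apply_in_batch(base_rows, change_rows):
    added = {(i1, i2) for i1, i2, op, _ in change_rows if op in ("insert", "update")}
    removed = {(i1, i2) for i1, i2, op, _ in change_rows if op in ("delete", "beforeimg")}
    # Union everything in, then subtract; head_seq is ignored entirely
    return (set(map(tuple, base_rows)) | added) - removed


df_a_rows = [
    ("boy", 3, "insert", 1),
    ("bat", 4, "delete", 3),
    ("cat", 2, "insert", 1),
    ("bat", 4, "update", 2),
    ("bat", 5, "beforeimg", 1),
]
df_b_rows = [("boy", 4), ("bat", 5), ("cat", 1)]

print(sorted(apply_in_batch(df_b_rows, df_a_rows)))
# [('boy', 3), ('boy', 4), ('cat', 1), ('cat', 2)]
```

Because head_seq is ignored, this only agrees with a row-by-row replay when no row is added back after it has been removed, which is the assumption this answer relies on.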


Split df_a by operation type and drop the header columns:

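```python
# One DataFrame per operation type, keeping only the key columns
ins = df_a.where(df_a['header_oper'] == 'insert').select('id_1', 'id_2')
upd = df_a.where(df_a['header_oper'] == 'update').select('id_1', 'id_2')
dele = df_a.where(df_a['header_oper'] == 'delete').select('id_1', 'id_2')
# Filter on 'beforeimg' here, not 'delete', or the before-images are never removed
bimg = df_a.where(df_a['header_oper'] == 'beforeimg').select('id_1', 'id_2')
```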

Append the inserts and updates to df_b:

```python
# union() matches columns by position and keeps duplicates (like UNION ALL)
df_b = df_b.union(ins)
df_b = df_b.union(upd)
```

Remove the deletes and beforeimgs from df_b:

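```python
# subtract() is a set difference (EXCEPT DISTINCT), so it also drops duplicate rows
df_b = df_b.subtract(dele)
df_b = df_b.subtract(bimg)
```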
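
Second answer:

```python
from pyspark.sql import functions as F

# Assumes an active SparkSession named `spark`
ds = spark.createDataFrame([('boy', 4), ('bat', 5), ('cat', 1)], ['id_1', 'id_2'])
df_op = spark.createDataFrame([('boy', 3, 'insert', 1), ('bat', 4, 'delete', 3), ('cat', 2, 'insert', 1), ('bat', 4, 'update', 2), ('bat', 5, 'beforeimg', 1)], ['id_1', 'id_2', 'eff_op', 'seq'])

# Keep only the latest operation (highest seq) for each id_1
effective_op = df_op.groupBy('id_1').agg(F.max('seq').alias('seq')).join(df_op, ['id_1', 'seq'])

# Append the rows whose latest operation is 'insert'
ds_insert = ds.union(effective_op.filter("eff_op = 'insert'").select('id_1', 'id_2'))

# Drop every row whose id_1 has 'delete' as its latest operation (matches on id_1 only)
deletes = effective_op.filter("eff_op = 'delete'").select('id_1', 'eff_op')
ds_delete = ds_insert.join(deletes, ['id_1'], 'left').filter("eff_op is null").select('id_1', 'id_2')

ds_delete.show()  # display(ds_delete) is a notebook helper (e.g. Databricks); show() works anywhere
```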
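The second answer's logic (keep only the highest-seq operation per id_1, append inserts, drop ids whose last operation is a delete) can be traced in plain Python. This is a minimal sketch; `apply_latest_ops` is a hypothetical helper name, and a dict stands in roughly for the groupBy/max/join step.

```python
# Sketch of the "latest operation per id_1" idea using a dict.
# apply_latest_ops is a hypothetical helper, not part of PySpark.


def apply_latest_ops(base_rows, change_rows):
    latest = {}
    for id_1, id_2, op, seq in change_rows:
        # Keep the change with the highest seq for each id_1
        if id_1 not in latest or seq > latest[id_1][3]:
            latest[id_1] = (id_1, id_2, op, seq)
    rows = [tuple(r) for r in base_rows]
    # Append rows whose latest operation is an insert
    rows += [(i1, i2) for i1, i2, op, _ in latest.values() if op == "insert"]
    # Drop every row whose id_1 ends in a delete (keyed on id_1 only, as in the answer)
    deleted_keys = {i1 for i1, _, op, _ in latest.values() if op == "delete"}
    return [r for r in rows if r[0] not in deleted_keys]


df_a_rows = [
    ("boy", 3, "insert", 1),
    ("bat", 4, "delete", 3),
    ("cat", 2, "insert", 1),
    ("bat", 4, "update", 2),
    ("bat", 5, "beforeimg", 1),
]
df_b_rows = [("boy", 4), ("bat", 5), ("cat", 1)]

print(apply_latest_ops(df_b_rows, df_a_rows))
# [('boy', 4), ('cat', 1), ('boy', 3), ('cat', 2)]
```

On the sample data this yields the same four rows as the expected output. Because an id_1 whose latest operation is 'update' is neither appended nor removed, and deletes match on id_1 alone, it can diverge from a row-by-row replay on other change logs.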
