如何用另一个数据帧中的新值更新pyspark数据帧?

2024-06-09 05:34:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我有两个spark数据帧:

数据帧A:

|col_1 | col_2 | ... | col_n |
|val_1 | val_2 | ... | val_n |

和数据帧B:

^{pr2}$

数据帧B可以包含来自数据帧A的重复、更新和新行。我想在spark中编写一个操作,在其中我可以创建一个新的数据帧,其中包含来自数据帧A的行和来自数据帧B的已更新和新行

我首先创建一个哈希列,其中只包含不可更新的列。这是唯一的id。所以假设col1和{}可以更改值(可以更新),但是{}是唯一的。我创建了一个散列函数hash(col3,..,coln)

A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))
B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))

现在我想写一些spark代码,基本上从B中选择散列值不在A中的行(因此是新行和更新行)并将它们与A中的行一起连接到一个新的数据帧中?在

编辑: 数据帧B可以有来自数据帧A的额外列,因此不可能进行联合。在

示例

数据帧A:

+-----+-----+
|col_1|col_2|
+-----+-----+
|    a|  www|
|    b|  eee|
|    c|  rrr|
+-----+-----+

数据帧B:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    d|  yyy|    2|
|    c|  rer|    3|
+-----+-----+-----+

结果: 数据帧C:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    b|  eee| null|
|    c|  rer|    3|
|    d|  yyy|    2|
+-----+-----+-----+

Tags: 数据inforcolvalhashsparkunique
3条回答

这与update a dataframe column with new values密切相关,只是您还想从DataFrame B添加行。一种方法是首先执行链接问题中概述的内容,然后将结果与DataFrame B合并并删除重复项。在

例如:

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        'col_1',
        f.when(
            ~f.isnull(f.col('b.col_2')),
            f.col('b.col_2')
        ).otherwise(f.col('a.col_2')).alias('col_2'),
        'b.col_3'
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
#+  -+  -+  -+
#|col_1|col_2|col_3|
#+  -+  -+  -+
#|    a|  wew|    1|
#|    b|  eee| null|
#|    c|  rer|    3|
#|    d|  yyy|    2|
#+  -+  -+  -+

或者更一般地使用列表理解,如果您有很多列要替换,并且您不想硬编码它们:

^{pr2}$

我会选择不同的解决方案,我认为它不太冗长,更通用,不涉及列列表。我将首先通过执行基于keyCols(list)的内部连接来确定将被更新的dfA的子集(replaceDf)。然后我将从dfA中减去这个replaceDF,并将其与dfB结合。在

    replaceDf = dfA.alias('a').join(dfB.alias('b'), on=keyCols, how='inner').select('a.*')
    resultDf = dfA.subtract(replaceDf).union(dfB).show()

即使在dfA和dfB中有不同的列,您仍然可以通过从两个数据帧中获取列的列表并找到它们的并集来克服这个问题。那我会的 准备select查询(而不是“select.('a.')*”),这样我就只列出dfA中存在于dfB+中的列+dfB中不存在的列。在

如果您只想保留唯一值,并且要求严格正确的结果,那么union后跟{}应该可以做到:

columns_which_dont_change = [...]
old_df.union(new_df).dropDuplicates(subset=columns_which_dont_change)

相关问题 更多 >