通过检查另一列的值在PySpark中更新特定行的值

0 投票
1 回答
31 浏览
提问于 2025-04-13 17:38

我想根据一些条件来更新一行的值。具体来说,我需要检查已经存在的三条记录,看看它们的值是否符合条件。如果符合条件,就用匹配到的记录中的相关数据来更新新的行记录。

我需要比较数据表中的三列(姓氏、出生日期和身份证号)。如果这三列的值在不同的雇主代码下匹配了两次,那么pyspark代码就应该查找员工唯一ID这一列,并使用与匹配记录对应的员工唯一ID值。

1 个回答

1

如果这三列的数据在不同的雇主代码下出现了两次,那就意味着对于同一个姓、名和身份证号,有两个不同的雇主代码吗?

假设你的参考数据框是这样的:

lastName|birthDate|nationalId|EmployeeCode
AAA|BBB|123|Emp1
AAA|BBB|123|Emp2
DDD|EEE|789|Emp6
XXX|YYY|456|Emp5

我认为可以通过将参考数据框分成两部分来处理这个问题:

  1. 那些对于同一个姓、生日和身份证号有多个雇主代码的记录
  2. 那些对于同样的三列只有一个雇主代码的记录

下面的代码就是将数据框分成这两部分:

import pyspark.sql.functions as F

df = spark.createDataFrame([
    ('AAA', 'BBB', '123', 'Emp1'),
    ('AAA', 'BBB', '123', 'Emp2'),
    ('XXX', 'YYY', '456', 'Emp5'),
    ('DDD', 'EEE', '789', 'Emp6')
], ['lastName', 'birthDate', 'nationalId', 'EmployeeCode'])


aggregated_df = df.groupBy("lastName", "birthDate", "nationalId").agg(F.countDistinct("EmployeeCode").alias("distinctEmployeeCode"))
part_one = aggregated_df.filter(F.col("distinctEmployeeCode") > 1)
part_two = aggregated_df.filter(F.col("distinctEmployeeCode") <= 1)

现在可以根据需要进行连接,将你的数据框与第一部分进行内连接,这样就能得到EmployeeUniqueID,因为与这个连接匹配的记录有重复的雇主代码。

与第二部分的连接将给你相应的EmployeeCode,任何与这个表连接的记录都会有一个独特的雇主代码。

撰写回答