如何使用pySpark比较两个CSV文件并验证是否存在

2024-04-29 21:38:23 发布

您现在位置:Python中文网/ 问答频道 /正文

因此,我有一个input.csv,类似这样:

First_Name  Last_Name   Birthdate   Gender  Email_ID        Mobile
Smit        Will        21-04-1974  M       da1@gmail.com   5224521452
Bob         Builder     14-03-1992  M       ad4@gmail.com   2452586253

和Database.csv,但没有更多记录:

First_Name  Last_Name   Birthdate   Gender  Email_ID        Mobile
Bob         Micheles    10-04-1982  M       ya4@gmail.com   7845214525
Will        Smith       21-04-1974  M       da1@gmail.com   9874521452
Emma        Watson      21-08-1989  F       emma@gmail.com  5748214563
Emma        Smit        21-08-1999  F       da1@gmail.com   9874521452
bob         robison     14-03-1992  M       za@gmail.com    2452586253

df_DataBase = spark.read.csv("DataBase.csv",inferSchema=True,header=True) 我的预期结果是:

  1. Bob Builder与Bob robison相同,只是他的姓氏电子邮件ID不同
  2. Smit Will和Will Smith仅与姓名相同,手机号码不同。 最后,如果它们在现有输入文件中存在或不存在,则按如下方式打印:

Expected Output

注意:当电子邮件、电话和生日不匹配时,人是不同的

因此,使用pyspark,如果我们能够实现这一点,我将非常高兴


Tags: csvnamecomidemailmobilegenderwill
1条回答
网友
1楼 · 发布于 2024-04-29 21:38:23

您可以尝试以下方法:

ip = spark.read.csv("input.csv")
db = spark.read.csv("database.csv")
#condition if person is same
person_exists = [((col('a.Email_id') == col('b.Email_id')) | (col('a.Mobile') == col('b.Mobile')) | (col('a.Birthdate') == col('b.Birthdate'))) ]

#people existing in db
existing_persons = 
ip.alias('a').join(db.alias('b'),person_exists,"inner").select([col('a.'+x) for x in a.columns])

#people not existing in db
non_existing = ip.subtract(existing_persons)

#add a column to indicate if same person or not
existing_persons = existing_persons.withColumn('Same_Person',lit('Yes'))
non_existing = non_existing.withColumn('Same_Person',lit('No'))

相关问题 更多 >