Scala on Spark [filtering out duplicate rows after a self-join]

Published 2024-06-08 16:12:07


I have been trying to filter this data using Python:

|name_x | age_x | salary_x | name_y | age_y | salary_y | age_diff
| James | 23    | 200000   | Jack   | 24    | 210040   |  1
| Jack  | 24    | 210040   | James  | 23    | 200000   |  1
| Irene | 25    | 200012   | John   | 25    | 210000   |  0
| Johny | 26    | 21090    | Elon   | 29    | 210012   |  3
| Josh  | 24    | 21090    | David  | 23    | 213012   |  1
| John  | 25    | 210000   | Irene  | 25    | 200012   |  0

Row 1 and row 2 are duplicates of each other, and row 3 and row 6 are duplicates: the name, age and salary values are the same with the x and y sides swapped, ignoring the age_diff column. I need to filter out one row of each duplicate pair.

The final output, with the duplicates filtered out, should look like this:

|name_x | age_x | salary_x | name_y | age_y | salary_y | age_diff
| James | 23    | 200000   | Jack   | 24    | 210040   |  1
| Irene | 25    | 200012   | John   | 25    | 210000   |  0
| Johny | 26    | 21090    | Elon   | 29    | 210012   |  3
| Josh  | 24    | 21090    | David  | 23    | 213012   |  1

My Python implementation returns the indexes of the duplicate rows, but it is far too slow.


But now I have to do this with Spark in Scala. Is there a faster way to approach this filtering, something like shift in Python (pandas), or a window function in Scala?
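
For reference, here is one way the mirrored duplicates could be dropped directly from the joined result with a Spark window. This is a minimal sketch: it assumes the joined table shown above is available as a DataFrame named joined (a hypothetical name) and that the pair of names identifies a duplicate. The answers below take a similar approach with sturdier keys.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, greatest, least, row_number}

// Build an order-independent key for each pair (smaller name first,
// larger name second) and keep exactly one row per pair.
// Rows without a mirrored counterpart form their own group and are kept.
val w = Window
  .partitionBy(least(col("name_x"), col("name_y")),
               greatest(col("name_x"), col("name_y")))
  .orderBy(col("name_x"))

val deduped = joined
  .withColumn("rn", row_number().over(w))
  .where(col("rn") === 1)
  .drop("rn")

deduped.show()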


Tags: data, name, age, diff, john, david, elon
2 answers

I think astro_asz's answer is the cleaner approach, but for completeness, here is how to do it using a window:

Edit: I changed the dataset so that two people have the same name, and added a unique ID based on the contents of each row.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, concat, md5, rank}
import spark.implicits._  // needed for toDF and the 'colName symbol syntax

val people = Seq(
  ("1", "James", 23, 200000),
  ("1", "James", 24, 210040),  // two people with same name
  ("2", "Irene", 25, 200012),
  ("2", "John",  25, 210000),
  ("3", "Johny", 26,  21090),
  ("3", "Elon",  29, 200000),
  ("4", "Josh",  24, 200000),
  ("4", "David", 23, 200000))

val columns = Seq("ID", "name", "age", "salary")
val df = people.toDF(columns:_*)

// In general you want to use the primary key from the underlying data store
// as your unique keys.  If for some weird reason the primary key is not 
// available or does not exist, you can try to create your own.  This
// is fraught with danger.  If you are willing to make the (dangerous)
// assumption a unique row is enough to uniquely identify the entity in
// that row, you can use a md5 hash of the contents of the row to create
// your id
val withKey = df.withColumn("key", md5(concat(columns.map(c => col(c)):_*)))

val x = withKey.toDF(withKey.columns.map(c => if (c == "ID") c else "x_" + c):_*)
val y = withKey.toDF(withKey.columns.map(c => if (c == "ID") c else "y_" + c):_*)

val partition = Window.partitionBy("ID").orderBy("x_key")
val df2 = x.join(y, Seq("ID"))
  .where('x_key =!= 'y_key)
  .withColumn("rank", rank over partition)
  .where('rank === 1)
  .drop("rank", "x_key", "y_key")

df2.show
/*
+--+------+-----+--------+------+-----+--------+
|ID|x_name|x_age|x_salary|y_name|y_age|y_salary|
+--+------+-----+--------+------+-----+--------+
| 3|  Elon|   29|  200000| Johny|   26|   21090|
| 1| James|   24|  210040| James|   23|  200000|
| 4| David|   23|  200000|  Josh|   24|  200000|
| 2| Irene|   25|  200012|  John|   25|  210000|
+--+------+-----+--------+------+-----+--------+
*/

You can add an additional condition to the join so that only one of the two rows is kept, for example name_x < name_y. Here is an example:

Example dataframe:

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, Row}
  import org.apache.spark.sql.functions.col
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

  val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(
    Seq(
      Row(1, "James",  1, 10),
      Row(1, "Jack",   2, 20),
      Row(2, "Tom",    3, 30),
      Row(2, "Eva",    4, 40)
    )
  )

  val schema: StructType = new StructType()
    .add(StructField("id",      IntegerType,  false))
    .add(StructField("name",    StringType,  false))
    .add(StructField("age",     IntegerType, false))
    .add(StructField("salary",  IntegerType, false))

  val df0: DataFrame = spark.createDataFrame(rowsRdd, schema)

  df0.sort("id").show()

It gives:

+--+-----+---+------+
|id| name|age|salary|
+--+-----+---+------+
| 1|James|  1|    10|
| 1| Jack|  2|    20|
| 2|  Tom|  3|    30|
| 2|  Eva|  4|    40|
+--+-----+---+------+

Rename the columns of the dataframe:

val df1 = df0.columns.foldLeft(df0)((acc, x) => acc.withColumnRenamed(x, x+"_x"))
val df2 = df0.columns.foldLeft(df0)((acc, x) => acc.withColumnRenamed(x, x+"_y"))
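
As a quick sanity check (a small sketch using the df0, df1 and df2 defined above), the renamed column sets can be printed:

// Columns after the foldLeft renaming
println(df1.columns.mkString(", "))   // id_x, name_x, age_x, salary_x
println(df2.columns.mkString(", "))   // id_y, name_y, age_y, salary_y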

Then join with three conditions:

val df3 = df1.join(df2,
    col("id_x") === col("id_y") and
    col("name_x") =!= col("name_y") and
    col("name_x") < col("name_y"),
    "inner")
df3.show()

It returns:

+----+------+-----+--------+----+------+-----+--------+
|id_x|name_x|age_x|salary_x|id_y|name_y|age_y|salary_y|
+----+------+-----+--------+----+------+-----+--------+
|   1|  Jack|    2|      20|   1| James|    1|      10|
|   2|   Eva|    4|      40|   2|   Tom|    3|      30|
+----+------+-----+--------+----+------+-----+--------+

Depending on how duplicates are defined in your data, the condition that distinguishes the two rows of a duplicate pair will differ.
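
For example, if two different people can share a name, a per-row key can serve as the tiebreaker instead of the name, such as the md5 key built in the other answer. A minimal sketch, assuming df1 and df2 were renamed from a dataframe that already carried such a unique key column (the key_x and key_y column names are hypothetical):

// Keep exactly one row of each mirrored pair by comparing the unique
// per-row keys rather than the (possibly non-unique) names.
val df4 = df1.join(df2,
    col("id_x") === col("id_y") and
    col("key_x") < col("key_y"),
    "inner")
df4.show()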
