比较Pyspark数据帧的值(列表)

2024-04-26 21:47:49 发布

您现在位置:Python中文网/ 问答频道 /正文

我想比较一下list\u id列上的两个df1 df2数据帧:

df1 = 
+---------+
|  list_id|
+---------+
|[1, 2, 3]|
|[4, 5, 6]|
|[7, 8, 9]|
+---------+
df2 =
+------------+
|     list_id|
+------------+
| [10, 3, 11]|
|[12, 13, 14]|
| [15, 6, 16]|
+------------+

期望的结果是:

df2 =
+-------------------+
|            list_id|
+-------------------+
| [1, 2, 3, 10, 11] |
| [4, 5, 6, 15, 16] |
| [7, 8, 9]         |
| [12, 13, 14]      |
+-------------------+

我的目标是连接那些交集不是空的列表,并保持其他列表与pyspark相同。你知道吗

注意:我的数据帧非常大,使用sparksql连接是不可能的。你知道吗


Tags: 数据id目标列表listpysparkdf1df2
1条回答
网友
1楼 · 发布于 2024-04-26 21:47:49

我想出了一个不需要任何连接操作的代码。 不知何故,这是一个相当混乱,我不知道它会如何表现记忆wize考虑到我爆炸数组多次。你知道吗

import pyspark.sql.functions as F
from pyspark.sql.window import Window

df1 = (sc.parallelize([(1, 2, 3), (4, 5, 6), (7, 8, 9)])
         .toDF(('c1', 'c2', 'c3'))
         .select(F.array(F.col('c1'), F.col('c2'), F.col('c3')).alias('id_list'))
        )

df2 = (sc.parallelize([(10, 3, 11), (12, 13, 14), (15, 6, 16)])
         .toDF(('c1', 'c2', 'c3'))
         .select(F.array(F.col('c1'), F.col('c2'), F.col('c3')).alias('id_list'))
         )

out = (df1.union(df2)
         .withColumn('key1', F.explode('id_list'))
         .withColumn('key2', F.explode('id_list'))
         .groupBy('key1')
         .agg(F.sort_array(F.collect_set(F.col('key2'))).alias('id_list'))
         .withColumn('key1', F.explode('id_list'))
         .withColumn('max_length', F.max(F.size('id_list')).over(Window().partitionBy('key1')))
         .where(F.col('max_length')==F.size('id_list'))
         .select('id_list')
         .distinct()
    )

相关问题 更多 >