How to merge DataFrames while preserving order in Spark or Python

Published 2024-04-20 07:22:54


I am trying to merge two DataFrames while keeping their row order.

The first DataFrame has the following values:

>>> df_branch1.show(10,False)
+------------------------+
|col                     |
+------------------------+
|Sorter_SAMPLE_CUSTOMER  |
|Join_Source_Target      |
|Exp_DetectChanges       |
|Filter_Unchanged_Records|
|Router_UPDATE_INSERT    |
|Seq_Unique_Key          |
+------------------------+

The second DataFrame has the following values:

>>> df_branch2.show(10,False)
+------------------------+
|col                     |
+------------------------+
|Sorter_CUSTOMER_MASTER  |
|Join_Source_Target      |
|Exp_DetectChanges       |
|Filter_Unchanged_Records|
|Router_UPDATE_INSERT    |
|Seq_Unique_Key          |
+------------------------+

I want to merge the two DataFrames so that the original order is preserved.

The expected output is:

+------------------------+
|col                     |
+------------------------+
|Sorter_SAMPLE_CUSTOMER  |
|Sorter_CUSTOMER_MASTER  |
|Join_Source_Target      |
|Exp_DetectChanges       |
|Filter_Unchanged_Records|
|Router_UPDATE_INSERT    |
|Seq_Unique_Key          |
+------------------------+

Any solution using PySpark or plain Python is fine.


2 Answers

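This solution uses zipWithIndex; I am not convinced by the "mono..." approach (presumably monotonically_increasing_id). There is another way to do this, but since I am pressed for time, this is what I have.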
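As background for that caution: the Spark documentation describes the values of monotonically_increasing_id as increasing and unique but not consecutive, with the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The snippet below is a rough plain-Python model of that layout, not Spark code; the helper name model_monotonic_id is hypothetical and exists only for illustration.

```python
def model_monotonic_id(partition_id, row_in_partition):
    # Hypothetical model of the documented layout:
    # partition ID in the upper bits, record number in the lower 33 bits.
    return (partition_id << 33) + row_in_partition


# Two partitions with three records each.
ids = [model_monotonic_id(p, r) for p in range(2) for r in range(3)]
print(ids)
# [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

The IDs jump between partitions, so they work as a sort key within one DataFrame but are not row positions. RDD.zipWithIndex, used in the code that follows, assigns consecutive indices starting at 0 instead.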

from pyspark.sql.types import LongType, StringType, StructField, StructType
import pyspark.sql.functions as F

df1 = spark.createDataFrame([('abc'),('2'),('3'),('4')], StringType())
df2 = spark.createDataFrame([('abc'),('2a'),('3'),('4')], StringType())

# Common schema for both DataFrames; this could be factored into a function, but I was pressed for time
schema = StructType(df1.schema.fields[:] + [StructField("index", LongType(), True)])
rdd = df1.rdd.zipWithIndex()
rdd1 = rdd.map(lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],))
df1 = spark.createDataFrame(rdd1, schema)
df1 = df1.withColumn("t", F.lit(1))
rdd = df2.rdd.zipWithIndex()
rdd2 = rdd.map(lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],))
df2 = spark.createDataFrame(rdd2, schema)
df2 = df2.withColumn("t", F.lit(2))

# Assumption: every value of df1 must always be present; the task is to take the extra values from df2 and place them directly after their df1 counterparts
# Functional solution; performance may be an issue. Could be done with collect_list etc., but SQL is used here
# Did not consider the case where T1 has fewer values than T2

df1.createOrReplaceTempView("data1")
df2.createOrReplaceTempView("data2")

df3 = spark.sql('''select * from data2 d2  
                where exists   
                 (select d1.value from data1 d1
                   where d1.index = d2.index
                     and d1.value <> d2.value)
               ''')

dfRES = df1.union(df3).orderBy("index", "t").drop(*['index', 't'])
dfRES.show(truncate=False)

This returns the following, with order preserved in the final DataFrame and no distinct required:

+-----+
|value|
+-----+
|abc  |
|2    |
|2a   |
|3    |
|4    |
+-----+

UPD

Although the question is vague, this solution also handles repeated values, if there are any. For example:

df1 = spark.createDataFrame([ ('abc'),('2'),('3'),('4'), ('abc'),('2'),('3'),('4'), ('abc'),('2'),('3'),('4') ], StringType())
df2 = spark.createDataFrame([ ('abc'),('2a'),('3'),('4'), ('abc'),('2b'),('3'),('4'), ('abc'),('2c'),('3c'),('4')   ], StringType()) 
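
To make the logic of this answer easier to follow without a Spark session, here is a minimal plain-Python sketch of the same idea: keep every row of the first input, and where the second input differs at the same index, place its row directly after. The function name merge_keep_order is hypothetical, and lists stand in for the DataFrames; this is an illustration of the approach, not a replacement for the Spark code.

```python
def merge_keep_order(first, second):
    """Keep all rows of `first`; insert differing rows of `second` right after."""
    merged = []
    for index, value in enumerate(first):
        merged.append(value)  # row from the first input (t = 1)
        if index < len(second) and second[index] != value:
            merged.append(second[index])  # differing row from the second input (t = 2)
    return merged


# The small example from this answer.
print(merge_keep_order(["abc", "2", "3", "4"], ["abc", "2a", "3", "4"]))
# ['abc', '2', '2a', '3', '4']

# The example with repeated values.
rows1 = ["abc", "2", "3", "4"] * 3
rows2 = ["abc", "2a", "3", "4", "abc", "2b", "3", "4", "abc", "2c", "3c", "4"]
merged_repeated = merge_keep_order(rows1, rows2)
print(merged_repeated)
```

Like the SQL above, the sketch drops any rows of the second input that lie beyond the length of the first, and it keeps repeated values because rows are compared by position rather than deduplicated globally.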

Here is an approach that uses a key column:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# create a key column (d1 and d2 are assumed to be the two input DataFrames, each with a single column "col1")
d1 = d1.withColumn("key", F.monotonically_increasing_id())
d2 = d2.withColumn("key", F.monotonically_increasing_id())

# concat data
d3 = d1.union(d2)

# sort by key
d3 = d3.orderBy('key').drop('key')

# number the duplicates of each value by key, so the earliest occurrence gets row number 1
w = Window.partitionBy("col1").orderBy("key")
d4 = d3.withColumn("key", F.monotonically_increasing_id())
d4 = (d4
     .withColumn("dupe", F.row_number().over(w))
     .where("dupe == 1")
     .orderBy("key")
     .drop(*['key', 'dupe']))

d4.show()

+------------------------+
|col1                    |
+------------------------+
|Sorter_SAMPLE_CUSTOMER  |
|Sorter_CUSTOMER_MASTER  |
|Join_Source_Target      |
|Exp_DetectChanges       |
|Filter_Unchanged_Records|
|Router_UPDATE_INSERT    |
|Seq_Unique_Key          |
+------------------------+
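
Since the question also accepts plain Python, the key-column idea can be sketched without Spark: interleave the two inputs by position, then keep only the first occurrence of each value. The function name interleave_and_dedupe is hypothetical, and the explicit source tag used to break ties between the two inputs is a choice of this sketch, not something taken from the code above.

```python
def interleave_and_dedupe(first, second):
    """Interleave two lists by position, then keep first occurrences only."""
    # Key each value by (position, source); source 0 sorts before source 1.
    keyed = [(i, 0, v) for i, v in enumerate(first)]
    keyed += [(i, 1, v) for i, v in enumerate(second)]
    keyed.sort(key=lambda item: (item[0], item[1]))

    seen = set()
    result = []
    for _, _, value in keyed:
        if value not in seen:  # drop later duplicates, keep the earliest
            seen.add(value)
            result.append(value)
    return result


branch1 = ["Sorter_SAMPLE_CUSTOMER", "Join_Source_Target", "Exp_DetectChanges",
           "Filter_Unchanged_Records", "Router_UPDATE_INSERT", "Seq_Unique_Key"]
branch2 = ["Sorter_CUSTOMER_MASTER", "Join_Source_Target", "Exp_DetectChanges",
           "Filter_Unchanged_Records", "Router_UPDATE_INSERT", "Seq_Unique_Key"]

for name in interleave_and_dedupe(branch1, branch2):
    print(name)
```

Note that this variant removes every repeated value, so it suits the question's data, where each step name appears at most once per input, but it would collapse inputs that legitimately contain the same value several times.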
