Pyspark/Python: shuffle the rows/records of a few columns within groups
I am trying to randomly shuffle three columns (function_txt, item_txt and value_txt) in PySpark, but only among rows that share the same vehicle_id and diag_sid. For example:
diag_sid | vehicle_id | source | function_txt | item_txt | value_txt | date |
---|---|---|---|---|---|---|
1711364453938960795 | W1KAH0FB1PF092903 | A | Bordnetzdaten_EZS222 | 22105E | 1A 00 18 CA 05 7A... | 2024-03-25 |
1711364453938960795 | W1KAH0FB1PF092903 | A | Bordnetzdaten_EZS223 | 221058 | 01 E4 FD E8 FD E0... | 2024-03-25 |
So far I have tried df.reindex(np.random.permutation(df.index)), but that is a pandas operation and it only rearranges whole rows; it does not shuffle the data within the cells.
Is there a way in PySpark to shuffle only certain columns within a group of records (here, grouped by diag_sid and vehicle_id)?
The desired output is:
diag_sid | vehicle_id | source | function_txt | item_txt | value_txt | date |
---|---|---|---|---|---|---|
1711364453938960795 | W1KAH0FB1PF092903 | A | Bordnetzdaten_EZS223 | 221058 | 01 E4 FD E8 FD E0... | 2024-03-25 |
1711364453938960795 | W1KAH0FB1PF092903 | A | Bordnetzdaten_EZS222 | 22105E | 1A 00 18 CA 05 7A... | 2024-03-25 |
1 Answer
I am not quite sure why you want to do this, but here is a simple approach.
First, bundle all the columns you want to shuffle, together with a random integer, into a struct.
Next, create a window specification that partitions by the columns that should stay fixed and orders by the random integer inside the struct.
Finally, unpack the struct to get your original columns back. Here is an example.
from pyspark.sql import SparkSession, functions as F, Window
spark = SparkSession.builder.master("local").getOrCreate()
data = [
("1711364453938960795", "W1KAH0FB1PF092903", "A", "Bordnetzdaten_EZS222", "22105E", "1A 00 18 CA 05 7A", "2024-03-25"),
("1711364453938960795", "W1KAH0FB1PF092903", "A", "Bordnetzdaten_EZS223", "221058", "01 E4 FD E8 FD E0", "2024-03-25")
]
schema = ["diag_sid", "vehicle_id", "source", "function_txt", "item_txt", "value_txt", "date"]
df = spark.createDataFrame(data, schema)
df.show(truncate=False)
# Attach a random sort key to every row
df = df.withColumn("rand_int", F.expr("cast(floor(rand() * 1000) as int)"))
# Pack the random key and the shuffle columns into one struct, then drop the originals
df = df.withColumn("info", F.struct("rand_int", "source", "function_txt", "item_txt", "value_txt", "date")).drop("source", "function_txt", "item_txt", "value_txt", "date", "rand_int")
df.show(truncate=False)
# Within each (diag_sid, vehicle_id) group, order the structs by the random key
windowSpec = Window.partitionBy("diag_sid", "vehicle_id").orderBy("info.rand_int")
df = df.withColumn("row_num", F.row_number().over(windowSpec))
df.show(truncate=False)
Output:
+-------------------+-----------------+------+--------------------+--------+-----------------+----------+
|diag_sid |vehicle_id |source|function_txt |item_txt|value_txt |date |
+-------------------+-----------------+------+--------------------+--------+-----------------+----------+
|1711364453938960795|W1KAH0FB1PF092903|A |Bordnetzdaten_EZS222|22105E |1A 00 18 CA 05 7A|2024-03-25|
|1711364453938960795|W1KAH0FB1PF092903|A |Bordnetzdaten_EZS223|221058 |01 E4 FD E8 FD E0|2024-03-25|
+-------------------+-----------------+------+--------------------+--------+-----------------+----------+
+-------------------+-----------------+---------------------------------------------------------------------+
|diag_sid |vehicle_id |info |
+-------------------+-----------------+---------------------------------------------------------------------+
|1711364453938960795|W1KAH0FB1PF092903|{561, A, Bordnetzdaten_EZS222, 22105E, 1A 00 18 CA 05 7A, 2024-03-25}|
|1711364453938960795|W1KAH0FB1PF092903|{162, A, Bordnetzdaten_EZS223, 221058, 01 E4 FD E8 FD E0, 2024-03-25}|
+-------------------+-----------------+---------------------------------------------------------------------+
+-------------------+-----------------+---------------------------------------------------------------------+-------+
|diag_sid |vehicle_id |info |row_num|
+-------------------+-----------------+---------------------------------------------------------------------+-------+
|1711364453938960795|W1KAH0FB1PF092903|{162, A, Bordnetzdaten_EZS223, 221058, 01 E4 FD E8 FD E0, 2024-03-25}|1 |
|1711364453938960795|W1KAH0FB1PF092903|{561, A, Bordnetzdaten_EZS222, 22105E, 1A 00 18 CA 05 7A, 2024-03-25}|2 |
+-------------------+-----------------+---------------------------------------------------------------------+-------+