PySpark/Python: shuffle the rows/records of a few columns, grouped by type

Asked 2025-04-12 19:17

I'm trying to randomly shuffle three columns (function_txt, item_txt and value_txt) in PySpark, but only among rows that share the same vehicle_id and diag_sid. For example:

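+-------------------+-----------------+------+--------------------+--------+--------------------+----------+
|diag_sid           |vehicle_id       |source|function_txt        |item_txt|value_txt           |date      |
+-------------------+-----------------+------+--------------------+--------+--------------------+----------+
|1711364453938960795|W1KAH0FB1PF092903|A     |Bordnetzdaten_EZS222|22105E  |1A 00 18 CA 05 7A...|2024-03-25|
|1711364453938960795|W1KAH0FB1PF092903|A     |Bordnetzdaten_EZS223|221058  |01 E4 FD E8 FD E0...|2024-03-25|
+-------------------+-----------------+------+--------------------+--------+--------------------+----------+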

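So far I have tried df.reindex(np.random.permutation(df.index)), but that only reorders whole rows; it does not shuffle the values of those columns between rows.

Is there a way in PySpark to shuffle only some of the columns, and only within the same group of records (here, rows with the same diag_sid and vehicle_id)?

The expected output is: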

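+-------------------+-----------------+------+--------------------+--------+--------------------+----------+
|diag_sid           |vehicle_id       |source|function_txt        |item_txt|value_txt           |date      |
+-------------------+-----------------+------+--------------------+--------+--------------------+----------+
|1711364453938960795|W1KAH0FB1PF092903|A     |Bordnetzdaten_EZS223|221058  |01 E4 FD E8 FD E0...|2024-03-25|
|1711364453938960795|W1KAH0FB1PF092903|A     |Bordnetzdaten_EZS222|22105E  |1A 00 18 CA 05 7A...|2024-03-25|
+-------------------+-----------------+------+--------------------+--------+--------------------+----------+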
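To pin down the requested behaviour, here is a minimal plain-Python sketch (no Spark). It assumes rows are dicts and that each row's function_txt, item_txt and value_txt values should move together as one unit, which is consistent with the expected output above. The helper name shuffle_within_groups is hypothetical. Within each (diag_sid, vehicle_id) group, the three-column tuples are permuted, and source and date stay on their original rows.

```python
import random
from collections import defaultdict

# Columns whose values are moved between rows; every other column stays in place.
SHUFFLED_COLS = ("function_txt", "item_txt", "value_txt")
GROUP_KEYS = ("diag_sid", "vehicle_id")


def shuffle_within_groups(rows, rng=random):
    """Hypothetical helper: return copies of rows in which the SHUFFLED_COLS
    tuples are permuted among the rows of each (diag_sid, vehicle_id) group."""
    # Collect the row positions that belong to each group.
    groups = defaultdict(list)
    for i, row in enumerate(rows):
        groups[tuple(row[k] for k in GROUP_KEYS)].append(i)

    out = [dict(row) for row in rows]  # copy so the input is left untouched
    for positions in groups.values():
        # Take each row's three values as one tuple and permute the tuples.
        values = [tuple(rows[i][c] for c in SHUFFLED_COLS) for i in positions]
        rng.shuffle(values)
        # Write the permuted tuples back into the same row positions.
        for i, vals in zip(positions, values):
            out[i].update(zip(SHUFFLED_COLS, vals))
    return out


rows = [
    {"diag_sid": "1711364453938960795", "vehicle_id": "W1KAH0FB1PF092903", "source": "A",
     "function_txt": "Bordnetzdaten_EZS222", "item_txt": "22105E",
     "value_txt": "1A 00 18 CA 05 7A", "date": "2024-03-25"},
    {"diag_sid": "1711364453938960795", "vehicle_id": "W1KAH0FB1PF092903", "source": "A",
     "function_txt": "Bordnetzdaten_EZS223", "item_txt": "221058",
     "value_txt": "01 E4 FD E8 FD E0", "date": "2024-03-25"},
]

for row in shuffle_within_groups(rows, random.Random(0)):
    print(row["function_txt"], row["item_txt"], row["value_txt"])
```

Because positions are collected per group before shuffling, values can never cross from one (diag_sid, vehicle_id) group into another.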

1 Answer


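I'm not quite sure why you want to do this, but here is a simple approach.

First, pack all the columns you want to shuffle, together with a random integer, into a struct.

Next, create a window spec that partitions by the columns that should stay fixed (diag_sid and vehicle_id) and orders by the random integer inside the struct.

Finally, unpack the struct to get your original columns back. Here is an example: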

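from pyspark.sql import SparkSession, functions as F, Window


# SparkSession replaces the deprecated SparkContext/SQLContext entry points
spark = SparkSession.builder.master("local").getOrCreate()

data = [
    ("1711364453938960795", "W1KAH0FB1PF092903", "A", "Bordnetzdaten_EZS222", "22105E", "1A 00 18 CA 05 7A", "2024-03-25"),
    ("1711364453938960795", "W1KAH0FB1PF092903", "A", "Bordnetzdaten_EZS223", "221058", "01 E4 FD E8 FD E0", "2024-03-25")
]

schema = ["diag_sid", "vehicle_id", "source", "function_txt", "item_txt", "value_txt", "date"]
df = spark.createDataFrame(data, schema)
df.show(truncate=False)

# Step 1: add a random integer and pack it, together with the columns to shuffle, into a struct
df = df.withColumn("rand_int", F.expr("cast(floor(rand() * 1000) as int)"))

df = df.withColumn("info", F.struct("rand_int", "source", "function_txt", "item_txt", "value_txt", "date")) \
       .drop("source", "function_txt", "item_txt", "value_txt", "date", "rand_int")
df.show(truncate=False)

# Step 2: partition by the columns that stay fixed and order by the random integer inside the struct
windowSpec = Window.partitionBy("diag_sid", "vehicle_id").orderBy("info.rand_int")

df = df.withColumn("row_num", F.row_number().over(windowSpec))
df.show(truncate=False)

# Step 3: unpack the struct to restore the original columns, keeping the random order;
# rand_int and row_num were only helpers, so they are dropped
df = df.orderBy("diag_sid", "vehicle_id", "row_num") \
       .select("diag_sid", "vehicle_id", "info.*") \
       .drop("rand_int")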

Output:

+-------------------+-----------------+------+--------------------+--------+-----------------+----------+
|diag_sid           |vehicle_id       |source|function_txt        |item_txt|value_txt        |date      |
+-------------------+-----------------+------+--------------------+--------+-----------------+----------+
|1711364453938960795|W1KAH0FB1PF092903|A     |Bordnetzdaten_EZS222|22105E  |1A 00 18 CA 05 7A|2024-03-25|
|1711364453938960795|W1KAH0FB1PF092903|A     |Bordnetzdaten_EZS223|221058  |01 E4 FD E8 FD E0|2024-03-25|
+-------------------+-----------------+------+--------------------+--------+-----------------+----------+

+-------------------+-----------------+---------------------------------------------------------------------+
|diag_sid           |vehicle_id       |info                                                                 |
+-------------------+-----------------+---------------------------------------------------------------------+
|1711364453938960795|W1KAH0FB1PF092903|{561, A, Bordnetzdaten_EZS222, 22105E, 1A 00 18 CA 05 7A, 2024-03-25}|
|1711364453938960795|W1KAH0FB1PF092903|{162, A, Bordnetzdaten_EZS223, 221058, 01 E4 FD E8 FD E0, 2024-03-25}|
+-------------------+-----------------+---------------------------------------------------------------------+

+-------------------+-----------------+---------------------------------------------------------------------+-------+
|diag_sid           |vehicle_id       |info                                                                 |row_num|
+-------------------+-----------------+---------------------------------------------------------------------+-------+
|1711364453938960795|W1KAH0FB1PF092903|{162, A, Bordnetzdaten_EZS223, 221058, 01 E4 FD E8 FD E0, 2024-03-25}|1      |
|1711364453938960795|W1KAH0FB1PF092903|{561, A, Bordnetzdaten_EZS222, 22105E, 1A 00 18 CA 05 7A, 2024-03-25}|2      |
+-------------------+-----------------+---------------------------------------------------------------------+-------+
