在Pyspark DataFrame中按批次为行添加唯一ID
我有一个PySpark的数据表,想要在里面添加一个新列,这个列里要有唯一的ID,并且这些ID是按批次分配的。比如说,我想给前100行生成并分配一个唯一的ID,然后接着给接下来的100行再生成一个新的ID,依此类推。
我该怎么高效地做到这一点呢?
2 个回答
0
由于你没有分享你的数据框架的结构,我假设你的数据框里有一个“id”列。你可以根据需要把它更新成其他的列。
你可以简单地使用 row_number()
这个窗口函数,像下面这样来得到你想要的结果:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
# 100 in the denominator is the batch size
df = df.withColumn(
"unique_batch_id",
((row_number().over(Window.orderBy(monotonically_increasing_id())) - 1) / 100)
.cast("integer")
)
更新 1: 根据你的回复 - 你没有类似“id”的列,所以我添加了一个 monotonically_increasing_id()
,这样上面的代码才能正常工作,满足你的需求。
更新 2: 根据评论,你需要的是 uuid
而不是整数作为批次 ID - 所以我想出了以下的解决办法 - 在之前的代码基础上进行了扩展:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col, monotonically_increasing_id, udf
import hashlib
import uuid
df = df.withColumn(
"batch_id",
((row_number().over(Window.orderBy(monotonically_increasing_id())) - 1) / 100)
.cast("integer").cast("string")
)
def generate_uuid(batch_id):
return str(uuid.UUID(bytes=hashlib.md5(batch_id.encode()).digest()))
uuid_udf = udf(generate_uuid)
df = df.withColumn("uuid", uuid_udf(df["batch_id"])).drop("batch_id")
0
我希望我理解了你的问题。
from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn("id", monotonically_increasing_id())
# This creates a new column 'batch_id' which assigns the same id for each batch of 100 rows
df = df.withColumn("batch_id", (df["id"] / 100).cast("integer"))
monotonically_increasing_id() 这个函数会为每一行数据生成一个独一无二的ID。