在Pyspark DataFrame中按批次为行添加唯一ID

0 投票
2 回答
64 浏览
提问于 2025-04-12 06:21

我有一个PySpark的数据表,想要在里面添加一个新列,这个列里要有唯一的ID,并且这些ID是按批次分配的。比如说,我想给前100行生成并分配一个唯一的ID,然后接着给接下来的100行再生成一个新的ID,依此类推。

我该怎么高效地做到这一点呢?

2 个回答

0

由于你没有分享你的数据框架的结构,我假设你的数据框里有一个“id”列。你可以根据需要把它更新成其他的列。

你可以简单地使用 row_number() 这个窗口函数,像下面这样来得到你想要的结果:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

# 100 in the denominator is the batch size
df = df.withColumn(
    "unique_batch_id",
    ((row_number().over(Window.orderBy(monotonically_increasing_id())) - 1) / 100)
    .cast("integer")
)

更新 1: 根据你的回复 - 你没有类似“id”的列,所以我添加了一个 monotonically_increasing_id(),这样上面的代码才能正常工作,满足你的需求。

更新 2: 根据评论,你需要的是 uuid 而不是整数作为批次 ID - 所以我想出了以下的解决办法 - 在之前的代码基础上进行了扩展:

from pyspark.sql import Window
from pyspark.sql.functions import row_number, col, monotonically_increasing_id, udf
import hashlib
import uuid

df = df.withColumn(
    "batch_id",
    ((row_number().over(Window.orderBy(monotonically_increasing_id())) - 1) / 100)
    .cast("integer").cast("string")
)

def generate_uuid(batch_id):
    return str(uuid.UUID(bytes=hashlib.md5(batch_id.encode()).digest()))

uuid_udf = udf(generate_uuid)

df = df.withColumn("uuid", uuid_udf(df["batch_id"])).drop("batch_id")
0

我希望我理解了你的问题。

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("id", monotonically_increasing_id())

# This creates a new column 'batch_id' which assigns the same id for each batch of 100 rows
df = df.withColumn("batch_id", (df["id"] / 100).cast("integer"))

monotonically_increasing_id() 这个函数会为每一行数据生成一个独一无二的ID。

撰写回答