Pyspark 性能提升

0 投票
2 回答
61 浏览
提问于 2025-04-14 18:26

我在一个PySpark的数据框(df)中使用了以下代码:

for col in df.columns:
    df = df.withColumn(col, F.rank()over(Window.orderBy((col))))

因为我的数据框有2000列,所以运行起来非常慢。有什么办法可以让它更快一些吗?我试过一些用户定义函数(UDF),但没有成功。

2 个回答

0

虽然我没找到一个特别好的解决办法,但我发现当我尝试转换的列数越多,性能就会显著下降。基于这个发现,我决定把我的数据集分成每次只转换大约400列,然后最后再把这些表合并在一起。虽然这不是我最理想的做法,但至少解决了问题。

0

请注意,如果在使用窗口函数时没有加上partitionBy这个部分,处理大数据时会导致性能下降。建议你在建模数据时,找一个可以用来分区的列。

据说PySpark会给出这样的警告来提醒你这个问题:

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

不过,作为for循环的替代方案,你可以使用selectExpr方法,这样可以只走一遍数据,同时也能利用Catalyst优化器来加快运行速度。

# Create a list of SQL expressions that apply the rank function to each column
exprs = [f"rank() OVER (ORDER BY {col}) as {col}" for col in df.columns]

# Apply all expressions at once using selectExpr
df = df.selectExpr(*exprs)

或者

# Create a list of expressions that apply the rank function to each column
exprs = [F.rank().over(Window.orderBy(col)).alias(col) for col in df.columns]

# Apply all expressions at once using selectExpr
df = df.select(*exprs)

看看是否可以在某一列上使用PARTITION BYpartitionBy,这样可以提高大表的性能。

撰写回答