如何在Python函数中更新pyspark DataFrame

0 投票
1 回答
47 浏览
提问于 2025-04-14 16:03

我有一个Python函数,它接收一个pyspark的数据框(dataframe),然后检查这个数据框是否包含其他函数在脚本中需要的所有列。特别是,如果缺少名为 'weight' 的列,我想更新用户传入的数据框,给它添加一个新列。

举个例子:

from pyspark.sql import functions as F

def verify_cols(df):
    if 'weight' not in df.columns:
        df = df.withColumn('weight', F.lit(1))  # Can I update `df` inside this function?

如你所见,我希望这个函数能更新 df。我该怎么做呢?如果可以的话,我想避免使用 return 语句。

这篇帖子非常相似,但它使用了pandas的 inplace 参数。

1 个回答

1

为了避免使用返回语句,你可以创建一个类,并把数据框(df)作为这个类的一个成员变量。

from pyspark.sql import functions as F
from pyspark.sql.DataFrame import DataFrame
class Validator:
    def __init__(self, df: DataFrame):
        self.df = df

    def verify_cols(self):
        if 'weight' not in self.df.columns:
            self.df = self.df.withColumn('weight', F.lit(1))

在调用verify_cols方法之后,成员变量df会被更新。

撰写回答