UDF在PySp中运行两次

2024-04-16 20:34:38 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个简单的spark数据帧,它有两列,都是字符串;一个叫做id,另一个叫做name。我还有一个名为string_replacement的Python函数,它执行一些字符串操作。我已经定义了一个包含string_replacement并应用于数据帧的每一行的包装器UDF。只有name列被传递给字符串操作函数。这是密码

# Import libraries
from pyspark.sql import *
import pyspark.sql.functions as f
from pyspark.sql.types import *

# Create Example Dataframe

row1 = Row(id='123456', name='Computer Science')

df = spark.createDataFrame([row1])

# Print the dataframe
df.show()

# Define function that does some string operations
def string_replacement(input_string):
    string=input_string
    string=string.replace('Computer', 'Computer x')
    string=string.replace('Science', 'Science x')
    return string


# Define wrapper function to turn into UFD

def wrapper_func(row):
    temp=row[1]
    temp=string_replacement(temp)
    row[1]=temp

    return row


# Create the schema for the resulting data frame
output_schema = StructType([StructField('id', StringType(), True),
                     StructField('name', StringType(), True)])


# UDF to apply the wrapper function to the dataframe
new_udf=f.udf(lambda z: wrapper_func(z), output_schema)

cols=df.columns
new_df=df.select(new_udf(f.array(cols)).alias('results')).select(f.col('results.*'))

new_df.show(truncate = False)

函数将单词Computer转换为Computer x。对单词Science也是这样。你知道吗

原始数据帧如下所示

+------+----------------+
|    id|            name|
+------+----------------+
|123456|Computer Science|
+------+----------------+

应用函数后,看起来是这样的

+------+------------------------+
|id    |name                    |
+------+------------------------+
|123456|Computer x x Science x x|
+------+------------------------+

x x可以看出,它已经运行了两次函数。在第一次运行的输出上的第二次。如何避免这种行为?

有趣的是,如果我不分解生成的数据帧,它看起来很好:

new_df=df.select(new_udf(f.array(cols)).alias('results'))

给你

+-----------------------------+
|results                      |
+-----------------------------+
|[123456,Computer x Science x]|
+-----------------------------+

Tags: the函数nameiddfnewstringwrapper
2条回答

谢谢你,塞利姆。这似乎也可行,但你的方法更清晰。你知道吗

def string_replacement(string1, string2):
    string2=string2.replace('Computer', 'Computer x')
    string2=string2.replace('Science', 'Science x')
    return string1, string2

output_schema = StructType([StructField('id', StringType(), True), StructField('name', StringType(), True)])

new_udf=f.udf(string_replacement, output_schema)

cols=df.columns
df.select( new_udf(f.col('id'), f.col('name')).alias('results')).select(f.col('results.*')).show(truncate = False)

使用星型扩展似乎会导致为每个扩展的元素运行一次UDF,如图所示。你知道吗

df.select(new_udf(F.array(cols)).alias('results')).select(F.col('results.*')).explain()

# == Physical Plan ==
# *(1) Project [pythonUDF1#109.id AS id#104, pythonUDF1#109.name AS name#105]
# +- BatchEvalPython [<lambda>(array(id#0, name#1)), <lambda>(array(id#0, name#1))], [id#0, name#1, pythonUDF0#108, pythonUDF1#109]
#    +- Scan ExistingRDD[id#0,name#1]

如果希望保持当前的代码结构,可以通过将其包装在数组中并执行分解来解决问题。你知道吗

df.select(F.explode(F.array(new_udf(F.array(cols)))).alias('results')).select(F.col('results.*')).show(truncate=False)

# +   +          +
# |id    |name                |
# +   +          +
# |123456|Computer x Science x|
# +   +          +

根据您的用例,如果您可以用这种方式重新实现UDF,即每行只处理一个特定的列而不是整行,那么代码的可读性会更好。你知道吗

def rep_str(string):
    res = string
    res = res.replace('Computer', 'Computer x')
    res = res.replace('Science', 'Science x')
    return res

rep_str_udf = F.udf(lambda s: rep_str(s), StringType())

df.withColumn('new_name', rep_str_udf(df.name)).show()

# +   +        +          +
# |    id|            name|            new_name|
# +   +        +          +
# |123456|Computer Science|Computer x Science x|
# +   +        +          +

相关问题 更多 >