PySpark job hangs on a large WHEN clause

Posted 2024-03-29 09:01:31


Cluster: driver i3.2xlarge, workers i3.2xlarge, 8-16 workers, on-demand, Databricks 7.3 LTS (Spark 3.0.1)

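I have a WHEN clause with about 2,500 conditions, and my job ends up stuck: it shows 0 active tasks and 0 stages for a very long time and never finishes. Is there a better way to rewrite this?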

def for_exist_column(df, col, pre):
    if col in df.columns:
        return pre(df[col])
    else:
        return f.lit(False)

df = df.withColumn("flag", f.when(
    (for_exist_column(df, 'L8A', lambda c: c=='C') & for_exist_column(df, 'B2B', lambda c: c=='B') & for_exist_column(df, 'B2C', lambda c: c=='B') & for_exist_column(df, 'U4N', lambda c: c=='C') & for_exist_column(df, 'B3N', lambda c: c=='B') & for_exist_column(df, 'U4C', lambda c: c=='C'))
    | (for_exist_column(df, 'L8A', lambda c: c=='C') & for_exist_column(df, 'B2B', lambda c: c=='D') & for_exist_column(df, 'B2C', lambda c: c=='B') & for_exist_column(df, 'U4N', lambda c: c=='C') & for_exist_column(df, 'B3N', lambda c: c=='I') & for_exist_column(df, 'U4C', lambda c: c=='B'))
    | (for_exist_column(df, 'L8A', lambda c: c=='C') & for_exist_column(df, 'B2B', lambda c: c=='D') & for_exist_column(df, 'B2C', lambda c: c=='B') & for_exist_column(df, 'U4N', lambda c: c=='C') & for_exist_column(df, 'B3N', lambda c: c=='I'))
    # | ... 2000 other conditions
    , 1).otherwise(0))

Edit: I tried to create a UDF equivalent to the above. Any help would be greatly appreciated.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.sql import functions as f

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df=spark.read.parquet("s3://tempOutforWith.csv")

@pandas_udf('string', PandasUDFType.SCALAR)  #not sure if this is correct
def for_exist_column(df: pd.DataFrame, col: str, pre: pd.Series) -> pd.Series:
    if col in df.columns:
        return pd.Series(pre(df[col]))
    else:
        return pd.Series(False)

df=df.withColumn("flag", f.when((for_exist_column(df, 'L8A', lambda c: c=='C') & for_exist_column(df, 'B2B', lambda c: c=='B') & for_exist_column(df, 'B2C', lambda c: c=='B') & for_exist_column(df, 'U4N', lambda c: c=='C') & for_exist_column(df, 'B3N', lambda c: c=='B') & for_exist_column(df, 'U4C', lambda c: c=='C')),1).otherwise(0))
display(df)

I get the following error:

TypeError: Invalid argument, not a string or column: DataFrame[B2B: string, B2C: string, B3N: string, L8A: string, U1A: string, U4C: string, U4N: string, Dummy: bigint, Katashiki: string] of type <class 'pyspark.sql.dataframe.DataFrame'>. 
For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
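
A note on the error and one possible rewrite. The TypeError arises because a pandas UDF can only be called with columns (or column names) as arguments, so passing the DataFrame `df` and a lambda to it is rejected. As for the hang, a plausible (unconfirmed) cause is that the driver spends its time analyzing one enormous boolean expression before any task is scheduled. The sketch below shows an alternative that keeps the rules as data instead of as an expression: rules are grouped by the set of columns they test, each group becomes a small lookup DataFrame, and the flag is derived from broadcast left joins. The names `RULES`, `group_rules`, `add_flag` and the temporary `_hit_N` columns are hypothetical, only the three conditions visible in the question are included, and this has not been run against the cluster described above.

```python
from collections import defaultdict

# The three conditions visible in the question, written as data.
# (Hypothetical representation; the remaining ~2000 rules would be appended here.)
RULES = [
    {"L8A": "C", "B2B": "B", "B2C": "B", "U4N": "C", "B3N": "B", "U4C": "C"},
    {"L8A": "C", "B2B": "D", "B2C": "B", "U4N": "C", "B3N": "I", "U4C": "B"},
    {"L8A": "C", "B2B": "D", "B2C": "B", "U4N": "C", "B3N": "I"},
]


def group_rules(rules, columns):
    """Group rules by the tuple of columns they test.

    A rule that mentions a column missing from `columns` is skipped: in the
    original helper such a column yields lit(False), which makes the whole
    AND-clause False.
    """
    available = set(columns)
    groups = defaultdict(set)
    for rule in rules:
        if not set(rule) <= available:
            continue
        cols = tuple(sorted(rule))
        groups[cols].add(tuple(rule[c] for c in cols))
    return dict(groups)


def add_flag(spark, df, rules):
    """Add a 0/1 `flag` column using one broadcast left join per column set."""
    # Imported lazily so that group_rules stays usable without Spark installed.
    from pyspark.sql import functions as f

    hit_cols = []
    for i, (cols, values) in enumerate(group_rules(rules, df.columns).items()):
        hit = f"_hit_{i}"
        # Values are distinct, so the join cannot duplicate rows of df.
        lookup = spark.createDataFrame(sorted(values), list(cols)).withColumn(hit, f.lit(True))
        df = df.join(f.broadcast(lookup), on=list(cols), how="left")
        hit_cols.append(hit)

    if not hit_cols:
        return df.withColumn("flag", f.lit(0))

    # Each hit column is True on a match and null otherwise.
    any_hit = f.coalesce(*[f.col(h) for h in hit_cols], f.lit(False))
    return df.withColumn("flag", f.when(any_hit, 1).otherwise(0)).drop(*hit_cols)
```

Usage would look like `df = add_flag(spark, df, RULES)`. Rows with a null in a tested column get no match from the equi-join, which mirrors how the original `==` comparisons behave. The number of joins equals the number of distinct column sets among the rules, so this is only attractive if the 2,500 conditions share a small number of column combinations. Note that joining on a list of names moves the join columns to the front of the result.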
