Cluster: driver: i3.2xlarge, workers: i3.2xlarge, 8-16 workers, on-demand, Databricks 7.3 LTS (Spark 3.0.1)
I have a WHEN clause with about 2,500 conditions, and my job ends up stuck for a very long time with 0 active tasks and 0 stages, never completing. Is there a better way to rewrite this?
def for_exist_column(df, col, pre):
    # Apply the predicate if the column exists; otherwise contribute False.
    if col in df.columns:
        return pre(df[col])
    else:
        return f.lit(False)
df = df.withColumn("flag", f.when(
      (for_exist_column(df, 'L8A', lambda c: c == 'C') & for_exist_column(df, 'B2B', lambda c: c == 'B') & for_exist_column(df, 'B2C', lambda c: c == 'B') & for_exist_column(df, 'U4N', lambda c: c == 'C') & for_exist_column(df, 'B3N', lambda c: c == 'B') & for_exist_column(df, 'U4C', lambda c: c == 'C'))
    | (for_exist_column(df, 'L8A', lambda c: c == 'C') & for_exist_column(df, 'B2B', lambda c: c == 'D') & for_exist_column(df, 'B2C', lambda c: c == 'B') & for_exist_column(df, 'U4N', lambda c: c == 'C') & for_exist_column(df, 'B3N', lambda c: c == 'I') & for_exist_column(df, 'U4C', lambda c: c == 'B'))
    | (for_exist_column(df, 'L8A', lambda c: c == 'C') & for_exist_column(df, 'B2B', lambda c: c == 'D') & for_exist_column(df, 'B2C', lambda c: c == 'B') & for_exist_column(df, 'U4N', lambda c: c == 'C') & for_exist_column(df, 'B3N', lambda c: c == 'I'))
    | ...  # 2000 other conditions
))
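One way to avoid building a single 2,500-branch expression by hand is to encode each branch as a row in a rule table and fold the comparisons together with `functools.reduce`. Since `&` and `|` build Column expressions in PySpark just as they build boolean masks in pandas, the idea can be sketched with plain pandas; the sample data and the two hard-coded rules below are hypothetical, standing in for the first two branches above (in practice the full rule set would be loaded from a file):

```python
import pandas as pd
from functools import reduce

# Hypothetical sample data standing in for the Spark DataFrame's columns.
df = pd.DataFrame({
    "L8A": ["C", "C", "X"],
    "B2B": ["B", "D", "B"],
    "B2C": ["B", "B", "B"],
    "U4N": ["C", "C", "C"],
    "B3N": ["B", "I", "B"],
    "U4C": ["C", "B", "C"],
})

# Each rule maps column name -> required value; these two mirror the
# first two branches of the WHEN clause.
rules = [
    {"L8A": "C", "B2B": "B", "B2C": "B", "U4N": "C", "B3N": "B", "U4C": "C"},
    {"L8A": "C", "B2B": "D", "B2C": "B", "U4N": "C", "B3N": "I", "U4C": "B"},
]

def rule_mask(df, rule):
    # AND together the per-column equality checks; a column missing from
    # the frame makes the whole rule False (mirrors for_exist_column).
    checks = [df[c] == v if c in df.columns else pd.Series(False, index=df.index)
              for c, v in rule.items()]
    return reduce(lambda a, b: a & b, checks)

# OR the per-rule masks into a single flag column.
df["flag"] = reduce(lambda a, b: a | b, (rule_mask(df, r) for r in rules))
print(df["flag"].tolist())  # -> [True, True, False]
```

The same `reduce` pattern works directly on PySpark Columns (using `f.lit(False)` for the missing-column case), which keeps the rules as data instead of a giant hand-written expression; whether it also shrinks the Catalyst plan enough to unstick the job would need to be measured.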
Edit: I tried to create a UDF equivalent to the above. Any help would be appreciated.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.sql import functions as f

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark.read.parquet("s3://tempOutforWith.csv")

@pandas_udf('string', PandasUDFType.SCALAR)  # not sure if this is correct
def for_exist_column(df: pd.DataFrame, col: str, pre: pd.Series) -> pd.Series:
    if col in df.columns:
        return pd.Series(pre(df[col]))
    else:
        return pd.Series(False)

df = df.withColumn("flag", f.when((for_exist_column(df, 'L8A', lambda c: c == 'C') & for_exist_column(df, 'B2B', lambda c: c == 'B') & for_exist_column(df, 'B2C', lambda c: c == 'B') & for_exist_column(df, 'U4N', lambda c: c == 'C') & for_exist_column(df, 'B3N', lambda c: c == 'B') & for_exist_column(df, 'U4C', lambda c: c == 'C')), 1).otherwise(0))
display(df)
Getting the following error:
TypeError: Invalid argument, not a string or column: DataFrame[B2B: string, B2C: string, B3N: string, L8A: string, U1A: string, U4C: string, U4N: string, Dummy: bigint, Katashiki: string] of type <class 'pyspark.sql.dataframe.DataFrame'>.
For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
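The TypeError is raised at the call site, before the UDF body ever runs: a scalar `pandas_udf` can only be invoked with Column (or column-name string) arguments, each of which Spark converts to a `pd.Series` on the workers, so passing the DataFrame itself or a Python lambda is rejected. A minimal sketch of the shape such a UDF body must have, shown without the decorator so it runs standalone (in Spark it would be decorated with `@pandas_udf("int")` and called as `flag_udf(f.col("L8A"), f.col("B2B"))`; the two-column rule here is a hypothetical stand-in for one branch of the question):

```python
import pandas as pd

# The body a scalar pandas_udf would execute: it receives one pd.Series
# per Column argument and must return a pd.Series of the same length.
# It cannot take a DataFrame, a column-name-plus-lambda pair, or decide
# per-call which columns exist -- the column list is fixed at call time.
def flag_udf(l8a: pd.Series, b2b: pd.Series) -> pd.Series:
    return ((l8a == "C") & (b2b == "B")).astype("int64")

print(flag_udf(pd.Series(["C", "C", "X"]),
               pd.Series(["B", "D", "B"])).tolist())  # -> [1, 0, 0]
```

Because the existence check (`col in df.columns`) is known on the driver before the job runs, it belongs outside the UDF: decide once which columns exist, then build the expression (or the UDF's argument list) from that, rather than re-checking inside a per-row function.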