For loop with case statements

Published 2024-04-26 03:34:58


I am trying to find a way to run a for loop over my case statements, to better optimise my script.

The script shown below runs without errors, but I feel it is too verbose and may cause confusion during future maintenance.

from pyspark.sql.functions import when

df = df.withColumn('Product', when(df.input_file_name.like('%CAD%'), 'Cash and Due').
                   when(df.input_file_name.like('%TP%'), 'Trade Product').
                   when(df.input_file_name.like('%LNS%'), 'Corp Loans').
                   when(df.input_file_name.like('%DBT%'), 'Debt').
                   when(df.input_file_name.like('%CRD%'), 'Retail Cards').
                   when(df.input_file_name.like('%MTG%'), 'Mortgage').
                   when(df.input_file_name.like('%OD%'), 'Overdraft').
                   when(df.input_file_name.like('%PLN%'), 'Retail Personal Loan').
                   when(df.input_file_name.like('%CLN%'), 'CLN').
                   when(df.input_file_name.like('%CAT%'), 'Custody and Trust').
                   when(df.input_file_name.like('%DEP%'), 'Deposits').
                   when(df.input_file_name.like('%STZ%'), 'Securitization').
                   when(df.input_file_name.like('%SECZ%'), 'Security Securitization').
                   when(df.input_file_name.like('%SEC%'), 'Securities').
                   when(df.input_file_name.like('%MTSZ%'), 'Retail Mortgage Securitization').
                   when(df.input_file_name.like('%PLSZ%'), 'Retail Personal Loan Securitization').
                   when(df.input_file_name.like('%CCSZ%'), 'Retail Cards Securitization').
                   when(df.input_file_name.like('%CMN%'), 'Cash Management').
                   when(df.input_file_name.like('%OTC%'), 'Over-the-counter').
                   when(df.input_file_name.like('%SFT%'), 'Securities Financing Transactions').
                   when(df.input_file_name.like('%ETD%'), 'Exchange Traded Derivative').
                   when(df.input_file_name.like('%DEF%'), 'Default Products').
                   when(df.input_file_name.like('%FFS%'), 'Not Required').
                   when(df.input_file_name.like('%hdfs%'), 'Not Required').
                   otherwise(df.feed_name))

I thought about running a loop; below is an example (the script is not correct, it is just to demonstrate the idea).

product_code = ['%CAD%','%TP%','%LNS%','%DBT%','%CRD%','%MTG%','%OD%','%PLN%','%CLN%','%CAT%','%DEP%','%STZ%','%SECZ%','%SEC%','%MTSZ%','%PLSZ%','%CCSZ%','%CMN%','%OTC%','%SFT%','%ETD%','%DEF%','%FFS%','%hdfs%']
product_name = ['Cash and Due','Trade Product','Corp Loans','Debt','Retail Cards','Mortgage','Overdraft','Retail Personal Loan','CLN','Custody and Trust','Deposits','Securitization','Securities Securitization','Securities','Retail Mortgage Securitization','Retail Personal Loan Securitization','Retail Cards Securitization','Cash Management','Over-the-counter','Securities Financing Transactions','Exchange Traded Derivative','Default Products','Not Required','Not Required']
   
## Both product_code & product_name have the same length

# (pseudocode -- not runnable, just to illustrate the intent)
for code, name in zip(product_code, product_name):
    # df = df.withColumn('Product', when(col('input_file_name').like(code), name))
    # ...and after the last pattern: .otherwise(col('feed_name'))
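The intended loop boils down to a first-match lookup over substring patterns. A minimal plain-Python sketch of that logic (illustrative only, not the Spark API; `classify` and `PRODUCT_RULES` are made-up names, and only a few rules are shown):

```python
# First-match lookup that mirrors the chained when(...) semantics:
# a SQL pattern like '%TP%' matches when 'TP' occurs anywhere in the name.
PRODUCT_RULES = [
    ('CAD', 'Cash and Due'),
    ('TP', 'Trade Product'),
    ('LNS', 'Corp Loans'),
]

def classify(input_file_name, feed_name):
    """Return the first matching product name, else fall back to feed_name."""
    for token, product in PRODUCT_RULES:
        if token in input_file_name:      # '%TOKEN%' == substring match
            return product
    return feed_name                      # the otherwise(...) branch

print(classify('ABC_TP_01', 'bob'))       # -> Trade Product
print(classify('no_match', 'bob'))        # -> bob
```

In Spark itself the same loop can be expressed by building the when chain iteratively or with functools.reduce, as the answers below show.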

I need some advice on whether a case-style loop over when(df.where()).otherwise can be run in Spark, or whether there is another method or use case for this.

Updated

I have implemented the advised approach, and the query returns the correct values for rows that meet a condition, but I would like to know why the script below does not return the correct values for the rest, and instead drops the rows that do not meet any condition.

Sample DF:
inp_file = ['sdasdasdasd','_CMN_BD','_CMN_BD_WS']
feed_name = ['bob','arshad','jimmy']

df = spark.createDataFrame(
     list(zip(inp_file,feed_name)),
     ['input_file_name','feed_name']
)

+---------------+---------+
|input_file_name|feed_name|
+---------------+---------+
|sdasdasdasd    |bob      |
|_CMN_BD        |arshad   |
|_CMN_BD_WS     |jimmy    |
+---------------+---------+

product_code = ['%CAD_%','%TP%','%LNS%','%DBT%','%CRD%','%MTG%','%_OD_%','%PLN%','%CLN%','%CAT%','%DEP%','%STZ%','%SECZ%','%SEC%','%MTSZ%','%PLSZ%','%CCSZ%','%CMN%','%OTC%','%SFT%','%ETD%','%DEF%','%FFS%','%hdfs%']
product_name = ['Cash and Due','Trade Product','Corp Loans','Debt','Retail Cards','Mortgage','Overdraft','Retail Personal Loan','CLN','Custody and Trust','Deposits','Securitization','Securities Securitization','Securities','Retail Mortgage Securitization','Retail Personal Loan Securitization','Retail Cards Securitization','Cash Management','Over-the-counter','Securities Financing Transactions','Exchange Traded Derivative','Default Products','Not Required','Not Required']
   
## -- Create a Spark dataframe from the zipped lists of tuples
## -- lit() wraps a literal value as a column

product_ref_df = spark.createDataFrame(
     list(zip(product_code, product_name)),
     ["product_code", "product_name"]
)
    

from pyspark.sql.functions import expr, col, broadcast, coalesce, lit

def tempDF(df, targetField, columnTitle, condition, targetResult, show=False):
    product_ref_df = spark.createDataFrame(
         list(zip(condition, targetResult)),
         ["condition", "target_result"]
    )

    df.join(broadcast(product_ref_df), expr(targetField + " like condition")) \
      .withColumn(columnTitle, coalesce(col("target_result"), lit("feed_name"))) \
      .drop('condition', 'target_result') \
      .show()

    return df

product_ref_df = tempDF(df,'input_file_name','Product',product_code,product_name)

When the script is triggered there are no errors, and the result returned is as shown:

+---------------+---------+------------+
|input_file_name|feed_name|     Product|
+---------------+---------+------------+
|        _CMN_BD|   arshad|Cash and Due|
|     _CMN_BD_WS|    jimmy|Cash and Due|
+---------------+---------+------------+

The result should also include the first row, since we are not dropping any rows; the expected output is:

+---------------+---------+------------+
|input_file_name|feed_name|     Product|
+---------------+---------+------------+
|    sdasdasdasd|      bob|         bob|
|        _CMN_BD|   arshad|Cash and Due|
|     _CMN_BD_WS|    jimmy|Cash and Due|
+---------------+---------+------------+
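A likely cause of the dropped row: DataFrame.join defaults to an inner join, so rows whose input_file_name matches no condition pattern are discarded; passing "left" as the join type keeps them. A plain-Python sketch of the difference (hypothetical helper names; SQL LIKE is emulated here with fnmatch):

```python
import fnmatch

rows = [('sdasdasdasd', 'bob'), ('_CMN_BD', 'arshad'), ('_CMN_BD_WS', 'jimmy')]
ref = [('%CMN%', 'Cash and Due')]

def like(value, pattern):
    # SQL LIKE's % wildcard maps onto fnmatch's * wildcard
    return fnmatch.fnmatch(value, pattern.replace('%', '*'))

def join(rows, ref, how='inner'):
    out = []
    for fname, feed in rows:
        matches = [name for pat, name in ref if like(fname, pat)]
        if matches:
            out.append((fname, feed, matches[0]))
        elif how == 'left':            # keep unmatched rows, fall back to feed
            out.append((fname, feed, feed))
    return out

print(join(rows, ref))           # inner join: the 'sdasdasdasd' row is dropped
print(join(rows, ref, 'left'))   # left join: all three rows are kept
```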

2 Answers

You can use coalesce to combine all the when statements into one. coalesce picks the first non-null column, and when gives a non-null value only if its condition matches; without an otherwise clause it gives null.

product_code = ['%CAD%','%TP%','%LNS%','%DBT%','%CRD%','%MTG%','%OD%','%PLN%','%CLN%','%CAT%','%DEP%','%STZ%','%SECZ%','%SEC%','%MTSZ%','%PLSZ%','%CCSZ%','%CMN%','%OTC%','%SFT%','%ETD%','%DEF%','%FFS%','%hdfs%']
product_name = ['Cash and Due','Trade Product','Corp Loans','Debt','Retail Cards','Mortgage','Overdraft','Retail Personal Loan','CLN','Custody and Trust','Deposits','Securitization','Securities Securitization','Securities','Retail Mortgage Securitization','Retail Personal Loan Securitization','Retail Cards Securitization','Cash Management','Over-the-counter','Securities Financing Transactions','Exchange Traded Derivative','Default Products','Not Required','Not Required']

import pyspark.sql.functions as F

df2 = df.withColumn(
    'Product',
    F.coalesce(
        *[F.when(F.col('input_file_name').like(code), F.lit(name))
        for (code, name) in zip(product_code, product_name)]
    )
)
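The behaviour of this pattern can be mimicked in plain Python (illustrative coalesce/when stand-ins, not the Spark functions): each when yields null unless its condition matches, and coalesce takes the first non-null. Note that with no match at all the result stays null; appending F.col('feed_name') as a final argument to F.coalesce would reproduce the original otherwise fallback.

```python
def when(cond, value):
    # A when() with no otherwise(): value on match, else null (None)
    return value if cond else None

def coalesce(*values):
    # First non-null value, like Spark's coalesce
    return next((v for v in values if v is not None), None)

name = 'ABC_TP_01'
product = coalesce(
    when('CAD' in name, 'Cash and Due'),
    when('TP' in name, 'Trade Product'),
)
print(product)                                        # -> Trade Product
print(coalesce(when('CAD' in 'x', 'Cash and Due')))   # -> None: no fallback
```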

Or you can create a new dataframe from these product name references and join it with the original df to get the product name:

from pyspark.sql.functions import expr, col, broadcast, coalesce

product_ref_df = spark.createDataFrame(
     list(zip(product_code, product_name)),
     ["product_code", "product_name"]
)

df.join(broadcast(product_ref_df), expr("input_file_name like product_code"), "left") \
  .withColumn("Product", coalesce(col("product_name"), col("Feed_name"))) \
  .drop("product_code", "product_name") \
  .show()

Or chain the case/when conditions using functools.reduce, like this:

import functools

from pyspark.sql.functions import lit, col, when

case_conditions = list(zip(product_code, product_name))

product_col = functools.reduce(
    lambda acc, x: acc.when(col("input_file_name").like(x[0]), lit(x[1])),
    case_conditions[1:],
    when(col("input_file_name").like(case_conditions[0][0]), lit(case_conditions[0][1]))
).otherwise(col("Feed_name"))

df.withColumn("Product", product_col).show()
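The same fold can be sketched in plain Python to show what reduce is doing: each step wraps the classifier built so far in one more pattern test, and the initial value plays the role of the first when (illustrative names, not the Spark API; here the fallback returns the input string itself, whereas the Spark version falls back to the Feed_name column):

```python
import functools

case_conditions = [('%CAD%', 'Cash and Due'), ('%TP%', 'Trade Product')]

def add_case(acc, case):
    # Wrap the accumulated classifier with one more substring test
    token, name = case[0].strip('%'), case[1]
    return lambda s: name if token in s else acc(s)

classify = functools.reduce(
    add_case,
    reversed(case_conditions),   # reversed so the first rule is tested first
    lambda s: s,                 # the otherwise(...): fall back to the input
)

print(classify('A_TP_B'))        # -> Trade Product
print(classify('nothing'))       # -> nothing
```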
