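I am trying to find a way to build my case statement with a for loop so that my script is better optimized.
The script below runs without errors, but I feel it is too verbose and could cause confusion during future maintenance.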
from pyspark.sql.functions import col, when

file_name = col('input_file_name')
df = df.withColumn('Product',
    when(file_name.like('%CAD%'), 'Cash and Due')
    .when(file_name.like('%TP%'), 'Trade Product')
    .when(file_name.like('%LNS%'), 'Corp Loans')
    .when(file_name.like('%DBT%'), 'Debt')
    .when(file_name.like('%CRD%'), 'Retail Cards')
    .when(file_name.like('%MTG%'), 'Mortgage')
    .when(file_name.like('%OD%'), 'Overdraft')
    .when(file_name.like('%PLN%'), 'Retail Personal Loan')
    .when(file_name.like('%CLN%'), 'CLN')
    .when(file_name.like('%CAT%'), 'Custody and Trust')
    .when(file_name.like('%DEP%'), 'Deposits')
    .when(file_name.like('%STZ%'), 'Securitization')
    .when(file_name.like('%SECZ%'), 'Securities Securitization')
    .when(file_name.like('%SEC%'), 'Securities')
    .when(file_name.like('%MTSZ%'), 'Retail Mortgage Securitization')
    .when(file_name.like('%PLSZ%'), 'Retail Personal Loan Securitization')
    .when(file_name.like('%CCSZ%'), 'Retail Cards Securitization')
    .when(file_name.like('%CMN%'), 'Cash Management')
    .when(file_name.like('%OTC%'), 'Over-the-counter')
    .when(file_name.like('%SFT%'), 'Securities Financing Transactions')
    .when(file_name.like('%ETD%'), 'Exchange Traded Derivative')
    .when(file_name.like('%DEF%'), 'Default Products')
    .when(file_name.like('%FFS%'), 'Not Required')
    .when(file_name.like('%hdfs%'), 'Not Required')
    # No pattern matched: fall back to the value of the feed_name column
    .otherwise(col('feed_name')))
I have thought about running a loop. Below is a sketch (the script is not correct, it is only for demonstration):
product_code = ['%CAD%','%TP%','%LNS%','%DBT%','%CRD%','%MTG%','%OD%','%PLN%','%CLN%','%CAT%','%DEP%','%STZ%','%SECZ%','%SEC%','%MTSZ%','%PLSZ%','%CCSZ%','%CMN%','%OTC%','%SFT%','%ETD%','%DEF%','%FFS%','%hdfs%']
product_name = ['Cash and Due','Trade Product','Corp Loans','Debt','Retail Cards','Mortgage','Overdraft','Retail Personal Loan','CLN','Custody and Trust','Deposits','Securitization','Securities Securitization','Securities','Retail Mortgage Securitization','Retail Personal Loan Securitization','Retail Cards Securitization','Cash Management','Over-the-counter','Securities Financing Transactions','Exchange Traded Derivative','Default Products','Not Required','Not Required']
## Both product_code and product_name have the same number of elements
last_index = len(product_code) - 1
for x in range(len(product_code)):
    # Logic I had in mind:
    # df.withColumn('Product', when(<input_file_name LIKE product_code[x]>, product_name[x]))
    if x == last_index:
        # ... and finish the chain with otherwise('feed_name')
        pass
Is it possible to build the when(...).otherwise(...) case statement in a loop in Spark, or is there another method or approach? I would appreciate some advice.
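Update
I have implemented it the way that was advised, and the query returns the correct values for rows that meet one of the conditions. However, I would like to know why the script below does not return the expected result and instead drops the rows that match none of the conditions.
Sample DF: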
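product_code = ['%CMN%','%TP%','%LNS%']
product_name = ['Cash and Due','Trade Product']
# inp_file was not shown in the post; these values are taken from the table below
inp_file = ['sdasdasdasd','_CMN_BD','_CMN_BD_WS']
# First value aligned with the table below, which shows 'bob'
feed_name = ['bob','arshad','jimmy']
df = spark.createDataFrame(
    list(zip(inp_file, feed_name)),
    ['input_file_name', 'feed_name']
)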
+---------------+---------+
|input_file_name|feed_name|
+---------------+---------+
|sdasdasdasd    |bob      |
|_CMN_BD        |arshad   |
|_CMN_BD_WS     |jimmy    |
+---------------+---------+
from pyspark.sql.functions import broadcast, coalesce, col, expr, lit

product_code = ['%CAD_%','%TP%','%LNS%','%DBT%','%CRD%','%MTG%','%_OD_%','%PLN%','%CLN%','%CAT%','%DEP%','%STZ%','%SECZ%','%SEC%','%MTSZ%','%PLSZ%','%CCSZ%','%CMN%','%OTC%','%SFT%','%ETD%','%DEF%','%FFS%','%hdfs%']
product_name = ['Cash and Due','Trade Product','Corp Loans','Debt','Retail Cards','Mortgage','Overdraft','Retail Personal Loan','CLN','Custody and Trust','Deposits','Securitization','Securities Securitization','Securities','Retail Mortgage Securitization','Retail Personal Loan Securitization','Retail Cards Securitization','Cash Management','Over-the-counter','Securities Financing Transactions','Exchange Traded Derivative','Default Products','Not Required','Not Required']
## -- Create a Spark DataFrame from a list of tuples
## -- lit is used to add the new column
product_ref_df = spark.createDataFrame(
    list(zip(product_code, product_name)),
    ["product_code", "product_name"]
)

def tempDF(df, targetField, columnTitle, condition, targetResult, show=False):
    product_ref_df = spark.createDataFrame(
        list(zip(condition, targetResult)),
        ["condition", "target_result"]
    )
    df.join(broadcast(product_ref_df), expr(targetField + " like condition")) \
        .withColumn(columnTitle, coalesce(col("target_result"), lit("feed_name"))) \
        .drop('condition', 'target_result') \
        .show()
    return df

product_ref_df = tempDF(df, 'input_file_name', 'Product', product_code, product_name)
When the script is run there are no errors, and the result comes back as shown:
+---------------+---------+------------+
|input_file_name|feed_name|     Product|
+---------------+---------+------------+
|        _CMN_BD|   arshad|Cash and Due|
|     _CMN_BD_WS|    jimmy|Cash and Due|
+---------------+---------+------------+
The result should also include the first row, since we never dropped any rows:
+---------------+---------+------------+
|input_file_name|feed_name|     Product|
+---------------+---------+------------+
|    sdasdasdasd|      bob|         bob|
|        _CMN_BD|   arshad|Cash and Due|
|     _CMN_BD_WS|    jimmy|Cash and Due|
+---------------+---------+------------+
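You can use coalesce to combine all the when statements into one. coalesce picks the first non-null column, and when returns a non-null value only if its condition matches; otherwise it returns null (when used without an otherwise clause).

You can also build a new DataFrame from these product-name references and join it with the original df to get the product name.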
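
As a minimal sketch of the two suggestions above, assuming PySpark and the column names used in the question (input_file_name, feed_name): the helper names product_via_coalesce and product_via_join are hypothetical, the pattern list is shortened, and the left join is an assumption rather than something the answer states. It is used here because DataFrame.join defaults to an inner join, which discards rows that match no pattern. That is the likely reason the updated script in the question loses its first row.

```python
# Shortened subset of the question's lists (illustrative only).
product_code = ['%CAD%', '%TP%', '%CMN%']
product_name = ['Cash and Due', 'Trade Product', 'Cash Management']
pairs = list(zip(product_code, product_name))


def product_via_coalesce(pairs, source='input_file_name', fallback='feed_name'):
    """Return a Column: the first matching product name, else the fallback column."""
    # Imported here so the lists above can be inspected without a Spark runtime.
    from pyspark.sql import functions as F

    # when() without otherwise() yields null when the pattern does not match,
    # so coalesce() picks the first branch whose pattern matched.
    branches = [F.when(F.col(source).like(code), F.lit(name)) for code, name in pairs]
    return F.coalesce(*branches, F.col(fallback))


def product_via_join(spark, df, pairs, source='input_file_name', fallback='feed_name'):
    """Return df with a Product column looked up from a small reference DataFrame."""
    from pyspark.sql import functions as F

    ref_df = spark.createDataFrame(pairs, ['condition', 'target_result'])
    # 'left' keeps rows with no matching pattern; the default inner join drops them.
    joined = df.join(F.broadcast(ref_df), F.expr(f'{source} LIKE condition'), 'left')
    # col(fallback) reads the column value; lit('feed_name') would instead
    # insert the literal string 'feed_name' into every unmatched row.
    return (joined
            .withColumn('Product', F.coalesce(F.col('target_result'), F.col(fallback)))
            .drop('condition', 'target_result'))
```

With an existing SparkSession named spark, usage would be df.withColumn('Product', product_via_coalesce(pairs)) or product_via_join(spark, df, pairs). Note that the join emits one output row per matching pattern, so a file name that matches several patterns is duplicated, whereas the when-based version keeps only the first match.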

Or use functools.reduce to chain the case/when conditions.
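
Here is a sketch of how the functools.reduce variant might look, again assuming PySpark. The function name product_via_reduce and the shortened pattern list are illustrative and not taken from the answer.

```python
from functools import reduce

# Order matters: the first matching pattern wins, so the more specific
# '%SECZ%' is listed before '%SEC%', as in the question.
product_code = ['%CAD%', '%SECZ%', '%SEC%']
product_name = ['Cash and Due', 'Securities Securitization', 'Securities']
pairs = list(zip(product_code, product_name))


def product_via_reduce(pairs, source='input_file_name', fallback='feed_name'):
    """Chain one when() per (pattern, name) pair and finish with otherwise()."""
    # Imported here so the lists above can be inspected without a Spark runtime.
    from pyspark.sql import functions as F

    (first_code, first_name), rest = pairs[0], pairs[1:]
    # functions.when() starts the chain; Column.when() extends it.
    start = F.when(F.col(source).like(first_code), first_name)
    chained = reduce(
        lambda acc, pair: acc.when(F.col(source).like(pair[0]), pair[1]),
        rest,
        start,
    )
    # Rows that match no pattern take the value of the fallback column.
    return chained.otherwise(F.col(fallback))
```

Usage would then be a single call: df = df.withColumn('Product', product_via_reduce(pairs)). Adding a product means adding one entry to each list rather than another when line.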