How do I use the dense_rank() function in PySpark?

Posted 2024-06-07 10:29:30


I am running a PySpark script that executes a SQL query and creates a DataFrame from the result. The SQL query contains a dense_rank() function, and the query takes too long to finish.

Is there a way to make the query run faster, or can we handle this at the PySpark level? Is there a PySpark function or method that can replace dense_rank() in the SQL?

SQL:

SELECT DENSE_RANK() OVER (ORDER BY SOURCE_COLUMN_VALUE) AS SYSTEM_ID,
       SYSTEM_TABLE_NAME, SOURCE_ID, SOURCE_NAME, SOURCE_TABLE_NAME,
       SOURCE_COLUMN_NAME, SRC_VALUE AS SOURCE_COLUMN_VALUE, IM_INSERT_DT
FROM (
  SELECT ID AS SOURCE_ID,
         'AMPIL' AS SOURCE_NAME,
         UPPER(concat(coalesce(addr_line_1,''),';',coalesce(addr_line_2,''),';',
                      coalesce(city_1,''),';',coalesce(state_1,''),';',
                      coalesce(zip_1,''),';',coalesce(cntry_1,''))) as SOURCE_COLUMN_VALUE,
         concat(coalesce(addr_line1_src,''),';',coalesce(addr_line2_src,''),';',
                coalesce(city_src,''),';',coalesce(state_crc,''),';',
                coalesce(zip_1,''),';',coalesce(cntry_1,'')) as SRC_VALUE,
         SOURCE_TABLE_NAME,
         'ADDRESS' AS SYSTEM_TABLE_NAME,
         SOURCE_COLUMN_NAME,
         date_format(current_timestamp(),'yyyy-MM-dd hh:mm:ss') as IM_INSERT_DT
  from (
    SELECT ID,
           regexp_replace(addr_line_1,' ','') as addr_line_1,
           Upper(addr_line_1) as addr_line1_src,
           regexp_replace(addr_line_2,' ','') as addr_line_2,
           upper(addr_line_2) as addr_line2_src,
           regexp_replace(UPPER(coalesce(city,meli_city_nm)),' ','') as city_1,
           UPPER(coalesce(city,meli_city_nm)) as city_src,
           regexp_replace(coalesce(meli_stt_provncd,coalesce(vw_states_code.state_cd,state)),' ','') as state_1,
           coalesce(meli_stt_provncd,coalesce(vw_states_code.state_cd,state)) as state_crc,
           case when UPPER(coalesce(vw_states_code.country_cd,country)) = 'US' then 'USA'
                when UPPER(coalesce(vw_states_code.country_cd,country)) = 'CANADA' then 'CA'
                else regexp_replace(UPPER(coalesce(vw_states_code.country_cd,country)),' ','')
           end as cntry_1,
           case when UPPER(coalesce(vw_states_code.country_cd,country)) = 'US'
                then regexp_extract(substr(trim(regexp_replace(zip,' ','')),0,5),'^[0-9]{5}$',0)
                else regexp_replace(zip,' ','')
           end as zip_1,
           SOURCE_TABLE_NAME,
           SOURCE_COLUMN_NAME
    from vw_addr_stg
    LEFT JOIN (select * from vw_dmn_meli_zip where MELI_LAST_LN = 'L') vw_dmn_meli
           on vw_addr_stg.zip = vw_dmn_meli.meli_zip_cd_base
    LEFT JOIN vw_states_code
           on (coalesce(meli_stt_provncd,state) = vw_states_code.state_cd
               or vw_states_code.state_nm = vw_addr_stg.state)
    LEFT JOIN vw_country_codes
           on vw_country_codes.country_name = vw_addr_stg.country
  )
)

1 answer

Answered 2024-06-07 10:29:30

In PySpark, you can get what you need by combining a Window specification with the SQL functions module. I am not fluent in SQL and have not tested this solution, but something along these lines may help you:

from pyspark.sql import Window
import pyspark.sql.functions as psf

# The SQL uses DENSE_RANK() OVER (ORDER BY SOURCE_COLUMN_VALUE),
# so the window is ordered (globally), not partitioned.
w = Window.orderBy("SOURCE_COLUMN_VALUE")
df = df.withColumn("SYSTEM_ID", psf.dense_rank().over(w))

You can find the documentation for dense_rank in the pyspark.sql.functions API reference.
