import pandas as pd
import pyspark.sql.functions as F

# Toy hierarchy: each row is a (child, parent) edge; '' marks a root node.
# Note the chain M -> L -> K -> J -> I is 5 levels deep.
pdf = pd.DataFrame({'child': list('ABCDEFGHIJKLM'),
                    'parent': ['', 'A', 'A', 'B', 'B', 'C', 'C', 'E', '', 'I', 'J', 'K', 'L']})
# Convert to a Spark dataframe.
# NOTE(review): relies on an existing SparkSession named `spark` -- confirm.
df = spark.createDataFrame(pdf)

# Make roots self-referencing so every row survives the inner joins below.
df = df.withColumn('parent', F.when(F.col('parent') != '', F.col('parent'))
                              .otherwise(F.col('child')))

# Resolve the top-most ancestor iteratively instead of with a fixed chain of
# self-joins: the original two-join version only climbed to the grandparent,
# which is wrong for hierarchies deeper than 3 levels (e.g. M -> ... -> I).
# Each pass replaces `highest_parent` with that node's own parent; because
# roots point to themselves, the loop reaches a fixpoint when nothing moves.
res = df.withColumn('highest_parent', F.col('parent'))
changed = 1
while changed:
    stepped = (
        res.alias('a')
        .join(df.alias('b'), F.col('a.highest_parent') == F.col('b.child'))
        .select(F.col('a.child').alias('child'),
                F.col('a.parent').alias('parent'),
                F.col('b.parent').alias('highest_parent'),
                # True while this row's ancestor pointer still advanced.
                (F.col('a.highest_parent') != F.col('b.parent')).alias('moved'))
    )
    changed = stepped.filter(F.col('moved')).count()
    res = stepped.drop('moved')
# `res` keeps the original interface: columns child, parent, highest_parent.
# Seed upper_parent with the node's parent, falling back to the node itself
# for roots. The original `fillna` only caught NaN roots; the toy data at the
# top of this page marks roots with '' instead, which made it a no-op there.
# `mask` turns '' into NaN first, so both root encodings are handled.
df['upper_parent'] = df['parent'].mask(df['parent'] == '').fillna(df['child'])
child parent upper_parent
0 A NaN A
1 B A A
2 C A A
3 D B B
4 E B B
5 F C C
6 G C C
7 H G G
8 I G G
9 A1 NaN A1
10 B1 A1 A1
11 C1 A1 A1
12 D1 B1 B1
13 E1 B1 B1
14 F1 C1 C1
15 G1 C1 C1
16 H1 G1 G1
17 I1 G1 G1
# Iteratively push each root label down the tree: any row whose parent is
# already attributed to root `up_par` gets `up_par` as well. Loops until
# every row's upper_parent is one of the known roots.
# NOTE(review): assumes `upper_parent_list` holds the root labels and that
# df['upper_parent'] was seeded from df['parent'] -- confirm against caller.
# (Indentation restored here; the pasted original had lost it.)
while df['upper_parent'].isin(upper_parent_list).sum() != df.shape[0]:
    for up_par in upper_parent_list:
        # Nodes currently known to belong to this root's subtree.
        child_list = list(df[df['upper_parent'].isin([up_par])]['child'])
        # Relabel their direct children with the root, one level per pass.
        df['upper_parent'] = np.where(df['parent'].isin(child_list), up_par, df['upper_parent'])
print(df)
child parent upper_parent
0 A NaN A
1 B A A
2 C A A
3 D B A
4 E B A
5 F C A
6 G C A
7 H G A
8 I G A
9 A1 NaN A1
10 B1 A1 A1
11 C1 A1 A1
12 D1 B1 A1
13 E1 B1 A1
14 F1 C1 A1
15 G1 C1 A1
16 H1 G1 A1
17 I1 G1 A1
在 pandas 中,您可以使用
networkx
来处理这类层级查找;而在 pyspark 中,上面的自连接方法只能处理 3 个层级。请注意,示例数据中最后几行的链条超过 3 个层级,因此该方法会得到错误结果——希望这不是您的情况;运行后
您可以自行对比得到的结果。
相关问题 更多 >
编程相关推荐