python到pyspark,在pysp中转换轴

2024-05-15 22:15:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经在下面的DataFrame中实现了预期的python输出。但我想把它变成pyspark。在

d = {'user': ['A', 'A', 'B','B','C', 'D', 'C', 'E', 'D', 'E', 'F', 'F'], 'songs' : [11,22,99,11,11,44,66,66,33,55,11,77]}
data = pd.DataFrame(data = d)


e = {'user': ['A', 'B','C', 'D',  'E', 'F','A'], 'cluster': [1,2,3,1,2,3,2]}
clus = pd.DataFrame(data= e)

期望输出:我希望实现特定集群的用户没有听到的所有歌曲。A belongs to cluster 1, and cluster 1 has songs [11,22,33,44] so A hasnt listened to [33,44]所以我使用下面的python代码实现了这一点。在

^{pr2}$

PYTHON代码:

df = pd.merge(data, clus, on='user', how='left').drop_duplicates(['user','movie'])

df1 = (df.groupby(['cluster']).apply(lambda x: x.pivot('user','movie','cluster').isnull())
        .fillna(False)
        .reset_index(level=0, drop=True)
        .sort_index())

s = np.where(df1, ['{}'.format(x) for x in df1.columns], '')

#remove empty values
s1 = pd.Series([''.join(x).strip(', ') for x in s], index=df1.index)
print (s1)

在pyspark分布式编码中实现同样的效果?在


Tags: to代码dataframedfdataindexmoviedrop
1条回答
网友
1楼 · 发布于 2024-05-15 22:15:41

可能有比这更好的解决办法,但它是有效的。在

假设每个用户只属于一个集群

import pyspark.sql.functions as F
from pyspark.sql.types import *

d = zip(['A', 'A', 'B','B','C', 'D', 'C', 'E', 'D', 'E', 'F', 'F'],[11,22,99,11,11,44,66,66,33,55,11,77])
data = sql.createDataFrame(d).toDF('user','songs')

这给了我们

^{pr2}$

创建集群假设每个用户只属于一个集群

c = zip(['A', 'B','C', 'D',  'E', 'F'],[1,2,3,1,2,3])
clus = sql.createDataFrame(c).toDF('user','cluster')
clus.show()

+  +   -+
|user|cluster|
+  +   -+
|   A|      1|
|   B|      2|
|   C|      3|
|   D|      1|
|   E|      2|
|   F|      3|
+  +   -+

现在,我们得到用户听到的所有歌曲及其群集

all_combine = data.groupBy('user').agg(F.collect_list('songs').alias('songs'))\
                  .join(clus, data.user==clus.user).select(data['user'],'songs','cluster')
all_combine.show()
+  +    +   -+                                                         
|user|   songs|cluster|
+  +    +   -+
|   F|[11, 77]|      3|
|   E|[66, 55]|      2|
|   B|[99, 11]|      2|
|   D|[44, 33]|      1|
|   C|[11, 66]|      3|
|   A|[11, 22]|      1|
+  +    +   -+

最后,计算集群中听到的所有歌曲,以及该集群中用户未听到的所有歌曲

not_listened = F.udf(lambda song,all_: list(set(all_) - set(song)) , ArrayType(IntegerType()))

grouped_clusters = data.join(clus, data.user==clus.user).select(data['user'],'songs','cluster')\
                    .groupby('cluster').agg(F.collect_list('songs').alias('all_songs'))\
                    .join(all_combine, ['cluster']).select('user', all_combine['cluster'], 'songs', 'all_songs')\
                    .select('user', not_listened(F.col('songs'), F.col('all_songs')).alias('not_listened'))
grouped_clusters.show()

我们得到的输出是

+  +      +                                                             
|user|not_listened|
+  +      +
|   D|    [11, 22]|
|   A|    [33, 44]|
|   F|        [66]|
|   C|        [77]|
|   E|    [99, 11]|
|   B|    [66, 55]|
+  +      +

相关问题 更多 >