数据帧pyspark到di

2024-06-09 23:00:49 发布

您现在位置:Python中文网/ 问答频道 /正文

我有这个数据帧路径:

path_df.show()
+---------------+-------------+----+
|FromComponentID|ToComponentID|Cost|
+---------------+-------------+----+
|            160|          163|27.0|
|            160|          183|27.0|
|            161|          162|22.0|
|            161|          170|31.0|
|            162|          161|22.0|
|            162|          167|24.0|
|            163|          160|27.0|
|            163|          164|27.0|
|            164|          163|27.0|
|            164|          165|35.0|
|            165|          164|35.0|
|            165|          166|33.0|
|            166|          165|33.0|
|            166|          167|31.0|
|            167|          162|24.0|
|            167|          166|31.0|
|            167|          168|27.0|
|            168|          167|27.0|
|            168|          169|23.0|
|            169|          168|23.0|
+---------------+-------------+----+
only showing top 20 rows

由此,我想做一个措辞,如下: {FromComponentID:{ToComponentID:Cost}}

对于我目前的数据,应该是:

{160 : {163 : 27,
        183 : 27},
 161 : {162 : 22,
        170 : 31},
 162 : {161 : 22
        167 : 24},
 ...
 167 : {162 : 24,
        166 : 31,
        168 : 27}
 168 : {167 : 27,
        169 : 23},
 169 : {168 : 23}
}

我能只用PySpark做吗?怎么做?或者最好是提取我的数据并用python直接处理它们。


Tags: 数据path路径onlydftopshowpyspark
3条回答

你可以这样试试

df_prod = spark.read.csv('/path/to/sample.csv',inferSchema=True,header=True)
rdd = df_prod.rdd.map(lambda x: {x['FromComponentID']:{x['ToComponentID']:x['Cost']}})
rdd.collect()

所有这些都可以通过数据帧转换和udf来完成。唯一有点烦人的是,因为技术上有两种不同类型的字典(一种是key=integer和value=dictionary,另一种是key=integer value=float),所以必须用不同的数据类型定义两个udf。有一种可能的方法:

from pyspark.sql.functions import udf,collect_list,create_map
from pyspark.sql.types import MapType,IntegerType,FloatType

data = [[160,163,27.0],[160,183,27.0],[161,162,22.0],
      [161,170,31.0],[162,161,22.0],[162,167,24.0],
      [163,160,27.0],[163,164,27.0],[164,163,27.0],
      [164,165,35.0],[165,164,35.0],[165,166,33.0],
      [166,165,33.0],[166,167,31.0],[167,162,24.0],
      [167,166,31.0],[167,168,27.0],[168,167,27.0],
      [168,169,23.0],[169,168,23.0]]

cols = ['FromComponentID','ToComponentID','Cost']
df = spark.createDataFrame(data,cols)

combineMap = udf(lambda maps: {key:f[key] for f in maps for key in f},
             MapType(IntegerType(),FloatType()))

combineDeepMap = udf(lambda maps: {key:f[key] for f in maps for key in f},
             MapType(IntegerType(),MapType(IntegerType(),FloatType())))

mapdf = df.groupBy('FromComponentID')\
.agg(collect_list(create_map('ToComponentID','Cost')).alias('maps'))\
.agg(combineDeepMap(collect_list(create_map('FromComponentID',combineMap('maps')))))

result_dict = mapdf.collect()[0][0]

对于大型数据集,这将比要求将数据收集到单个节点上的解决方案提供一些性能提升。但由于spark仍然需要序列化udf,因此与基于rdd的解决方案相比不会有太大的收益。


更新:

rdd解决方案要简洁得多,但在我看来,它并没有那么干净。这是因为pyspark不太容易将大型字典存储为rdd。解决方案是将其存储为元组的分布式列表,然后在将其收集到单个节点时将其转换为字典。以下是一个可能的解决方案:

maprdd = df.rdd.groupBy(lambda x:x[0]).map(lambda x:(x[0],{y[1]:y[2] for y in x[1]}))
result_dict = dict(maprdd.collect()) 

同样,这应该比单节点上的纯python实现提供性能提升,并且它可能与dataframe实现没有太大的不同,但是我的期望是dataframe版本将更具性能。

我知道最简单的方法如下(但有熊猫依赖性):

path_df.toPandas().set_index('FromComponentID').T.to_dict('list')

相关问题 更多 >