PySpark: most common class per group in an RDD, without converting to a Spark DataFrame


I want the most common class in each group. Each group can have multiple rows, and more than one class can appear within a group. We can ignore the question of ties, since Python should just take the first class automatically.

I tried converting the RDD to a Spark DataFrame and then using the code from this link: pyspark: aggregate on the most frequent value in a column.

However, I would like to do this without converting the data to a Spark DataFrame.
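For reference, the DataFrame route from that link boils down to counting (group, class) pairs and ranking them with a window function. A minimal sketch of that approach (assuming a DataFrame df with columns group and class; this is context, not what the question asks for):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Count how often each class occurs per group, then keep the top row per group.
counts = df.groupBy('group', 'class').count()
w = Window.partitionBy('group').orderBy(F.desc('count'))
result = (counts.withColumn('rn', F.row_number().over(w))
                .filter(F.col('rn') == 1)
                .select('group', 'class'))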

Here is the dataset:

Data= sc.parallelize([(1, 'class1', 0.0),
       (1, 'class1', 2.9870435922860854),
       (1, 'class1', 3.1390539564237088),
       (2, 'class1', 1.8147552294243288),
       (2, 'class1', 2.2762141107738643),
       (2, 'class1', 2.3276650040679754),
       (3, 'class1', 2.1916602976063415),
       (3, 'class2', 2.8092745089004265),
       (3, 'class2', 2.962653217205646),
       (4, 'class2', 1.9684050295783773),
       (4, 'class2', 2.6954556024643974),
       (4, 'class1', 2.849277442723792),
       (5, 'class2', 2.42178294501635),
       (5, 'class2', 3.650846798310411),
       (5, 'class1', 4.209012410198228),
       (6, 'class1', 1.942895930291406),
       (6, 'class1', 2.3133629778496676),
       (6, 'class2', 3.0147225096785264),
       (7, 'class1', 1.7185194340256884),
       (7, 'class1', 2.91322741107079),
       (7, 'class1', 3.5767422323347633),
       (8, 'class1', 2.4711392945465893),
       (8, 'class1', 3.436547108084221),
       (8, 'class1', 3.937683211352823),
       (9, 'class1', 3.800013103330196),
       (9, 'class1', 4.632413017908266),
       (9, 'class1', 5.191184050603831)])

Expected output:

  [(1, 'class1'), (2, 'class1'), (3, 'class2'), (4, 'class2'), (5, 'class2'), (6, 'class1'), (7, 'class1'), (8, 'class1'), (9, 'class1')]

Also, I may have more than two classes.

The first element in each row is the group id, the second is the class, and the third is a distance, which I don't think is of much use here.


2 Answers

Here is an RDD solution in PySpark:

>>> rddMap1 = Data.map(lambda x: (str(x[0])+','+x[1],float(x[2])))
>>> rddMap1.first()
('1,class1', 0.0)
>>> 
>>> rddReduce1 = rddMap1.reduceByKey(lambda x,y: x+y)
>>> rddReduce1.first()
('1,class1', 6.126097548709794)
>>> 
>>> rddMap2 = rddReduce1.map(lambda x: (int(x[0].split(',')[0]),(x[0].split(',')[1],x[1])))
>>> rddMap2.first()
(1, ('class1', 6.126097548709794))
>>> 
>>> rddReduce2 = rddMap2.reduceByKey(lambda x,y: x if x[1] > y[1] else y)
>>> rddReduce2.first()
(1, ('class1', 6.126097548709794))
>>> 
>>> rddReduce2.map(lambda x: (x[0],x[1][0])).collect()
[(1, 'class1'), (2, 'class1'), (3, 'class2'), (4, 'class2'), (5, 'class2'), (6, 'class1'), (7, 'class1'), (8, 'class1'), (9, 'class1')]
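Note that reduceByKey above sums the distance values per (group, class) and then keeps the class with the largest total, which happens to match the expected output here but is not literally the most frequent class. For reference, the same steps can be condensed into one chain that keys on a (group, class) tuple instead of the 'id,class' string (a restatement of the steps above, not the answerer's code):

result = (Data.map(lambda x: ((x[0], x[1]), float(x[2])))
              .reduceByKey(lambda a, b: a + b)                    # total distance per (group, class)
              .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))      # re-key by group id
              .reduceByKey(lambda a, b: a if a[1] > b[1] else b)  # keep the class with the largest total
              .map(lambda kv: (kv[0], kv[1][0])))                 # drop the totals
result.collect()  # matches the expected output above

Using the tuple itself as the key avoids the split(',') round-trip entirely.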
The second answer counts occurrences of each (group, class) pair instead:

from operator import add
from operator import itemgetter

data= sc.parallelize([
   (1, 'class1', 0.0),
   (1, 'class1', 2.9870435922860854),
   (1, 'class1', 3.1390539564237088),
   (2, 'class1', 1.8147552294243288),
   (2, 'class1', 2.2762141107738643),
   (2, 'class1', 2.3276650040679754),
   (3, 'class1', 2.1916602976063415),
   (3, 'class1', 2.1916602976063415),
   (3, 'class1', 2.1916602976063415),
   (3, 'class2', 2.8092745089004265),
   (3, 'class2', 2.962653217205646),
   (3, 'class4', 1.9684050295783773),
   (3, 'class4', 2.6954556024643974),
   (3, 'class4', 2.849277442723792),
   (3, 'class4', 2.42178294501635),
   (5, 'class2', 3.650846798310411),
   (5, 'class1', 4.209012410198228),
   (6, 'class1', 1.942895930291406),
   (6, 'class1', 2.3133629778496676),
   (6, 'class2', 3.0147225096785264),
   (7, 'class1', 1.7185194340256884),
   (7, 'class1', 2.91322741107079),
   (7, 'class1', 3.5767422323347633),
   (8, 'class1', 2.4711392945465893)
                 ])

# Count occurrences of each (group, class) pair, then regroup by group id.
data2 = (data.map(lambda x: ((x[0], x[1]), 1))
             .reduceByKey(add)
             .map(lambda x: (x[0][0], (x[0][1], x[1])))
             .groupByKey()
             .mapValues(list))

# Sort each group's (class, count) list by count, descending.
# (Python 3 removed tuple-unpacking lambdas, so index into the pair instead.)
data3 = data2.map(lambda kv: (kv[0], sorted(kv[1], key=itemgetter(1), reverse=True)))

# Keep only the top (class, count) pair per group.
data4 = data3.mapValues(lambda x: x[0])

Returns:

[(8, ('class1', 1)), (1, ('class1', 3)), (2, ('class1', 3)), (3, ('class4', 4)), (5, ('class1', 1)), (6, ('class1', 2)), (7, ('class1', 3))]
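To get just the (group, class) pairs that the question asked for, one more mapValues strips the count (my addition, not in the original answer):

data4.mapValues(lambda x: x[0]).collect()
# [(8, 'class1'), (1, 'class1'), (2, 'class1'), (3, 'class4'), (5, 'class1'), (6, 'class1'), (7, 'class1')]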

Feel free to ask me for Scala any time!

Ties are not catered for. Not sure what you would do with them anyway; the real benefit of handling them is not that obvious.
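If deterministic tie-breaking ever matters, one option (my suggestion, not part of either answer) is to sort by count descending and class name ascending, so ties resolve alphabetically:

data3 = data2.map(lambda kv: (kv[0], sorted(kv[1], key=lambda t: (-t[1], t[0]))))
# (-count, class) sorts the highest count first; equal counts fall back to class-name order.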
