使用Python的MRjob生成前10个值的MapReduce作业

2024-04-27 19:50:03 发布

您现在位置:Python中文网/ 问答频道 /正文

我想这个地图减少工作(代码如下)输出前10名最受好评的产品。它总是给我以下错误消息:

it=izip(iterable,count(0,-1))#装饰 TypeError:izip参数#1必须支持迭代。在

我想这和我试图应用的最大的函数有关。在

有什么建议吗?在

谢谢你!在

from mrjob.job import MRJob
from mrjob.step import MRStep
from heapq import nlargest


class MostRatedProduct(MRJob):

def steps(self):
    return [
        MRStep(mapper = self.mapper_get_ratings,
               reducer = self.reducer_count_ratings),
        MRStep(reducer = self.reducer_find_top10)
    ]


def mapper_get_ratings(self, _, line):
    (userID, itemID, rating, timestamp) = line.split(',')
    yield itemID, 1

def reducer_count_ratings(self, itemID, ratingCount):
    yield None, (sum(ratingCount), itemID)

def top_10(self, ratingPair):
    for ratingTotal, itemID in ratingPair:
        top_rated = nlargest(10, ratingTotal)
    for top_rated in ratingTotal:
        return (ratingTotal, itemID)

def reducer_find_top10(self, key, ratingPair):
    ratingTotal, itemID = self.top_10(ratingPair)
    yield ratingTotal, itemID


if __name__ == '__main__':
    MostRatedProduct.run()

Tags: fromimportselftopdefcountmapperyield
2条回答

我没有使用mrjob,但是我以前在AWS集群上使用MapReduce来查找top值。这是我的代码,它不使用heapq。希望您能够将相同的概念应用到您的代码中。这是mapper函数

import sys, time

def Parser():
    for line in sys.stdin:
        line = line.strip('\n')
        yield line.split()


def mapper():
    counts = list(Parser())
    z = sorted(counts, key = lambda x: int(x[1]))[-10:]
    print '\n'.join(map(lambda x: '\t'.join(x), z))


if __name__=='__main__':
    mapper()

这是减速器的代码

^{pr2}$

我把它改为输出前10个单词。请记住,这是一个字数统计示例,我在其中解析了一个文本文档。我希望这在某种程度上有所帮助!在

使用mrjob库,您可以在Python:-在

#Write a Code to print the top 5 word - occurences

#Import Dependencies
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRWordCount(MRJob):

  def steps(self):
    return [MRStep(mapper=self.mapper,reducer=self.reducer),MRStep(reducer = self.secondreducer)]

  def mapper(self,_,lines):
    words = lines.split()
    for word in words:
      yield word.lower(),1

  def reducer(self,key,values):
    yield None,('%04d'%int(sum(values)),key)

  def secondreducer(self,key,values):
    self.alist = []
    for value in values:
      self.alist.append(value)
    self.blist = []
    for i in range(5):
      self.blist.append(max(self.alist))
      self.alist.remove(max(self.alist))
    for i in range(5):
      yield self.blist[i]

if __name__ == '__main__':
    MRWordCount.run()

相关问题 更多 >