在Hadoop Streaming中使用Python文件

2 投票
2 回答
3072 浏览
提问于 2025-04-18 02:33

我刚接触Hadoop和MapReduce,正在努力学习这方面的知识。现在我想用Python开发一个MapReduce应用,使用两个CSV文件中的数据。我在mapper中读取这两个文件,然后把文件中的键值对打印到系统输出。

在单台机器上运行这个程序时一切正常,但在使用Hadoop Streaming时却出现了错误。我觉得我在Hadoop的mapper中读取文件时可能犯了些错误。请帮我看看代码,并告诉我在Hadoop Streaming中如何处理文件。下面是mapper.py的代码。(你可以通过注释理解代码):

#!/usr/bin/env python
import sys
from numpy import genfromtxt

def read_input(inVal):
    for line in inVal:
        # split the line into words
        yield line.strip()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    labels=[]
    data=[]    
    incoming = read_input(sys.stdin)
    for vals in incoming:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited;
        if len(vals) > 10:
            data.append(vals)
        else:
            labels.append(vals)

    for i in range(0,len(labels)):
        print "%s%s%s\n" % (labels[i], separator, data[i])


if __name__ == "__main__":
    main()

从两个CSV文件中输入到这个mapper的记录总共有60000条(是在单台机器上,不是在Hadoop集群上):

cat mnist_train_labels.csv mnist_train_data.csv | ./mapper.py

2 个回答

0

你没有贴出你的错误信息。在进行流式处理时,你需要传递 -file 参数或者 -input 参数,这样系统才能知道要么把文件和你的流式任务一起上传,要么知道在 hdfs 上哪里可以找到这个文件。

4

我花了大约三天时间才找到解决办法。

问题出在我使用的Hadoop新版本(我用的是2.2.0)。在读取文件中的值时,mapper代码在某个时刻返回了一个非零的退出代码(可能是因为它一次读取了很多值(784个))。在Hadoop 2.2.0中,有一个设置可以让Hadoop系统返回一个通用错误(子进程失败,代码为1)。这个设置默认是开启的。我只需要把这个属性的值改成关闭,就能让我的代码顺利运行,没有错误。

这个设置是:stream.non.zero.exit.is.failure。在进行流式处理时,把它设置为false就可以了。所以流式命令大概是这样的:

**hadoop jar ... -D stream.non.zero.exit.is.failure=false ...**

希望这个能帮到某个人,省下三天的时间... ;)

撰写回答