Hadoop Streaming: Mapper '封装' 二进制可执行文件
我现在在一个大型大学的计算机集群上运行一个数据处理流程。为了发表论文,我想把它转换成mapreduce格式,这样任何人在像亚马逊云服务(AWS)这样的hadoop集群上都可以运行。这个流程目前由一系列的Python脚本组成,这些脚本包裹了不同的二进制可执行文件,并使用Python的subprocess和tempfile模块来管理输入和输出。不幸的是,我并不是这些二进制可执行文件的作者,很多都不支持标准输入(STDIN),或者输出的标准输出(STDOUT)格式不太好用(比如只把结果写入文件)。这些问题就是我为什么要用Python把它们包裹起来的原因。
到目前为止,我已经能够修改我的Python代码,使我可以在本地机器上以标准的“测试格式”运行一个mapper和一个reducer。
$ cat data.txt | mapper.py | reducer.py
这个mapper会按照它包裹的二进制文件需要的格式来处理每一行数据,然后通过subprocess.popen把文本发送给二进制文件(这也让我可以屏蔽很多无用的标准输出),接着收集我想要的标准输出,并把它格式化成适合reducer的文本行。
当我尝试在本地的hadoop安装上复制这个命令时,就出现了问题。我可以让mapper执行,但它给出的错误提示表明找不到二进制可执行文件。
文件 "/Users/me/Desktop/hadoop-0.21.0/./phyml.py", 第69行,在 main() 文件 "/Users/me/Desktop/hadoop-0.21.0/./mapper.py", 第66行,在 main phyml(None) 文件 "/Users/me/Desktop/hadoop-0.21.0/./mapper.py", 第46行,在 phyml ft = Popen(cli_parts, stdin=PIPE, stderr=PIPE, stdout=PIPE) 文件 "/Library/Frameworks/Python.framework/Versions/6.1/lib/python2.6/subprocess.py", 第621行,在 init errread, errwrite) 文件 "/Library/Frameworks/Python.framework/Versions/6.1/lib/python2.6/subprocess.py", 第1126行,在 _execute_child raise child_exception OSError: [Errno 13] 权限被拒绝
我的hadoop命令看起来是这样的:
./bin/hadoop jar /Users/me/Desktop/hadoop-0.21.0/mapred/contrib/streaming/hadoop-0.21.0-streaming.jar \
-input /Users/me/Desktop/Code/AWS/temp/data.txt \
-output /Users/me/Desktop/aws_test \
-mapper mapper.py \
-reducer reducer.py \
-file /Users/me/Desktop/Code/AWS/temp/mapper.py \
-file /Users/me/Desktop/Code/AWS/temp/reducer.py \
-file /Users/me/Desktop/Code/AWS/temp/binary
正如我上面提到的,mapper似乎不知道这个二进制文件 - 也许它没有被发送到计算节点?不幸的是,我真的不知道问题出在哪里。任何帮助都会非常感激。如果能看到一些用Python写的hadoop流式mapper/reducer,能够包裹二进制可执行文件,那就太好了。我想我不是第一个尝试这样做的人!事实上,这里还有另一篇帖子在问基本上相同的问题,但还没有得到回答...
2 个回答
终于让它运行起来了
$pid = open2 (my $out, my $in, "./binary") or die "could not run open2";
经过一番搜索,我终于找到了如何让你的映射器和归约器可以使用可执行的二进制文件、脚本或模块的方法。关键是先把所有文件上传到Hadoop。
$ bin/hadoop dfs -copyFromLocal /local/file/system/module.py module.py
然后你需要按照下面的模板来格式化你的流处理命令:
$ ./bin/hadoop jar /local/file/system/hadoop-0.21.0/mapred/contrib/streaming/hadoop-0.21.0-streaming.jar \
-file /local/file/system/data/data.txt \
-file /local/file/system/mapper.py \
-file /local/file/system/reducer.py \
-cacheFile hdfs://localhost:9000/user/you/module.py#module.py \
-input data.txt \
-output output/ \
-mapper mapper.py \
-reducer reducer.py \
-verbose
如果你要链接一个Python模块,你需要在你的映射器或归约器脚本中添加以下代码:
import sys
sys.path.append('.')
import module
如果你是通过子进程来访问一个二进制文件,你的命令应该像这样:
cli = "./binary %s" % (argument)
cli_parts = shlex.split(cli)
mp = Popen(cli_parts, stdin=PIPE, stderr=PIPE, stdout=PIPE)
mp.communicate()[0]
希望这些信息对你有帮助。