Problem creating a separate function to read a file in Hadoop Streaming
I'm using Hadoop Streaming and ran into a problem when I tried to move the file-reading code into a function of its own.
mapper.py: works (but is very inefficient)
#!/usr/bin/env python
import sys

def main():
    for line in sys.stdin:
        line = line.strip()  # each line contains only one word, 5+ million lines
        filename = "my_dict.txt"  # contains 7+ million words
        f = open(filename, "r")
        for line1 in f:
            line1 = line1.strip()
            if line1 == line:
                print '%s\t%s' % (line1, 1)

if __name__ == '__main__':
    main()
The version with a separate function to read the file:
mapper.py: does not work
#!/usr/bin/env python
import sys

def read_dict():
    listD = []
    filename = "my_dict.txt"
    f = open(filename, "r")
    for line1 in f:
        listD.append(line1.strip())
    return listD

def main():
    listDM = set(read_dict())
    for line in sys.stdin:
        line = line.strip()
        for line1 in listDM:
            if line1 == line:
                print '%s\t%s' % (line1, 1)

if __name__ == '__main__':
    main()
My error log:
hadoop_admin@gml-VirtualBox:/usr/local/hadoop$ sh myScripts/runHadoopStream.sh
Deleted hdfs://localhost:9000/user/hadoop_admin/output
packageJobJar: [/var/www/HMS/my_dict.txt, /usr/local/hadoop/mapper.py, /usr/local/hadoop/reducer.py, /tmp/hadoop-hadoop_admin/hadoop-unjar6634198925772314314/] [] /tmp/streamjob1880303974118879660.jar tmpDir=null
11/07/20 18:40:08 INFO mapred.FileInputFormat: Total input paths to process : 2
11/07/20 18:40:08 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hadoop_admin/mapred/local]
11/07/20 18:40:08 INFO streaming.StreamJob: Running job: job_201107181559_0091
11/07/20 18:40:08 INFO streaming.StreamJob: To kill this job, run:
11/07/20 18:40:08 INFO streaming.StreamJob: /usr/local/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201107181559_0091
11/07/20 18:40:08 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201107181559_0091
11/07/20 18:40:09 INFO streaming.StreamJob: map 0% reduce 0%
11/07/20 18:40:41 INFO streaming.StreamJob: map 1% reduce 0%
11/07/20 18:40:47 INFO streaming.StreamJob: map 0% reduce 0%
11/07/20 18:41:05 INFO streaming.StreamJob: map 1% reduce 0%
11/07/20 18:41:08 INFO streaming.StreamJob: map 0% reduce 0%
11/07/20 18:41:26 INFO streaming.StreamJob: map 1% reduce 0%
11/07/20 18:41:29 INFO streaming.StreamJob: map 0% reduce 0%
11/07/20 18:41:48 INFO streaming.StreamJob: map 1% reduce 0%
11/07/20 18:41:51 INFO streaming.StreamJob: map 0% reduce 0%
11/07/20 18:41:57 INFO streaming.StreamJob: map 100% reduce 100%
11/07/20 18:41:57 INFO streaming.StreamJob: To kill this job, run:
11/07/20 18:41:57 INFO streaming.StreamJob: /usr/local/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=localhost:9001 -kill job_201107181559_0091
11/07/20 18:41:57 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201107181559_0091
11/07/20 18:41:57 ERROR streaming.StreamJob: Job not Successful!
11/07/20 18:41:57 INFO streaming.StreamJob: killJob...
Streaming Job Failed!
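(Note that this client-side log only reports Job not Successful; the actual Python traceback from a failed mapper lands in the task attempt's stderr log, reachable through the Tracking URL printed above. One way to make such failures easier to spot is to catch exceptions in the mapper and write the traceback to stderr; a minimal sketch, assuming the existing main() from mapper.py:)

#!/usr/bin/env python
import sys
import traceback

def main():
    pass  # the existing mapper logic from above goes here

if __name__ == '__main__':
    try:
        main()
    except Exception:
        # anything written to stderr shows up in the task's stderr log
        traceback.print_exc(file=sys.stderr)
        sys.exit(1)  # a non-zero exit code marks the task attempt as failed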
The shell script used to run the Hadoop Streaming job:
bin/hadoop dfs -rmr output
bin/hadoop jar contrib/streaming/hadoop-*-streaming.jar \
    -file /var/www/HMS/my_dict.txt \
    -file /usr/local/hadoop/mapper.py -mapper /usr/local/hadoop/mapper.py \
    -file /usr/local/hadoop/reducer.py -reducer /usr/local/hadoop/reducer.py \
    -input input/ -output output/
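(The -file /var/www/HMS/my_dict.txt option ships the dictionary with the job, as the packageJobJar list in the log above shows, so each task finds it in its working directory under the base name my_dict.txt; that is why the mapper can open it with a relative path. The scripts can also be smoke-tested locally, outside Hadoop, with a plain pipe such as python mapper.py < sample_input.txt | sort | python reducer.py, where sample_input.txt is a hypothetical small input file.)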
1 Answer
Try this simplified script. It builds the dictionary set once, before the input loop, and replaces the inner comparison loop with a direct membership test (line in listDM), which is a constant-time hash lookup instead of a scan over all 7+ million words:
#!/usr/bin/env python
import sys

def main():
    filename = "my_dict.txt"
    listfile = open(filename)
    # doesn't create an intermediate list
    listDM = set(line.strip() for line in listfile)
    # less Pythonic but significantly faster
    # still doesn't create an intermediate list
    # listDM = set(imap(str.strip, listfile))
    listfile.close()
    for line in sys.stdin:
        line = line.strip()
        if line in listDM:
            print '%s\t%d' % (line, 1)

if __name__ == '__main__':
    main()
If you use the faster commented-out alternative, you will need to add from itertools import imap at the top of the script.
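For reference, a minimal sketch of the whole script with that import in place (Python 2, matching the print statement used above; the filename my_dict.txt is taken from the question):

#!/usr/bin/env python
import sys
from itertools import imap

def main():
    # build the lookup set once; imap strips each dictionary line lazily
    listfile = open("my_dict.txt")
    listDM = set(imap(str.strip, listfile))
    listfile.close()
    for line in sys.stdin:
        line = line.strip()
        if line in listDM:
            print '%s\t%d' % (line, 1)

if __name__ == '__main__':
    main()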