Python script for Avro conversion with Hadoop Streaming
I have a 10 GB input file that I want to convert to Avro format using Hadoop Streaming with a Python mapper. The job completes successfully, but I cannot read the output back with an Avro reader.
It fails with "'utf8' codec can't decode byte 0xb4 in position 13924: invalid start byte."
The problem seems to be that I am using standard output as the mapper's output under Hadoop Streaming; if I run the same script locally against a file name, the resulting Avro output is perfectly readable.
Any ideas how to solve this? I suspect the key/value handling in streaming is to blame... The streaming command is:
hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar \
-input "xxx.txt" \
-mapper "/opt/anaconda/anaconda21/bin/python mapper.py x.avsc" \
-reducer NONE \
-output "xxxxx" -file "mapper.py" \
-lazyOutput \
-file "x.avsc"
The mapper script is:
import sys
from avro import schema, datafile
import avro.io as io

# Parse the target schema and open an Avro container-file writer on stdout.
schema_str = open("xxxxx.avsc", 'r').read()
SCHEMA = schema.parse(schema_str)
rec_writer = io.DatumWriter(SCHEMA)
df_writer = datafile.DataFileWriter(sys.stdout, rec_writer, SCHEMA)

# Field names, in schema order, become the keys of each output record.
header = []
for field in SCHEMA.fields:
    header.append(field.name)

for line in sys.stdin:
    # Input columns are separated by \x01 (Hive's default field delimiter).
    fields = line.rstrip().split("\x01")
    data = dict(zip(header, fields))
    try:
        df_writer.append(data)
    except Exception, e:
        # Note: these diagnostics also go to stdout, interleaved with the
        # binary Avro stream.
        print "failed with data: %s" % str(data)
        print str(e)
df_writer.close()
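For comparison, the local run that produces a readable file differs only in writing the container to a real binary file handle instead of sys.stdout; a minimal sketch of that variant (out.avro is a placeholder path):

import sys
from avro import schema, datafile
import avro.io as io

# Identical pipeline, but the Avro container goes to a local binary file.
SCHEMA = schema.parse(open("xxxxx.avsc", 'r').read())
df_writer = datafile.DataFileWriter(open("out.avro", "wb"),
                                    io.DatumWriter(SCHEMA), SCHEMA)
header = [f.name for f in SCHEMA.fields]
for line in sys.stdin:
    df_writer.append(dict(zip(header, line.rstrip().split("\x01"))))
df_writer.close()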
1 Answer
I finally solved this by using an output format class and leaving the Avro binary conversion to it. The streaming mapper then only needs to emit JSON records:
hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar \
-libjars avro-json-1.2.jar \
-jobconf output.schema.url=hdfs:///x.avsc \
-input "xxxxx" \
-mapper "/opt/anaconda/anaconda21/bin/python mapper.py x.avsc" \
-reducer NONE \
-output "/xxxxx" \
-outputformat com.cloudera.science.avro.streaming.AvroAsJSONOutputFormat \
-lazyOutput \
-file "mapper.py" \
-file "x.avsc"
Here is mapper.py:
import sys
import json
from avro import schema

# The schema is only needed here for the field names; the output format
# class handles the actual Avro encoding.
schema_str = open("xxxxx.avsc", 'r').read()
SCHEMA = schema.parse(schema_str)
header = [field.name for field in SCHEMA.fields]

for line in sys.stdin:
    fields = line.rstrip().split("\x01")
    data = dict(zip(header, fields))
    try:
        # Emit one JSON record per line; ISO-8859-1 maps every input byte
        # to a code point, so non-UTF-8 bytes survive the round trip.
        print >> sys.stdout, json.dumps(data, encoding='ISO-8859-1')
    except Exception, e:
        # Diagnostics go to stderr so they don't pollute the record stream.
        print >> sys.stderr, "failed with data: %s" % str(data)
        print >> sys.stderr, str(e)
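To sanity-check the result, the job's output can be read back with the standard Avro Python reader; a minimal sketch, assuming a part file copied to local disk (part-00000.avro is a placeholder path):

from avro.datafile import DataFileReader
from avro.io import DatumReader

# Open the Avro container produced by the job and print the first records.
reader = DataFileReader(open("part-00000.avro", "rb"), DatumReader())
for i, record in enumerate(reader):
    print record
    if i >= 4:
        break
reader.close()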