Java: Reduce does not start after the map phase completes
Below is the code for a simple MapReduce job that uses a custom Writable.
public class MapReduceKMeans {

    public static class MapReduceKMeansMapper extends
            Mapper<Object, Text, SongDataPoint, Text> {

        public void map(Object key, Text value, Context context)
                throws InterruptedException, IOException {
            String str = value.toString();
            // Reading Line one by one from the input CSV.
            String split[] = str.split(",");
            String trackId = split[0];
            String title = split[1];
            String artistName = split[2];
            SongDataPoint songDataPoint =
                    new SongDataPoint(new Text(trackId), new Text(title),
                            new Text(artistName));
            context.write(songDataPoint, new Text());
        }
    }

    public static class MapReduceKMeansReducer extends
            Reducer<SongDataPoint, Text, Text, NullWritable> {

        public void reduce(SongDataPoint key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            sb.append(key.getTrackId()).append("\t")
                    .append(key.getTitle()).append("\t")
                    .append(key.getArtistName()).append("\t");
            String write = sb.toString();
            context.write(new Text(write), NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err
                    .println("Usage:<CsV Out Path> <Final Out Path>");
            System.exit(2);
        }
        Job job = new Job(conf, "Song Data Trial");
        job.setJarByClass(MapReduceKMeans.class);
        job.setMapperClass(MapReduceKMeansMapper.class);
        job.setReducerClass(MapReduceKMeansReducer.class);
        job.setOutputKeyClass(SongDataPoint.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
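When I debug, my code reads every line of the CSV file, but it never enters the reduce phase.
I am also using SongDataPoint as my custom Writable.
Its code is as follows: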
public class SongDataPoint implements WritableComparable<SongDataPoint> {

    Text trackId;
    Text title;
    Text artistName;

    public SongDataPoint() {
        this.trackId = new Text();
        this.title = new Text();
        this.artistName = new Text();
    }

    public SongDataPoint(Text trackId, Text title, Text artistName) {
        this.trackId = trackId;
        this.title = title;
        this.artistName = artistName;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.trackId.readFields(in);
        this.title.readFields(in);
        this.artistName.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
    }

    public Text getTrackId() {
        return trackId;
    }

    public void setTrackId(Text trackId) {
        this.trackId = trackId;
    }

    public Text getTitle() {
        return title;
    }

    public void setTitle(Text title) {
        this.title = title;
    }

    public Text getArtistName() {
        return artistName;
    }

    public void setArtistName(Text artistName) {
        this.artistName = artistName;
    }

    @Override
    public int compareTo(SongDataPoint o) {
        // TODO Auto-generated method stub
        int compare = getTrackId().compareTo(o.getTrackId());
        return compare;
    }
}
Any help is appreciated. Thanks.
# Answer 1
The write method in my custom Writable class had mistakenly been left empty. Once I implemented it correctly, the problem was solved.
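To illustrate why the empty write method matters: a Writable key is serialized with write on the map side and rebuilt with readFields later, so the two methods have to emit and consume the same fields in the same order. The sketch below demonstrates that round trip using only the JDK. SongKey and SongKeyDemo are hypothetical stand-ins for SongDataPoint, with String fields and writeUTF/readUTF in place of Hadoop's Text, so that it runs without Hadoop on the classpath. In the real class, the equivalent fix would presumably be to call trackId.write(out), title.write(out) and artistName.write(out), mirroring the existing readFields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class SongKeyDemo {

    // Hypothetical stand-in for SongDataPoint (String fields instead of Text).
    static class SongKey {
        String trackId = "";
        String title = "";
        String artistName = "";

        SongKey() {
        }

        SongKey(String trackId, String title, String artistName) {
            this.trackId = trackId;
            this.title = title;
            this.artistName = artistName;
        }

        // Serialize every field in a fixed order.
        void write(DataOutput out) throws IOException {
            out.writeUTF(trackId);
            out.writeUTF(title);
            out.writeUTF(artistName);
        }

        // Read the fields back in exactly the order write() emitted them.
        void readFields(DataInput in) throws IOException {
            trackId = in.readUTF();
            title = in.readUTF();
            artistName = in.readUTF();
        }
    }

    // Serializes a key to bytes and rebuilds a fresh instance from them.
    static SongKey roundTrip(SongKey original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            SongKey copy = new SongKey();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SongKey copy = roundTrip(new SongKey("TR001", "Some Title", "Some Artist"));
        System.out.println(copy.trackId + "\t" + copy.title + "\t" + copy.artistName);
    }
}
```

If write is left empty, the byte buffer stays empty and readFields fails when it tries to read the first field, which is consistent with the job never reaching the reducer.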
# Answer 2
According to the driver, your output key class is SongDataPoint.class and your output value class is Text.class, but the reducer actually writes Text as the key and NullWritable as the value.
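A minimal sketch of how the driver's declared output types could be aligned with what the reducer emits, assuming Hadoop is on the classpath. The class DriverOutputTypes and its configure method are hypothetical names used only for illustration; the two setter calls are the same ones already used in the question's main method.

```java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

class DriverOutputTypes {

    // Declare the final (reducer) output types: Reducer<..., Text, NullWritable>.
    static void configure(Job job) {
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
    }
}
```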
# Answer 3
You should also specify the mapper output value class, as follows:
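The code that originally followed this answer is not shown here, so what follows is only a sketch of what such a setting typically looks like, assuming Hadoop is on the classpath and SongDataPoint from the question is in the same package. MapOutputTypes and configure are hypothetical names; setMapOutputKeyClass and setMapOutputValueClass are the Job methods for declaring mapper output types when they differ from the job's final output types.

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

class MapOutputTypes {

    // Declare the intermediate (mapper) output types:
    // Mapper<Object, Text, SongDataPoint, Text>.
    static void configure(Job job) {
        job.setMapOutputKeyClass(SongDataPoint.class);
        job.setMapOutputValueClass(Text.class);
    }
}
```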