Java: Reduce does not start after the map phase completes

Below is my code for a simple MapReduce job that uses a custom Writable:

public class MapReduceKMeans {

public static class MapReduceKMeansMapper extends
        Mapper<Object, Text, SongDataPoint, Text> {
    public void map(Object key, Text value, Context context)
            throws InterruptedException, IOException {
        String str = value.toString();
        // Reading Line one by one from the input CSV.
        String split[] = str.split(",");

        String trackId = split[0];
        String title = split[1];
        String artistName = split[2];
        SongDataPoint songDataPoint = 
                new SongDataPoint(new Text(trackId), new Text(title), 
                        new Text(artistName));
        context.write(songDataPoint, new Text());
        }
    }


public static class MapReduceKMeansReducer extends
Reducer<SongDataPoint, Text, Text, NullWritable> {
    public void reduce(SongDataPoint key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        sb.append(key.getTrackId()).append("\t").
        append(key.getTitle()).append("\t")
        .append(key.getArtistName()).append("\t");

        String write = sb.toString();

        context.write(new Text(write), NullWritable.get());
    }

}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();

    if (otherArgs.length != 2) {
        System.err
                .println("Usage:<CsV Out Path> <Final Out Path>");
        System.exit(2);
    }



    Job job = new Job(conf, "Song Data Trial");
    job.setJarByClass(MapReduceKMeans.class);
    job.setMapperClass(MapReduceKMeansMapper.class);
    job.setReducerClass(MapReduceKMeansReducer.class);
    job.setOutputKeyClass(SongDataPoint.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

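When I debug, my code reads every line of the CSV file, but it never enters the reduce phase.

I am also using SongDataPoint as my custom Writable.

Its code is as follows: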

public class SongDataPoint implements WritableComparable<SongDataPoint> {

Text trackId;
Text title;
Text artistName;

public SongDataPoint() {
    this.trackId = new Text();
    this.title = new Text();
    this.artistName = new Text();
}

public SongDataPoint(Text trackId, Text title, Text artistName) {
    this.trackId = trackId;
    this.title = title;
    this.artistName = artistName;
}

@Override
public void readFields(DataInput in) throws IOException {
    this.trackId.readFields(in);
    this.title.readFields(in);
    this.artistName.readFields(in);
}

@Override
public void write(DataOutput out) throws IOException {

}

public Text getTrackId() {
    return trackId;
}

public void setTrackId(Text trackId) {
    this.trackId = trackId;
}

public Text getTitle() {
    return title;
}

public void setTitle(Text title) {
    this.title = title;
}

public Text getArtistName() {
    return artistName;
}

public void setArtistName(Text artistName) {
    this.artistName = artistName;
}


@Override
public int compareTo(SongDataPoint o) {
    // TODO Auto-generated method stub
    int compare = getTrackId().compareTo(o.getTrackId());
    return compare;
}

}

Any help is appreciated. Thanks.


3 answers

  1. # Answer 1

    The write method in my custom Writable class had mistakenly been left empty. Once I implemented it properly, the problem was solved.

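    public void write(DataOutput out) throws IOException {
        // Write the fields in the same order that readFields reads them.
        trackId.write(out);
        title.write(out);
        artistName.write(out);
    }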
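
    Why an empty write breaks things can be seen without a cluster. The sketch below is only an illustration under stated assumptions: Song is a hypothetical stand-in for SongDataPoint that uses String fields and plain java.io streams instead of Hadoop's Text and Writable, and roundTrip is a helper invented for the example. It shows the contract that matters here: readFields must consume exactly the bytes that write produced, in the same order.

    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class RoundTripSketch {

        // Hypothetical stand-in for SongDataPoint (String fields, no Hadoop types).
        static class Song {
            String trackId = "";
            String title = "";
            String artistName = "";

            // Writes the three fields in a fixed order.
            void write(DataOutput out) throws IOException {
                out.writeUTF(trackId);
                out.writeUTF(title);
                out.writeUTF(artistName);
            }

            // Reads the fields back in exactly the order write() emitted them.
            void readFields(DataInput in) throws IOException {
                trackId = in.readUTF();
                title = in.readUTF();
                artistName = in.readUTF();
            }
        }

        // Serializes a Song to bytes, then rebuilds a fresh instance from those bytes.
        static Song roundTrip(Song original) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            Song copy = new Song();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        }

        public static void main(String[] args) throws IOException {
            Song song = new Song();
            song.trackId = "TR001";
            song.title = "Example Title";
            song.artistName = "Example Artist";
            Song copy = roundTrip(song);
            System.out.println(copy.trackId + "\t" + copy.title + "\t" + copy.artistName);
        }
    }
    ```

    If the body of write is emptied in this sketch, the buffer stays empty and readFields fails with an EOFException on the first readUTF call. The same mismatch between what is written and what is read is what the empty write in the question's custom Writable produces when map output is serialized for the reducer.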
    
  2. # Answer 2

    According to your driver, the output key class is SongDataPoint.class and the output value class is Text.class, but your Reducer actually writes Text as the key and NullWritable as the value.

  3. # Answer 3

    You should also specify the map output key and value classes, as follows:

    job.setMapOutputKeyClass(SongDataPoint.class);
    job.setMapOutputValueClass(Text.class);
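
    Combined with the mismatch pointed out in answer 2, the driver could declare the intermediate and final types separately. The following is a configuration fragment rather than a complete program: it assumes the job variable and the mapper and reducer classes from the question, and it would replace the two setOutput... calls in main. When the map output classes are not set, Hadoop falls back to the job output classes, so both pairs are declared.

    ```java
    // Intermediate (map output) types: what MapReduceKMeansMapper emits.
    job.setMapOutputKeyClass(SongDataPoint.class);
    job.setMapOutputValueClass(Text.class);

    // Final (reducer output) types: what MapReduceKMeansReducer emits.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    ```

    This only brings the declared types in line with the code; it does not replace the fix to SongDataPoint.write described in answer 1.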