有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

在hadoop中使用相同的输入文件为两个不同的任务创建两个输出文件

我是hadoop的新手,所以我已经开始执行任务,我有一个csv文件,我想从包含公司数据的文件中找到访问者的姓名和访问者的姓名。 这是我的代码,只用于查找访问者。我的输出必须是一个文件,其中包含前20名访问者和每个访问者的数量,然后是前20名访问者及其数量。 访客的名字、姓氏和中名在第0、1、2栏,访客的名字和中名在第20栏

package dataset;
import java.io.IOException;
import java.util.*;

 import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

 public class Data {

  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text word = new Text();

     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

         String[] str=value.toString().split(",");
         String wo=str[1]+" "+str[2]+" "+str[0];
     // System.out.println(str[0]);
      word.set(wo);
            context.write(word, new IntWritable(1));
         }

  } 

  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

      TreeMap<IntWritable,Text> T=new TreeMap<>();
     public void reduce(Text key, Iterable<IntWritable> values, Context context) 
       throws IOException, InterruptedException {
         int sum = 0;
         for (IntWritable val : values) {
             sum += val.get();
         }
         T.put(new IntWritable(sum),new Text(key));
         if(T.size()>20)
         {
             System.out.println(T.firstKey());
             T.remove(T.firstKey());
         }
     }

     protected void cleanup(Context context) throws IOException, InterruptedException
     {

         for (IntWritable k : T.keySet()) {
             System.out.println(k);
                context.write(T.get(k),k);
     }
  }
  }


  public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();

         Job job = new Job(conf, "wordcount");

     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(IntWritable.class);

     job.setMapperClass(Map.class);
     job.setReducerClass(Reduce.class);

     job.setInputFormatClass(TextInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);

     FileInputFormat.addInputPath(job, new Path(args[0]));
     FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
  }

}

共 (0) 个答案