Java MapReduce Hadoop multiple data inputs on Linux

I am using Ubuntu 20.10 on VirtualBox with Hadoop version 3.2.1 (leave me a comment if you need more info).
At the moment, my output gives me the following:

Aaron Wells Peirsol ,M,17,United States,Swimming,2000 Summer,0,1,0
Aaron Wells Peirsol ,M,21,United States,Swimming,2004 Summer,1,0,0
Aaron Wells Peirsol ,M,25,United States,Swimming,2008 Summer,0,1,0
Aaron Wells Peirsol ,M,25,United States,Swimming,2008 Summer,1,0,0

For the output above, I would like to be able to add up all of his medals
(the three numbers at the end of each line represent the gold, silver, and bronze
medals the contestant won at the Olympic Games over the years).

The age (17, 21, 25, 25) and the games in which it happened (2000, 2004, 2008, 2008 Summer)
do not matter for this, but I have to add up the medals
so that I can then sort the participants by most gold medals won, and so on.

Any ideas? I can provide you with my code if you need it, but what I need is another MapReduce job that, I suppose, will take the output shown above as its input and give us something like this:

Aaron Wells Peirsol,M,25,United States,Swimming,2008 Summer,2,2,0

It would also be very helpful if there were a way to remove the "\t" that appears between the key and the value in the reduce output!

Thank you all for your time, Gyftonikolos Nikolaos


1 Answer

  1. Answer #1

    Although it may seem a bit tricky at first, this is just another variation of the classic WordCount example; this time, composite keys and values are needed in order to pass the data from the mappers to the reducers in the form of key-value pairs.

    For the mapper, we need to extract all of the information from each line of the input file and split the column data into two "categories":

    • the main info of each athlete, which is always the same and will serve as the key,
    • the stat info that changes from line to line and needs to be computed.

    For each athlete's lines, we know that the columns that never change are the athlete's name, sex, country, and sport. All of these will be treated as the key, using the , character as a delimiter between each piece of data. The rest of the column data goes on the value side of the key-value pair, but we need delimiters there as well, in order to distinguish the medal counters from the age and the Olympic year. We will use (as illustrated right after this list):

    • the @ character as a delimiter between age and year,
    • the # character as a delimiter between the medal counters,
    • and the _ character as a delimiter between those two groups.
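
    To make this concrete, the first line of the sample input above would leave the mapper as the following key-value pair:

    key:   Aaron Wells Peirsol ,M,United States,Swimming
    value: 17@2000 Summer_0#1#0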

    In the Reduce function, all we have to do is add up the medal counts and find the latest age and games for each athlete.

    In order not to have a tab character between the key and the value in the output of the MapReduce job, we can simply set NullWritable as the key of the key-value pairs generated by the reducer and put all of the computed data in the value of each pair, using the , character as a delimiter.

    The code for this job looks like this:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    
    public class Medals 
    {
        /* input:  <byte_offset, line_of_dataset>
         * output: <(name,sex,country,sport), (age@year_gold#silver#bronze)>
         */
        public static class Map extends Mapper<Object, Text, Text, Text> 
        {
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
            {
                String record = value.toString();
                String[] columns = record.split(",");
    
                // extract athlete's main info
                String name = columns[0];
                String sex = columns[1];
                String country = columns[3];
                String sport = columns[4];
    
                // extract athlete's stat info
                String age = columns[2];
                String year = columns[5]; 
                String gold = columns[6];
                String silver = columns[7];
                String bronze = columns[8];
    
                // set the main info as key and the stat info as value
                context.write(new Text(name + "," + sex + "," + country + "," + sport), new Text(age + "@" + year + "_" +  gold + "#" + silver + "#" + bronze));
            }
        }
    
        /* input:  <(name,sex,country,sport), (age@year_gold#silver#bronze)>
         * output: <NULL, (name,sex,age,country,sport,year,golds,silvers,bronzes)>
         */
        public static class Reduce extends Reducer<Text, Text, NullWritable, Text>
        {
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
            {
                // extract athlete's main info
                String[] athlete_info = key.toString().split(",");
                String name = athlete_info[0];
                String sex = athlete_info[1];
                String country = athlete_info[2];
                String sport = athlete_info[3];
    
                int latest_age = 0;
                String latest_games = "";
                
                int gold_cnt = 0;
                int silver_cnt = 0;
                int bronze_cnt = 0;
    
                // for a single athlete, compute their stats...
                for(Text value : values)
                {
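                    // split the composite value back into its two groups:
                    // "age@year" and "gold#silver#bronze", joined by the '_' character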
                    String[] split_value = value.toString().split("_");
                    String[] age_and_year = split_value[0].split("@");
                    String[] medals = split_value[1].split("#");
    
                    // find the last age and games the athlete has stats in the input file
                    if(Integer.parseInt(age_and_year[0]) > latest_age)
                    {
                        latest_age = Integer.parseInt(age_and_year[0]);
                        latest_games = age_and_year[1];
                    }
                    
                    // accumulate the medal counters; in the sample data these are 0/1
                    // flags per Games entry, but summing also handles counts above 1
                    gold_cnt += Integer.parseInt(medals[0]);
    
                    silver_cnt += Integer.parseInt(medals[1]);
    
                    bronze_cnt += Integer.parseInt(medals[2]);
                }
    
                context.write(NullWritable.get(), new Text(name + "," + sex + "," + String.valueOf(latest_age) + "," + country + "," + sport + "," + latest_games + "," + String.valueOf(gold_cnt) + "," + String.valueOf(silver_cnt) + "," + String.valueOf(bronze_cnt)));
            }
        }
    
    
        public static void main(String[] args) throws Exception
        {
            // set the paths of the input and output directories in the HDFS
            Path input_dir = new Path("olympic_stats");
            Path output_dir = new Path("medals");
    
            // in case the output directory already exists, delete it
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            if(fs.exists(output_dir))
                fs.delete(output_dir, true);
    
            // configure the MapReduce job
            Job medals_job = Job.getInstance(conf, "Medals Counter");
            medals_job.setJarByClass(Medals.class);
            medals_job.setMapperClass(Map.class);
            medals_job.setReducerClass(Reduce.class);    
            medals_job.setMapOutputKeyClass(Text.class);
            medals_job.setMapOutputValueClass(Text.class);
            medals_job.setOutputKeyClass(NullWritable.class);
            medals_job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(medals_job, input_dir);
            FileOutputFormat.setOutputPath(medals_job, output_dir);
            medals_job.waitForCompletion(true);
        }
    }
    

    And the results, of course, are exactly what you wanted: (screenshot of the job's output)
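
    As for the ranking by gold medals mentioned in the question: the job above only aggregates the medals. A minimal sketch of a possible follow-up job (not part of the original answer; class names are illustrative) could read the aggregated output, use the gold count as the map output key, and let the shuffle phase do the sorting:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    // hypothetical second job: ranks the aggregated lines by gold medal count
    public class SortByGold
    {
        /* input:  <byte_offset, line_of_aggregated_output>
         * output: <(-golds), (name,sex,age,country,sport,year,golds,silvers,bronzes)>
         */
        public static class SortMap extends Mapper<Object, Text, IntWritable, Text>
        {
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException
            {
                String[] columns = value.toString().split(",");
    
                // the gold count is the 7th column of the aggregated output; negate it
                // so that the ascending sort of the shuffle yields a descending ranking
                context.write(new IntWritable(-Integer.parseInt(columns[6])), value);
            }
        }
    
        /* simply write the lines back out, now sorted by gold count */
        public static class SortReduce extends Reducer<IntWritable, Text, NullWritable, Text>
        {
            public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
            {
                for(Text value : values)
                    context.write(NullWritable.get(), value);
            }
        }
    }

    With a single reducer this yields one globally sorted file; the driver would be configured just like the one above, only reading from the medals directory instead.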