Java ClassCastException when using HFileOutputFormat2

I am trying to use HFileOutputFormat2 as the OutputFormat to load data from a file in HDFS into an HBase table, but I run into the following exception:

java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.Cell
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)

Caused by: java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot be cast to org.apache.hadoop.hbase.Cell
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:148)
at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:635)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at com.xogito.ingestion.mr.hbase.CSEventsHBaseMapper.map(CSEventsHBaseMapper.java:90)
at com.xogito.ingestion.mr.hbase.CSEventsHBaseMapper.map(CSEventsHBaseMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

Here is the code for the job:

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "MY_JOB");
        job.setJarByClass(getClass());
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setSpeculativeExecution(false);
        job.setReduceSpeculativeExecution(false);
        job.setMapperClass(CustomMapper.class); // custom Mapper
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat.class);

        String parentInputPath = args[0];
        String parentOutputPath = args[1];

        FileInputFormat.addInputPaths(job, parentInputPath);
        HFileOutputFormat.setOutputPath(job, new Path(parentOutputPath));

        Configuration hConf = HBaseConfiguration.create(conf);
        hConf.set("hbase.zookeeper.quorum", "x.x.x.x");
        hConf.set("hbase.zookeeper.property.clientPort", "2181");
        HTable hTable = new HTable(hConf, "mytable");
        // hTable.setAutoFlush(false, true);
        // hTable.setWriteBufferSize(1024 * 1024 * 12);

        HFileOutputFormat.configureIncrementalLoad(job, hTable);
        job.setNumReduceTasks(0);
        job.submit();
        return 0;
    }

The code for the mapper is shown below:

    @Override
    public void map(WritableComparable key, Writable val, Context context)
            throws IOException, InterruptedException {
        String data = val.toString();
        String[] splitted = data.split("\t");
        String account = splitted[1];
        Matcher match = ACCOUNT.matcher(account);
        int clientid = 0;
        if (match.find()) {
            clientid = Integer.parseInt(match.group(1));
        }
        String userid = splitted[2];
        Long timestamp = 0L;
        try {
            timestamp = Long.valueOf(splitted[10]);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }

        String rowKeyText = "somtext";
        ImmutableBytesWritable rowKey = new ImmutableBytesWritable(Bytes.toBytes(rowKeyText));

        Put put = new Put(Bytes.toBytes(rowKeyText));
        put.add(cf, column, value); // cf, column and value are byte[] fields of the mapper
        context.write(rowKey, put);
    }

1 Answer

  1. Answer #1

    HFileOutputFormat, and its newer version HFileOutputFormat2, require KeyValue as the final class. Most likely the PutSortReducer, which converts each Put into KeyValue instances, is not being applied: in the driver above, configureIncrementalLoad(job, hTable) registers that reducer, but the job.setNumReduceTasks(0) call that follows it turns the job into a map-only job, so the mapper's Put objects reach the output format unconverted.
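
    If you keep the MapReduce job emitting Puts, the simpler fix is to let configureIncrementalLoad keep the reducer it sets up. A minimal sketch of the driver change, using the names from the question's run() method:

        HFileOutputFormat.configureIncrementalLoad(job, hTable);
        // configureIncrementalLoad registers PutSortReducer and a
        // TotalOrderPartitioner over the table's regions; dropping the
        // job.setNumReduceTasks(0) call that followed it lets that reducer
        // turn each Put into sorted KeyValue cells before the HFiles are written.
        return job.waitForCompletion(true) ? 0 : 1;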

    In my case I was using Spark rather than MapReduce, so I simply created KeyValue instances directly instead of Puts.
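
    Applied to the question's mapper, the direct-KeyValue approach would look roughly like the sketch below (assuming cf, column, value and timestamp as defined in the mapper above). Note that HFileOutputFormat still requires cells to arrive sorted by row key, so a map-only job like this only works when the input is already in row-key order:

        // In the driver, the map output value class has to match:
        job.setMapOutputValueClass(KeyValue.class);

        // In map(): emit a KeyValue (which implements Cell) instead of a Put.
        KeyValue kv = new KeyValue(Bytes.toBytes(rowKeyText), cf, column, timestamp, value);
        context.write(rowKey, kv);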