有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Why Storm不是在工作集群上重播失败消息,而是在本地桌面上以集群模式重播失败消息

这里是我试图执行的代码。我故意在插销上失败。这样我就可以看到失败的消息被storm重播。但看起来这并没有发生

public static class FastRandomSentenceSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  Random _rand;
   private static final String[] CHOICES = {
       "marry had a little lamb whos fleese was white as snow",
       "and every where that marry went the lamb was sure to go"
   };

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      _collector = collector;
      _rand = ThreadLocalRandom.current();
    }

   @Override
   public void nextTuple() {
      String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
      _collector.emit(new Values(sentence), sentence);
   }

   @Override
   public void fail(Object id) {
      System.out.println("RAVI: the failedObjectId = "+id);
      _collector.emit(new Values(id), id);
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("sentence"));
   }
 }

下面是关于分句螺栓的详细信息。我故意失败的地方

 public static class SplitSentence extends BaseRichBolt 
 {
     OutputCollector _collector;
     @Override
     public void prepare(Map conf,
                     TopologyContext context,
                     OutputCollector collector)
    {
       _collector = collector;
    }

这是发生故障的函数

    @Override
    public void execute(Tuple tuple) 
    {
        String sentence = tuple.getString(0);
        System.out.println("sentence = "+sentence);
        if(sentence.equals("marry had a little lamb whos fleese was white as snow"))
        {
           System.out.println("going to fail");
           _collector.fail(tuple);
        }
        else
        { 
           for (String word: sentence.split("\\s+")) {
              _collector.emit(tuple, new Values(word, 1));
           }
           _collector.ack(tuple);
        }   
     }

     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declarer.declare(new Fields("word", "count"));
     }
  }

这是驱动代码的详细信息。 公共静态void main(字符串[]args)引发异常{

   TopologyBuilder builder = new TopologyBuilder();

   builder.setSpout("spout", new FastRandomSentenceSpout(), 4);

   builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");


   Config conf = new Config();
   conf.registerMetricsConsumer(
             org.apache.storm.metric.LoggingMetricsConsumer.class);


   String name = "wc-test";
   if (args != null && args.length > 0) {
       name = args[0];
   }

   conf.setNumWorkers(1);
   StormSubmitter.submitTopologyWithProgressBar(name, 
                                                conf,
                                                builder.createTopology());

  }

共 (1) 个答案

  1. # 1 楼答案

    原来这是因为风暴中提到的全球环境。亚马尔。具体设置为:

    topology.acker.executors: 0