
Error when calling Elasticsearch from Spark in Java

I have a use case where I need to read messages from Kafka and, for each message, extract data and query an Elasticsearch index. The response will then be used for further processing. When I call JavaEsSpark.esJsonRDD I get the following error:

java.lang.ClassCastException: org.elasticsearch.spark.rdd.EsPartition incompatible with org.apache.spark.rdd.ParallelCollectionPartition at org.apache.spark.rdd.ParallelCollectionRDD.compute(ParallelCollectionRDD.scala:102)

Here is my code snippet:

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaIntegration <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaIntegration").setMaster("local[2]").set("spark.driver.allowMultipleContexts", "true");
        // Settings used when calling JavaEsSpark.esJsonRDD
        sparkConf.set("es.nodes", <NODE URL>);
        sparkConf.set("es.nodes.wan.only", "true");
        context = new JavaSparkContext(sparkConf);

        // Create the streaming context with a 2 second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        // Receive messages from Kafka
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> jsons = messages
                .map(new Function<Tuple2<String, String>, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public String call(Tuple2<String, String> tuple2) {
                        // The ClassCastException above is raised when this is invoked
                        JavaRDD<String> esRDD = JavaEsSpark.esJsonRDD(context, <index>, <search string>).values();
                        return null;
                    }
                });

        jsons.print();
        jssc.start();
        jssc.awaitTermination();
    }

The error occurs when calling JavaEsSpark.esJsonRDD. Is this the correct approach? How can I successfully call Elasticsearch from Spark? I am running Kafka and Spark on Windows and querying an external Elasticsearch index.
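For illustration, a minimal sketch of one possible shape for the per-message lookup described above, querying Elasticsearch with its low-level REST client directly instead of nesting JavaEsSpark inside a transformation. It assumes the Elasticsearch REST client 6.4 or later (for the Request API); the host, port, index, field name, and query below are placeholders, not values from the question:

    import java.io.IOException;

    import org.apache.http.HttpHost;
    import org.apache.http.util.EntityUtils;
    import org.apache.spark.api.java.function.Function;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;

    import scala.Tuple2;

    public class EsLookup implements Function<Tuple2<String, String>, String> {

        private static final long serialVersionUID = 1L;

        // Created lazily on each executor; RestClient itself is not serializable.
        private transient RestClient client;

        private RestClient client() {
            if (client == null) {
                // Placeholder host and port.
                client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();
            }
            return client;
        }

        @Override
        public String call(Tuple2<String, String> message) throws IOException {
            // Placeholder index, field, and query; the Kafka message value is used as the search term.
            Request request = new Request("GET", "/my-index/_search");
            request.setJsonEntity(
                    "{\"query\":{\"match\":{\"some-field\":\"" + message._2() + "\"}}}");
            Response response = client().performRequest(request);
            return EntityUtils.toString(response.getEntity());
        }
    }

Such a function could then be plugged into the snippet above in place of the anonymous Function, e.g. JavaDStream<String> jsons = messages.map(new EsLookup());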


0 answers