有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java试图使用这个系统。作为RDD中的一项任务

我目前刚刚开始学习ApacheSpark,有一些代码我不太明白为什么不编译。它说我发送到myRDD forEach中的任务是不可序列化的,但是我正在观看的一个教程也做了类似的事情。任何想法或线索都将不胜感激

public class Main {
    public static void main(String[] args) {
        Logger.getLogger("org.apache").setLevel(Level.WARN);
        List<Integer> inputData = new ArrayList<>();

        inputData.add(25);


        SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> myRDD = sc.parallelize(inputData);
        Integer result = myRDD.reduce((x, y) -> x + y);

        myRDD.foreach( System.out::println );
        System.out.println(result);

        sc.close();

    }
}

堆栈跟踪:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable...
    at com.virtualpairprogrammers.Main.main(Main.java:26)
Caused by: java.io.NotSerializableException: java.io.PrintStream
Serialization stack:
    - object not serializable (class: java.io.PrintStream, value: java.io.PrintStream@11a82d0f)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)...

共 (1) 个答案

  1. # 1 楼答案

    不要使用Lambda引用。它将尝试将PrintStream的函数println(..)传递给执行器。记住,你通过的或放入spark Close的所有方法(在map/filter/reduce等内部)都必须序列化。因为println(..)PrintStream的一部分,所以类PrintStream必须序列化

    传递一个匿名函数,如下所示-

    myRDD.foreach(integer -> System.out.println(integer));
    

    Full Example

    
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class Test63321956 {
        public static void main(String[] args) {
            Logger.getLogger("org.apache").setLevel(Level.WARN);
            List<Integer> inputData = new ArrayList<>();
    
            inputData.add(25);
    
    
            SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            JavaRDD<Integer> myRDD = sc.parallelize(inputData);
            Integer result = myRDD.reduce(Integer::sum);
    
            myRDD.collect().forEach( System.out::println );
            myRDD.foreach(integer -> System.out.println(integer));
            System.out.println(result);
            /**
             * 25
             * 25
             * 25
             */
    
            sc.close();
    
        }
    }