Scala: why does this Spark code throw java.io.NotSerializableException?
I want to access a companion object's method from inside a transformation on an RDD. Why doesn't the following work:
```scala
import org.apache.spark.rdd.RDD
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}

class Abc {
  def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

object Abc {
  def fn(x: Int): Double = { x.toDouble }
}

implicit def abcEncoder: Encoder[Abc] = Encoders.kryo[Abc]

new Abc().transform(sc.parallelize(1 to 10)).collect
```
The above code throws a java.io.NotSerializableException:

```
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.map(RDD.scala:369)
at Abc.transform(<console>:19)
... 47 elided
Caused by: java.io.NotSerializableException: Abc
Serialization stack:
- object not serializable (class: Abc, value: Abc@4f598dfb)
- field (class: Abc$$anonfun$transform$1, name: $outer, type: class Abc)
- object (class Abc$$anonfun$transform$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 57 more
```

Even defining an Encoder for class Abc does not help here. But the more important question is: why does it try to serialize an instance of class Abc at all? My first thought was that the companion object is a singleton tied to the class, so perhaps Spark was trying to serialize that. But that does not seem to be the case, because when I call Abc.fn from another class:
```scala
class Xyz {
  def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

implicit def xyzEncoder: Encoder[Xyz] = Encoders.kryo[Xyz]

new Xyz().transform(sc.parallelize(1 to 10)).collect
```
I get a java.io.NotSerializableException: Xyz.
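For what it's worth, the same call seems to work when it is not made from inside a class method at all (a quick check in the same spark-shell session, as far as I can tell):

```scala
// No enclosing class instance in scope, so nothing extra is captured:
sc.parallelize(1 to 10).map(Abc.fn).collect
// res: Array[Double] = Array(1.0, 2.0, ..., 10.0)
```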
# Answer 1
Spark's main abstraction is the RDD, which is partitioned across the nodes of the cluster. When we run a transformation on an RDD, the closure it uses is serialized on the driver node and shipped to the appropriate worker nodes, which then deserialize and execute it.
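The serialization stack in the question already shows what gets dragged in: the anonymous function compiled for `x.map(Abc.fn)` inside `class Abc` carries a `$outer` field pointing at the enclosing instance. Roughly, and only as an illustrative sketch (not the literal compiler output):

```scala
import org.apache.spark.rdd.RDD

object Abc {
  def fn(x: Int): Double = { x.toDouble }
}

// Sketch of what the compiler effectively emits for x.map(Abc.fn) inside
// class Abc; the names mirror the serialization stack in the question
// ("Abc$$anonfun$transform$1" with field "$outer").
class Abc {
  def transform(x: RDD[Int]): RDD[Double] = {
    // Eta-expansion of Abc.fn yields an anonymous Function1 that keeps a
    // reference to the enclosing instance (the real field is named $outer):
    val anonfun = new Function1[Int, Double] with Serializable {
      private val outer: Abc = Abc.this
      def apply(i: Int): Double = Abc.fn(i)
    }
    // Shipping anonfun to the executors means serializing outer too, and
    // Abc is not Serializable, hence the exception.
    x.map(anonfun)
  }
}
```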
In your case, the instance of class Abc cannot be serialized, so it cannot be shipped to the worker nodes. You need to make class Abc extend Serializable.
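A minimal version of that fix, keeping everything else from the question the same:

```scala
import org.apache.spark.rdd.RDD

// Extending Serializable lets Spark serialize the enclosing Abc instance
// that the closure in transform captures via its $outer field.
class Abc extends Serializable {
  def transform(x: RDD[Int]): RDD[Double] = { x.map(Abc.fn) }
}

object Abc {
  def fn(x: Int): Double = { x.toDouble }
}

new Abc().transform(sc.parallelize(1 to 10)).collect
// res: Array[Double] = Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0)
```

Keep in mind that this serializes the whole Abc instance along with the closure, so every non-transient field of the class must itself be serializable.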