Spark Structured Streaming with Python: org.apache.kafka.common.TopicPartition; class invalid for deserialization

I am trying to run the Spark Structured Streaming example below: https://github.com/apache/spark/blob/master/examples/src/main/python/sql/streaming/structured_kafka_wordcount.py
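
For reference, the core of that example, abridged here, reads from Kafka and runs a streaming word count; in the full script, bootstrapServers, subscribeType, and topics are parsed from sys.argv:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode, split

    spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()

    # Subscribe to the Kafka topic(s) and treat each record value as a line of text
    lines = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrapServers) \
        .option(subscribeType, topics) \
        .load() \
        .selectExpr("CAST(value AS STRING)")

    # Split each line into words and count occurrences of each word
    words = lines.select(explode(split(lines.value, ' ')).alias('word'))
    wordCounts = words.groupBy('word').count()

    # Print the running counts to the console
    query = wordCounts.writeStream.outputMode('complete').format('console').start()
    query.awaitTermination()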

I am running it on an AWS EMR cluster with Spark 2.0.2, and the following dependencies were added to the spark-submit call (a sketch of the invocation follows the list):

  • spark-sql-kafka-0-10_2.11-2.0.2.jar
  • spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar
  • kafka-clients-0.10.2.0.jar
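
For concreteness, a minimal sketch of how such a submission might look; the bootstrap servers and topic are placeholders rather than values from the original post:

    spark-submit \
      --jars spark-sql-kafka-0-10_2.11-2.0.2.jar,spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar,kafka-clients-0.10.2.0.jar \
      structured_kafka_wordcount.py <bootstrap-servers> subscribe <topic>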

Here is the error log:

Batch: 0
-------------------------------------------
17/06/06 19:44:01 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, x.x.x.x): java.io.InvalidClassException: org.apache.kafka.common.TopicPartition; class invalid for deserialization
    at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
    at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1987)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2111)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2111)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/06/06 19:44:01 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
17/06/06 19:44:01 ERROR StreamExecution: Query query-0 terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, x.x.x.x): java.io.InvalidClassException: org.apache.kafka.common.TopicPartition; class invalid for deserialization
    at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
    at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1987)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2111)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2111)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2197)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2197)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2197)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2173)
    at org.apache.spark.sql.execution.streaming.ConsoleSink.addBatch(console.scala:49)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:437)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:225)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:212)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:208)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:142)
Caused by: java.io.InvalidClassException: org.apache.kafka.common.TopicPartition; class invalid for deserialization
    at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
    at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1987)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2111)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2111)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Has anyone run into a similar problem and found a solution?

When I remove spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar, I get the following error:

^{pr2}$

2 Answers

Adding the jars like this solved the problem:

  --jars spark-sql-kafka-0-10_2.11-2.0.2.jar,kafka-clients-0.10.2.0.jar,spark-streaming-kafka-0-10_2.11-2.0.2.jar,spark-streaming-kafka-0-10-assembly_2.11-2.0.2.jar
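
Note that, compared with the original set of dependencies, this swaps the 0-8 assembly for its 0-10 counterparts, so only one version of the Kafka client classes ends up on the classpath; the 0-8 assembly bundles an older org.apache.kafka.common.TopicPartition, which is presumably what broke deserialization.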

Previously I had tried:

^{pr2}$

Spark serializes tasks on the driver and deserializes them on the executors.

So the java.io.InvalidClassException: org.apache.kafka.common.TopicPartition exception means that the org.apache.kafka.common.TopicPartition class on the executor classpath is different from the one on the Spark driver's classpath.
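
One way to verify this, assuming the submitted jars are available locally, is to compare each jar's copy of the class with the JDK's serialver tool; if one copy is an older, non-Serializable variant, serialver will report that instead of printing a serialVersionUID:

    serialver -classpath kafka-clients-0.10.2.0.jar org.apache.kafka.common.TopicPartition
    serialver -classpath spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar org.apache.kafka.common.TopicPartition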

This is usually caused by multiple kafka-clients-*.jar files on the executor classpath, which makes the class resolution ambiguous.
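
To see which of the submitted jars actually contain that class, a quick scan like the following works, run from the directory holding the jars:

    # list every jar that bundles the TopicPartition class
    for j in *.jar; do
        unzip -l "$j" | grep -q 'org/apache/kafka/common/TopicPartition.class' && echo "$j"
    done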

Double-check your classpath. For example, you can do this with the following steps (a combined sketch follows the list):

  1. Use the Spark UI to find an executor node and SSH into it
  2. Run jps and find the executor's PID (the process name is CoarseGrainedExecutorBackend)
  3. Run jinfo <pid> | grep java.class.path to get the executor classpath
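
Putting those steps together on an executor host might look like the following; the hadoop login user is the EMR default, and the host name and PID are placeholders:

    ssh hadoop@<executor-host>               # host taken from the Executors tab in the Spark UI
    jps | grep CoarseGrainedExecutorBackend  # find the executor JVM's PID
    jinfo <pid> | grep java.class.path       # dump the executor classpath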
