16个任务的序列化结果的总大小(1048.5 MB)大于spark.driver.maxResultSize(1024.0 MB)

2024-05-16 06:20:47 发布

您现在位置:Python中文网/ 问答频道 /正文

当我将--conf spark.driver.maxResultSize=2050添加到我的spark-submit命令时,出现以下错误。

17/12/27 18:33:19 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /XXX.XX.XXX.XX:36245 is closed
17/12/27 18:33:19 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:726)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:755)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /XXX.XX.XXX.XX:36245 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)

添加此配置的原因是错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o171.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

因此,我将maxResultSize增加到2.5gb,但是Spark作业无论如何都失败了(上面显示的错误)。 如何解决这个问题?


Tags: runorgapacheutiljavaconcurrentatspark
2条回答

问题似乎是您试图拉回到驱动程序的数据量太大。很可能您正在使用collect方法从DataFrame/RDD中检索所有值。驱动程序是一个单独的进程,通过收集数据帧可以将分布在集群中的所有数据拉回到一个节点上。这会破坏分发它的目的!只有在将数据减少到可管理的数量之后,才有意义这样做。

你有两个选择:

  1. 如果你真的需要处理所有的数据,那么你应该把它放在执行者身上。使用HDFSParquet以分布式方式保存数据,并使用Spark方法处理集群上的数据,而不是试图将所有数据收集回一个位置。

  2. 如果您真的需要将数据返回到驱动程序,您应该检查是否真的需要所有数据。如果您只需要摘要统计信息,那么在调用collect之前,在执行器上计算出来。或者如果你只需要前100名的结果,那么只需要收集前100名。

更新:

还有一个原因,你可能会遇到这个错误,这是不太明显的。Spark将尝试在您显式调用collect之后将数据发送回驱动程序。如果您正在使用累加器,它还将发送每个任务的累加器结果、广播联接的数据以及关于每个任务的一些小的状态数据。如果你有很多分区(我的经验是20k+),你有时会看到这个错误。这是一个经过改进的known issue,还有更多的in the works

如果这是你的问题,可以选择通过:

  1. 增加spark.driver.maxResultSize或将其设置为0表示无限制
  2. 如果广播连接是罪魁祸首,则可以减少spark.sql.autoBroadcastJoinThreshold,以限制广播连接数据的大小
  3. 减少分区数

原因:由诸如RDD的collect()之类的操作引起,这些操作将大量数据发送到驱动程序

解决方案: 由SparkConf设置:conf.set("spark.driver.maxResultSize", "4g") 或者 由spark-defaults.conf设置:spark.driver.maxResultSize 4g 或者 调用spark submit时设置:--conf spark.driver.maxResultSize=4g

相关问题 更多 >