Java: Spark job runs out of heap space on takeSample

I have an Apache Spark cluster with one master node and three worker nodes. Each worker has 32 cores and 124 GB of memory. I also have a data set in HDFS with roughly 650 million text records. The data set is a set of serialized RDDs, loaded like this:

import org.apache.spark.mllib.linalg.{Vector, Vectors, SparseVector}
val vectors = sc.objectFile[(String, SparseVector)]("hdfs://mn:8020/data/*")

I want to extract a sample of one million of these records to do some analysis, so I figured I would try val sample = vectors.takeSample(false, 10000, 0). However, this eventually fails with the following error message:

 15/08/25 09:48:27 ERROR Utils: Uncaught exception in thread task-result-getter-3
java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:64)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:61)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:79)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:64)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:61)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$r

I understand that I am running out of heap space (on the driver, I think?), and that makes sense. Running hadoop fs -du -s /path/to/data shows that the data set takes up 2575 GB on disk (but that is only about 850 GB of data).
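For context, here is a minimal sketch of the driver-side limits involved, in case the configuration is part of the problem (all values below are illustrative assumptions, not settings from this cluster). takeSample collects every sampled task result at the driver, so both the driver heap and spark.driver.maxResultSize (default 1g) have to be large enough to hold the sample:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only. In spark-shell these are normally passed on the command
// line (--driver-memory 16g --conf spark.driver.maxResultSize=8g), since
// spark.driver.memory only takes effect if set before the driver JVM starts.
val conf = new SparkConf()
  .setAppName("sample-extraction")
  .set("spark.driver.memory", "16g")
  .set("spark.driver.maxResultSize", "8g") // default 1g; the job is aborted once collected results exceed this
val sc = new SparkContext(conf)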

So my question is: what can I do to extract this sample of 1,000,000 records (which I plan to serialize to disk later)? I know I could run takeSample() with smaller sample sizes and then aggregate them (a sketch of that idea follows below), but I suspect I am simply missing the right configuration, or doing something wrong, that keeps me from doing it the way I would like.
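A hypothetical sketch of that smaller-samples-then-aggregate workaround (chunk size, chunk count, and seeds are assumptions, and chunks drawn with different seeds can overlap, so this is not a strictly uniform sample):

// Each takeSample call still collects its chunk at the driver, just in smaller pieces.
val chunkSize = 100000
val chunks = (0 until 10).map { seed =>
  vectors.takeSample(withReplacement = false, num = chunkSize, seed = seed)
}
val aggregated: Array[(String, SparseVector)] = chunks.reduce(_ ++ _)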


1 Answer

  1. Answer #1

    When dealing with big data, it is rarely a good idea to collect intermediate results at the driver node. Instead, it is almost always better to keep the data distributed across the cluster. The same holds for the sample you want to take (see also the distributed-sampling sketch after the snippet below).

    If you want to sample 1,000,000 elements of your data set and then write them to disk, why not take the sample without collecting it at the driver and write it straight to disk? The following snippet should do exactly that:

    // zipWithIndex pairs each record with its position; keep the first 1,000,000
    // records and drop the index again, all without collecting anything at the driver.
    val sample = vectors.zipWithIndex().filter(_._2 < 1000000).map(_._1)

    sample.saveAsObjectFile("path to file")
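
    A complementary sketch, not part of the original answer: RDD.sample also keeps the result distributed and gives a random sample rather than the first million records; the fraction, seed, and path below are illustrative assumptions.

    // Draw an approximately 1,000,000-record random sample from ~650 million records,
    // without collecting anything at the driver; the exact count varies around the target.
    val fraction = 1000000.0 / 650000000.0
    val randomSample = vectors.sample(withReplacement = false, fraction = fraction, seed = 0)
    randomSample.saveAsObjectFile("path to file")

    // Either saved sample can later be read back the same way the original data set was loaded:
    val reloaded = sc.objectFile[(String, SparseVector)]("path to file")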