What parameters should I set in Spark 1.6 to process a 100 GB CSV?


I'm new to Spark. My use case is to process a 100 GB file in Spark and load it into Hive. I have a 2-node cluster with 128 GB of memory per node. By "processing" I mean adding an extra column to my existing CSV whose value is computed at runtime. But every time I run spark-submit, the job fails with the error below:

Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.spark.unsafe.types.UTF8String.read(UTF8String.java:1205)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:363)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:355)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
        at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1819)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

The command I'm using is:

spark-submit --master yarn-client \
             --executor-memory 8G --total-executor-cores 2 \
             --class "com.test.app.Test" \
             spark.old-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
             harshaltestdata   harshaltestdata  \
             --jars spark-csv_2.10-1.5.0.jar

Note:

  • harshaltestdata is the name of my CSV file in HDFS
  • harshaltestdata is my table name

I have tried this code with files of up to 50 MB and it works fine, but it fails whenever the file is larger than 100 MB.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.lit

object Test {
  def main(args: Array[String]) {
    // CSV file name and Hive table name passed in as command-line arguments
    val csvName = args(0)
    val tableName = args(1)
    System.setProperty("SPARK_YARN_MODE", "true");
    val sparkConfiguration = new SparkConf();
    sparkConfiguration.setMaster("yarn-client");
    sparkConfiguration.setAppName("test-spark-job");
    sparkConfiguration
      .set("spark.executor.memory", "12g")
      .set("spark.kryoserializer.buffer.max", "512")

    val sparkContext = new SparkContext(sparkConfiguration);
    println("started spark job")
    val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
    val hiveContext = new HiveContext(sparkContext)

    val data = hiveContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("mode", "DROPMALFORMED")
      .load("hdfs_path***" + csvName + ".csv");
    //Printing in lines
    data.collect().foreach(println)
    //Printing in tabular form
    data.show()
    val newdf = data.withColumn("date", lit("10-04-19"))
    newdf.withColumn("date", lit("10-04-19"))
    newdf.write.mode("append").saveAsTable(tableName)

    val d = hiveContext.sql("select * from " + tableName)
    d.show()
  }
}

The expected result is that the file is processed and loaded into Hive.


1 Answer

Never use collect() unless you really need it; it pulls the entire dataset back to the driver and causes memory problems, especially with large CSV files.
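
As a rough sketch of what that means in the question's code (using the question's data DataFrame; the row counts here are arbitrary):

// Pulls the entire 100 GB dataset onto the driver JVM -- this is what exhausts the heap:
data.collect().foreach(println)

// Bounded alternatives: only a handful of rows ever reach the driver.
data.show(20)                   // print the first 20 rows in tabular form
data.take(20).foreach(println)  // fetch at most 20 rows to the driver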

The second line below is redundant; you can remove it.

val newdf = data.withColumn("date", lit("10-04-19"))
newdf.withColumn("date", lit("10-04-19")) // Redundant: the result is never assigned, so this line has no effect -- remove it.
newdf.write.mode("append").saveAsTable(tableName)
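
Putting both points together, a minimal revision of the question's read-and-write block might look like this. This is only a sketch, assuming the same imports, hiveContext, csvName and tableName as in the question's code; the hdfs_path*** placeholder is kept from the question.

// Read the CSV lazily; nothing is materialized on the driver at this point.
val data = hiveContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .load("hdfs_path***" + csvName + ".csv")

// Inspect a small sample instead of collect()-ing everything to the driver.
data.show(20)

// Add the computed column once and write straight to Hive;
// the work stays distributed across the executors.
val newdf = data.withColumn("date", lit("10-04-19"))
newdf.write.mode("append").saveAsTable(tableName)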
