I am new to Spark. My use case is to process a 100 GB file in Spark and load it into Hive. I have a 2-node cluster with 128 GB of memory on each node. By processing, I mean adding an extra column to my existing CSV whose value is computed at runtime. But every time I run spark-submit, it fails with the error below:
Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.spark.unsafe.types.UTF8String.read(UTF8String.java:1205)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:363)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:355)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
    at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1819)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
The command I am using is below:
spark-submit --master yarn-client \
  --executor-memory 8G --total-executor-cores 2 \
  --class "com.test.app.Test" \
  spark.old-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
  harshaltestdata harshaltestdata \
  --jars spark-csv_2.10-1.5.0.jar
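A note on the command above: spark-submit treats everything after the application JAR as arguments to the main class, so a --jars flag placed there is never seen by Spark itself. A sketch of the same command with the flag moved before the JAR (same file names as above, which are specific to this setup):

```shell
spark-submit --master yarn-client \
  --executor-memory 8G --total-executor-cores 2 \
  --jars spark-csv_2.10-1.5.0.jar \
  --class "com.test.app.Test" \
  spark.old-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
  harshaltestdata harshaltestdata
```

With this ordering, only harshaltestdata harshaltestdata reach args(0) and args(1) in the main class.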
Note: harshaltestdata is the name of my CSV in HDFS, and harshaltestdata is also my table name. I have tried this code with files of up to 50 MB and it works fine, but when I try anything over 100 MB it fails.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.lit

object Test {
  def main(args: Array[String]) {
    // CSV name in HDFS and target Hive table name, passed as arguments
    val csvName = args(0)
    val tableName = args(1)
    System.setProperty("SPARK_YARN_MODE", "true")
    val sparkConfiguration = new SparkConf()
    sparkConfiguration.setMaster("yarn-client")
    sparkConfiguration.setAppName("test-spark-job")
    sparkConfiguration
      .set("spark.executor.memory", "12g")
      .set("spark.kryoserializer.buffer.max", "512")
    val sparkContext = new SparkContext(sparkConfiguration)
    println("started spark job")
    val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
    val hiveContext = new HiveContext(sparkContext)
    val data = hiveContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("mode", "DROPMALFORMED")
      .load("hdfs_path***" + csvName + ".csv")
    // Printing line by line
    data.collect().foreach(println)
    // Printing in tabular form
    data.show()
    val newdf = data.withColumn("date", lit("10-04-19"))
    newdf.withColumn("date", lit("10-04-19"))
    newdf.write.mode("append").saveAsTable(tableName)
    val d = hiveContext.sql("select * from " + tableName)
    d.show()
  }
}
The expected result is that the file should be processed and loaded into Hive.
Never use collect() unless you really need it. It pulls the entire dataset back to the driver, which causes memory problems, especially with large CSV files; that is exactly what the task-result-getter OutOfMemoryError in your trace points to. The second withColumn line is also redundant, since its result is never assigned to anything, so you can remove it.
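Putting both fixes together, a minimal sketch of the processing part (same identifiers as in the question; the hdfs_path*** placeholder is kept as-is):

```scala
// Sketch of the corrected flow: no collect(), no redundant withColumn.
val data = hiveContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .load("hdfs_path***" + csvName + ".csv")

// Inspect a small sample on the driver instead of collecting everything.
data.show(20)

// Add the computed column once and write straight to Hive;
// the data never has to fit in driver memory.
val newdf = data.withColumn("date", lit("10-04-19"))
newdf.write.mode("append").saveAsTable(tableName)
```

With collect() gone, the rows stay distributed across the executors for the whole job, so the size of the CSV is no longer limited by driver heap.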