有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Stanford NPL in Spark(Scala)应用程序运行到Java堆空间(Java.lang.OutOfMemoryError)

我的Spark(Scala)应用程序在使用Standford NLP进行情绪分析时遇到了问题。我正在Jupyter实验室用spylon内核运行它

环境设置:

%%init_spark
launcher.master = "local[*]"
launcher.conf.spark.app.name = "test_json"
launcher.packages = ["io.sensesecure:hadoop-xz:1.4", "edu.stanford.nlp:stanford-corenlp:4.2.0"]
launcher.conf.spark.driver.memory = "30g"
launcher.conf.spark.executor.memory = "20g"
launcher.conf.spark.memory.offHeap.enabled = "true"
launcher.conf.spark.memory.offHeap.size = "10g"   
launcher.conf.spark.driver.extraLibraryPath = ["/opt/hadoop/lib/native"]
launcher.conf.spark.executor.extraLibraryPath = ["/opt/hadoop/lib/native"]
launcher.jars = ["stanford-corenlp-4.2.0-models.jar"]

使用斯坦福NLP

//Adapted from https://github.com/vspiewak/twitter-sentiment-analysis

import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.StanfordCoreNLP
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import java.util.Properties
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer


object SentimentAnalysisUtils extends java.io.Serializable {

  val nlpProps = {
    val props = new Properties()
    props.setProperty("annotators", "tokenize, ssplit, pos, lemma, parse, sentiment")
    props
  }

  def detectSentiment(message: String): BigDecimal = {

    val pipeline = new StanfordCoreNLP(nlpProps)

    val annotation = pipeline.process(message)
    var sentiments: ListBuffer[Double] = ListBuffer()
    var sizes: ListBuffer[Int] = ListBuffer()

    var longest = 0
    var mainSentiment = 0

    for (sentence <- annotation.get(classOf[CoreAnnotations.SentencesAnnotation])) {
      val tree = sentence.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree])
      val sentiment = RNNCoreAnnotations.getPredictedClass(tree)
      val partText = sentence.toString

      if (partText.length() > longest) {
        mainSentiment = sentiment
        longest = partText.length()
      }

      sentiments += sentiment.toDouble
      sizes += partText.length

    }

    val averageSentiment:Double = {
      if(sentiments.size > 0) sentiments.sum / sentiments.size
      else -1
    }

    val weightedSentiments = (sentiments, sizes).zipped.map((sentiment, size) => sentiment * size)
    var weightedSentiment = weightedSentiments.sum / (sizes.fold(0)(_ + _))

    if(sentiments.size == 0) {
      mainSentiment = -1
      weightedSentiment = -1
    }
   weightedSentiment   
  
  }

}

确定给定字符串的情感效果:

SentimentAnalysisUtils.detectSentiment("This is a horrible piece of software")
// res8: BigDecimal = 1.0

对数据帧中的列执行相同操作失败

 import org.apache.spark.sql._
 import org.apache.spark.sql.types._

 val schema = new StructType()
      .add("title", StringType, true)
      .add("id", StringType, true)
      .add("selftext", StringType, true)
      .add("score", LongType, true)
      .add("created_utc", LongType, true)
      .add("subreddit", StringType, true)
      .add("author", StringType, true)


//file size: 2.2 MB, machine RAM: 64 GB
val df = spark.read.schema(schema).json("/home/user/repos/spark-abc/resources/input/sample_submissions.json")

val get_sentiment_score = udf(SentimentAnalysisUtils.detectSentiment _)

df.withColumn("sentiment", get_sentiment_score($"selftext")) .select("selftext","sentiment").take(100)

// Works with 1-10, but already fails for 100
// java.lang.OutOfMemoryError: Java heap space
// Exception in thread "RemoteBlock-temp-file-clean-thread" java.lang.OutOfMemoryError: Java heap space


任何想法都值得赞赏


共 (0) 个答案