Stanford NPL in Spark(Scala)应用程序运行到Java堆空间(Java.lang.OutOfMemoryError)
我的Spark(Scala)应用程序在使用Standford NLP进行情绪分析时遇到了问题。我正在Jupyter实验室用spylon内核运行它
环境设置:
%%init_spark
launcher.master = "local[*]"
launcher.conf.spark.app.name = "test_json"
launcher.packages = ["io.sensesecure:hadoop-xz:1.4", "edu.stanford.nlp:stanford-corenlp:4.2.0"]
launcher.conf.spark.driver.memory = "30g"
launcher.conf.spark.executor.memory = "20g"
launcher.conf.spark.memory.offHeap.enabled = "true"
launcher.conf.spark.memory.offHeap.size = "10g"
launcher.conf.spark.driver.extraLibraryPath = ["/opt/hadoop/lib/native"]
launcher.conf.spark.executor.extraLibraryPath = ["/opt/hadoop/lib/native"]
launcher.jars = ["stanford-corenlp-4.2.0-models.jar"]
使用斯坦福NLP
//Adapted from https://github.com/vspiewak/twitter-sentiment-analysis
import edu.stanford.nlp.ling.CoreAnnotations
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations
import edu.stanford.nlp.pipeline.StanfordCoreNLP
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations
import java.util.Properties
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
object SentimentAnalysisUtils extends java.io.Serializable {
val nlpProps = {
val props = new Properties()
props.setProperty("annotators", "tokenize, ssplit, pos, lemma, parse, sentiment")
props
}
def detectSentiment(message: String): BigDecimal = {
val pipeline = new StanfordCoreNLP(nlpProps)
val annotation = pipeline.process(message)
var sentiments: ListBuffer[Double] = ListBuffer()
var sizes: ListBuffer[Int] = ListBuffer()
var longest = 0
var mainSentiment = 0
for (sentence <- annotation.get(classOf[CoreAnnotations.SentencesAnnotation])) {
val tree = sentence.get(classOf[SentimentCoreAnnotations.SentimentAnnotatedTree])
val sentiment = RNNCoreAnnotations.getPredictedClass(tree)
val partText = sentence.toString
if (partText.length() > longest) {
mainSentiment = sentiment
longest = partText.length()
}
sentiments += sentiment.toDouble
sizes += partText.length
}
val averageSentiment:Double = {
if(sentiments.size > 0) sentiments.sum / sentiments.size
else -1
}
val weightedSentiments = (sentiments, sizes).zipped.map((sentiment, size) => sentiment * size)
var weightedSentiment = weightedSentiments.sum / (sizes.fold(0)(_ + _))
if(sentiments.size == 0) {
mainSentiment = -1
weightedSentiment = -1
}
weightedSentiment
}
}
确定给定字符串的情感效果:
SentimentAnalysisUtils.detectSentiment("This is a horrible piece of software")
// res8: BigDecimal = 1.0
对数据帧中的列执行相同操作失败
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val schema = new StructType()
.add("title", StringType, true)
.add("id", StringType, true)
.add("selftext", StringType, true)
.add("score", LongType, true)
.add("created_utc", LongType, true)
.add("subreddit", StringType, true)
.add("author", StringType, true)
//file size: 2.2 MB, machine RAM: 64 GB
val df = spark.read.schema(schema).json("/home/user/repos/spark-abc/resources/input/sample_submissions.json")
val get_sentiment_score = udf(SentimentAnalysisUtils.detectSentiment _)
df.withColumn("sentiment", get_sentiment_score($"selftext")) .select("selftext","sentiment").take(100)
// Works with 1-10, but already fails for 100
// java.lang.OutOfMemoryError: Java heap space
// Exception in thread "RemoteBlock-temp-file-clean-thread" java.lang.OutOfMemoryError: Java heap space
任何想法都值得赞赏
共 (0) 个答案