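I can't get Spark to work. I added the required path to the SPARK_HOME system variable. I pull data from my MongoDB database and simply convert the resulting list into a DataFrame. I then try to view the DataFrame with show() (the last line of the code), which gives the error below. My Hadoop version is 2.7, pyspark and the local Spark are both 2.4.1, and Python is 3.6. The Java version is 8.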
import os
import sys
spark_path = r"C:\Tools\spark-2.4.0-bin-hadoop2.7" # spark installed folder
os.environ['SPARK_HOME'] = spark_path
sys.path.insert(0, spark_path + "/bin")
sys.path.insert(0, spark_path + "/python/pyspark/")
sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.7-src.zip")
import pymongo
from pyspark import SparkContext
import pandas as pd
import pyspark
from nltk.corpus import stopwords
import re as re
from pyspark.ml.feature import CountVectorizer , IDF
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.sql.types import StringType
sc = SparkContext(appName = "app")
# print(sc.version)
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
myclient = pymongo.MongoClient("mongodb://localhost:27017/")
mydb = myclient["The_Rival_Insights"]
mycol = mydb["twitter"]
def getText(keyword):
    myquery = {'keyword': keyword}
    for x in mycol.find(myquery):  # x is a dictionary
        a = x["metadata"]
    return a
text=[]
metadata = getText("uber") #list is returned
for b in range(len(metadata)):
    text.append(str(metadata[b]["text"]))
# show() returns None, so keep the DataFrame in `data` and call show() separately
data = sqlContext.createDataFrame(text, StringType())
data.show()
This fails with an error. In addition, when I append the code below (with the show() call removed), I get a different error:
reviews = data.rdd.map(lambda x : x[0]).filter(lambda x: x is not None)
StopWords = stopwords.words("english")
tokens = reviews \
    .map(lambda document: document.strip().lower()) \
    .map(lambda document: re.split(" ", document)) \
    .map(lambda word: [x for x in word if x.isalpha()]) \
    .map(lambda word: [x for x in word if len(x) > 3]) \
    .map(lambda word: [x for x in word if x not in StopWords]) \
    .zipWithIndex()
Trimmed error message:
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 0:> (0 + 4) / 4]2019-04-07 19:04:30 ERROR PythonRunner:91 - Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\Tools\spark-2.4.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 267, in main
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:578)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
2019-04-07 19:04:30 ERROR Executor:91 - Exception in task 2.0 in stage 0.0 (TID 2)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:578)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
2019-04-07 19:04:30 WARN TaskSetManager:66 - Lost task 2.0 in stage 0.0 (TID 2, localhost, executor driver): java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:578)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
2019-04-07 19:04:30 ERROR TaskSetManager:70 - Task 2 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
File "C:/Users/Mujtaba Faizi/Documents/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka-master/Analysis/sparkml_testing.py", line 52, in <module>
.map( lambda word: [x for x in word if x not in StopWords]) \
File "F:\Softwares\Anaconda\lib\site-packages\pyspark\rdd.py", line 2174, in zipWithIndex
nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
File "F:\Softwares\Anaconda\lib\site-packages\pyspark\rdd.py", line 816, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "F:\Softwares\Anaconda\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "F:\Softwares\Anaconda\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "F:\Softwares\Anaconda\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost, executor driver): java.net.SocketException: Connection reset
This is for anyone who stumbles upon this as I did, working on a cluster but needing to run some local scripts on a target node.
Solution
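The simplest foolproof solution is to set the PYSPARK_PYTHON environment variable at the beginning of the script. In my case the PySpark shell could not pick it up even when it was configured correctly in $SPARK_HOME/conf/spark-env.sh or in spark-defaults.conf (both of which are less desirable than the first option).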
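Setting the variable at the top of the script might look like the minimal sketch below. It assumes the workers should use the same interpreter that runs the driver script (`sys.executable`), which is reasonable for local mode; on a cluster the value would have to be a path that exists on every node. The helper name `pin_pyspark_python` is hypothetical and not part of PySpark.

```python
import os
import sys


def pin_pyspark_python(executable=sys.executable):
    """Tell PySpark which Python interpreter to launch for its workers.

    Assumption: `executable` is a valid path wherever the workers run
    (true for local mode; on a cluster, use a path present on every node).
    """
    os.environ["PYSPARK_PYTHON"] = executable
    return executable


# Call this before the SparkContext is created, because the worker
# interpreter is chosen when the context starts.
pin_pyspark_python()

# Only after that, import pyspark and create the context, for example:
# from pyspark import SparkContext
# sc = SparkContext(appName="app")
```

The error message in the question also names PYSPARK_DRIVER_PYTHON; that variable is read when the driver is launched, so it is not set here from inside an already running script.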
Possible cause
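I'm not entirely sure, but my guess is that the pyspark installed with pip in your venv is not the same pyspark that Spark itself actually loads. As a result it does not find the correct environment variable and ends up using the Python 2.7 executable, even though the variable is configured everywhere.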
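First of all, don't use Pymongo. MongoDB has a Spark connector, which can be supplied through the packages option of spark-submit. If you are using a remote MongoDB cluster, you need to whitelist the IP address in the MongoDB network settings. That may be the reason for the connection reset error message; I have run into it as well.
See the Spark-MongoDB connector docs; with the connector you can work directly on Spark DataFrames.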
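As a rough sketch of reading the collection through the connector instead of Pymongo: the package coordinate and the format name below are assumptions based on the 2.x connector for Spark 2.4 / Scala 2.11 and should be checked against the connector docs for your versions. The helper names `mongo_uri` and `read_collection` are hypothetical.

```python
def mongo_uri(host, port, database, collection):
    """Build a MongoDB connection URI that targets a single collection."""
    return "mongodb://{}:{}/{}.{}".format(host, port, database, collection)


def read_collection(spark, uri):
    """Load a MongoDB collection as a Spark DataFrame.

    `spark` is an existing SparkSession (or SQLContext) started with the
    connector on its classpath, for example (assumed coordinate):
      spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 script.py
    """
    return (spark.read
            .format("com.mongodb.spark.sql.DefaultSource")
            .option("uri", uri)
            .load())


# Database and collection names are taken from the question.
uri = mongo_uri("localhost", 27017, "The_Rival_Insights", "twitter")
```

With a running session, `read_collection(spark, uri)` would return a DataFrame directly, so the intermediate Python list is no longer needed.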
Update
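After going through the stack trace (sorry for not reading it thoroughly earlier), it looks like python3 is not installed on your worker nodes. You need to install a Python version that matches the one installed on the driver node. Then, on every worker node, add the following line to the .bash_profile file in the user's home directory:
PYSPARK_PYTHON=/path/to/python3/executable
That should solve your problem.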
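One way to confirm that an interpreter matches the driver before pointing PYSPARK_PYTHON at it is to compare major.minor versions, which is the comparison the error message complains about. This is a minimal sketch using only the standard library; the function names are hypothetical.

```python
import subprocess
import sys


def minor_version(executable):
    """Return the 'major.minor' version string of a Python executable."""
    code = "import sys; print('%d.%d' % sys.version_info[:2])"
    out = subprocess.check_output([executable, "-c", code])
    return out.decode().strip()


def matches_driver(worker_executable):
    """Compare a candidate worker interpreter with the current (driver) one."""
    driver = "%d.%d" % sys.version_info[:2]
    worker = minor_version(worker_executable)
    return driver == worker, driver, worker
```

For example, `matches_driver("/path/to/python3/executable")` returns a tuple whose first element is False when the two interpreters differ in their minor version.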