Spark is not executing tasks

Posted 2024-05-16 15:21:01


I can't get Spark to run. I added the necessary path to the SPARK_HOME system variable. I pull data from my MongoDB database and simply convert the resulting list into a DataFrame. When I then try to view the DataFrame with show() (the last line of the code), it throws the error below. My Hadoop version is 2.7, PySpark and the local Spark installation are both 2.4.1, Python is 3.6, and Java is 8.

import os
import sys
spark_path = r"C:\Tools\spark-2.4.0-bin-hadoop2.7" # spark installed folder
os.environ['SPARK_HOME'] = spark_path
sys.path.insert(0, spark_path + "/bin")
sys.path.insert(0, spark_path + "/python/pyspark/")
sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.7-src.zip")

import pymongo
from pyspark import SparkContext
import pandas as pd
import pyspark
from nltk.corpus import stopwords
import re as re
from pyspark.ml.feature import CountVectorizer , IDF
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.sql.types import StringType

sc = SparkContext(appName = "app")
# print(sc.version)

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

myclient = pymongo.MongoClient("mongodb://localhost:27017/")
mydb = myclient["The_Rival_Insights"]
mycol = mydb["twitter"]

def getText(keyword):
    myquery = {'keyword': keyword}
    for x in mycol.find(myquery):     # x is one matching document (a dictionary)
        a = x["metadata"]
        return a                      # returns the metadata of the first match only

text=[]
metadata = getText("uber")    #list is returned
for b in range(len(metadata)):
    text.append(str(metadata[b]["text"]))
data = sqlContext.createDataFrame(text,StringType()).show()

This throws an error (the output for this step is omitted here).

Additionally, when I append the following code at the end (and remove the show() call), I get another error:

reviews = data.rdd.map(lambda x : x[0]).filter(lambda x: x is not None)
StopWords = stopwords.words("english")
tokens = reviews                                                   \
    .map( lambda document: document.strip().lower())               \
    .map( lambda document: re.split(" ", document))          \
    .map( lambda word: [x for x in word if x.isalpha()])           \
    .map( lambda word: [x for x in word if len(x) > 3] )           \
    .map( lambda word: [x for x in word if x not in StopWords])    \
    .zipWithIndex()

Trimmed error message:

    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    [Stage 0:>                                                          (0 + 4) / 4]2019-04-07 19:04:30 ERROR PythonRunner:91 - Python worker exited unexpectedly (crashed)
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "C:\Tools\spark-2.4.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 267, in main
    Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.DataInputStream.readInt(DataInputStream.java:387)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:578)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
    2019-04-07 19:04:30 ERROR Executor:91 - Exception in task 2.0 in stage 0.0 (TID 2)
    java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.DataInputStream.readInt(DataInputStream.java:387)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:578)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)
    2019-04-07 19:04:30 WARN  TaskSetManager:66 - Lost task 2.0 in stage 0.0 (TID 2, localhost, executor driver): java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.DataInputStream.readInt(DataInputStream.java:387)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:578)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)

    2019-04-07 19:04:30 ERROR TaskSetManager:70 - Task 2 in stage 0.0 failed 1 times; aborting job
    Traceback (most recent call last):
      File "C:/Users/Mujtaba Faizi/Documents/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka-master/Analysis/sparkml_testing.py", line 52, in <module>
        .map( lambda word: [x for x in word if x not in StopWords])    \
      File "F:\Softwares\Anaconda\lib\site-packages\pyspark\rdd.py", line 2174, in zipWithIndex
        nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
      File "F:\Softwares\Anaconda\lib\site-packages\pyspark\rdd.py", line 816, in collect
        sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "F:\Softwares\Anaconda\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "F:\Softwares\Anaconda\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
        return f(*a, **kw)
      File "F:\Softwares\Anaconda\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
        format(target_id, ".", name), value)
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost, executor driver): java.net.SocketException: Connection reset

2 Answers

In case anyone stumbles on this the way I did: I was working on a cluster, but needed to run some local scripts on a target node.


Solution

The simplest foolproof solution is to set the PYSPARK_PYTHON environment variable at the beginning of the script, because in my case the PySpark shell could not pick it up even when it was configured correctly in $SPARK_HOME/conf/spark-env.sh and spark-defaults.conf (both of which are less ideal than the first option anyway).

import os
os.environ['PYSPARK_PYTHON'] = '/path/to/python3' # Worker executable
os.environ['PYSPARK_DRIVER_PYTHON'] = '/path/to/python3' # Driver executable

Probable cause

I'm not entirely sure, but my guess is that the pyspark installed from pip in your venv is not the one Spark itself actually loads; it cannot find the proper environment variables and ends up with the default python2.7 executable, even though the interpreter is configured everywhere else.
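
To confirm the mismatch first, here is a quick diagnostic sketch (not from the original answer; it assumes sc is the running SparkContext from the question) that compares the driver's interpreter with the one a worker reports:

import sys

# Interpreter used by the driver process
print("driver:", sys.version_info[:3], sys.executable)

# Interpreter used by a worker: run a trivial task that reports back
def worker_python(_):
    import sys
    return (tuple(sys.version_info[:3]), sys.executable)

print("worker:", sc.parallelize([0], 1).map(worker_python).collect()[0])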

First of all, don't use PyMongo for this. MongoDB has a Spark connector that can be pulled in via the --packages option of spark-submit.

If you are using a remote MongoDB cluster, you need to whitelist the IPs of the network MongoDB is running on. That can also be the reason behind the "Connection reset" error message; I have run into it myself.

Follow the Spark-MongoDB connector docs and you can work directly with Spark DataFrames.
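
Not the answerer's exact code, but a minimal sketch of what that could look like, assuming Spark 2.4 on Scala 2.11, a local MongoDB, and the database/collection names from the question:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .appName("app")
        # Same effect as `spark-submit --packages ...`: fetches the connector jars
        .config("spark.jars.packages",
                "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
        # Database and collection names taken from the question
        .config("spark.mongodb.input.uri",
                "mongodb://localhost:27017/The_Rival_Insights.twitter")
        .getOrCreate()
)

# Load the collection straight into a Spark DataFrame, no PyMongo round trip
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.filter(df["keyword"] == "uber").show()

From there the text fields can be extracted with DataFrame operations instead of first building a Python list on the driver.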

Update

在浏览了堆栈跟踪之后(很抱歉没有彻底阅读),您的工作节点上似乎没有安装python3。您需要安装与驱动程序节点上安装的版本相匹配的正确版本的Python。然后在每个worker节点上,您需要在位于用户主目录的.bash_profile文件中添加以下行PYSPARK_PYTHON=/path/to/python3/executable。它应该能解决你的问题。在
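
If editing .bash_profile on every node is not an option, a possible alternative (a sketch, not part of the original answer; /path/to/python3 is a placeholder) is to point Spark at the interpreter through its configuration, since Spark 2.1+ exposes spark.pyspark.python and spark.pyspark.driver.python:

from pyspark import SparkConf, SparkContext

conf = (
    SparkConf()
        .setAppName("app")
        # Interpreter the executors launch for Python workers
        .set("spark.pyspark.python", "/path/to/python3")
        # Interpreter for the driver side (keep it the same version)
        .set("spark.pyspark.driver.python", "/path/to/python3")
)
sc = SparkContext(conf=conf)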
