pyspark read mongo: error in python/lib/pyspark.zip/pyspark/rdd.py

Published 2024-04-16 23:21:00


I am using pyspark to read from MongoDB with the simple code below:


from pyspark import SparkContext, SparkConf
import pymongo_spark

# Important: activate pymongo_spark so that mongoRDD is patched onto SparkContext.
pymongo_spark.activate()

def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)
    mongo_rdd = sc.mongoRDD("mongodb://localhost:27017/myDB.myCollection")
    a = mongo_rdd.count()
    print(a)

if __name__ == '__main__':
    main()

I then get an error on this line:

a = mongo_rdd.count()

The error trace is as follows:

15/10/27 16:21:23 INFO DAGScheduler: Job 1 failed: count at /myPath/sample.py:17, took 0.602302 s
Traceback (most recent call last):
  File "/myPath/sample.py", line 28, in <module>
    main()
  File "/myPath/sample.py", line 17, in main
    a = mongo_rdd.count()
  File "/myLocalLib/apache-spark/spark-1.5.1/python/lib/pyspark.zip/pyspark/rdd.py", line 1006, in count
  File "/myLocalLib/apache-spark/spark-1.5.1/python/lib/pyspark.zip/pyspark/rdd.py", line 997, in sum
  File "/myLocalLib/apache-spark/spark-1.5.1/python/lib/pyspark.zip/pyspark/rdd.py", line 871, in fold
  File "/myLocalLib/apache-spark/spark-1.5.1/python/lib/pyspark.zip/pyspark/rdd.py", line 773, in collect
  File "/myLocalLib/apache-spark/spark-1.5.1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/myLocalLib/apache-spark/spark-1.5.1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError15/10/27 16:21:23 INFO connection: Closed connection [connectionId{localValue:10, serverValue:494}] to localhost:27017 because the pool has been closed.
15/10/27 16:21:23 INFO Executor: Executor killed task 2.0 in stage 1.0 (TID 3)
15/10/27 16:21:23 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 3, localhost): TaskKilled (killed intentionally)
15/10/27 16:21:23 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 2, localhost): java.lang.IllegalStateException: state should be: open
    at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70)
    at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:79)
    at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75)
    at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71)
    at com.mongodb.binding.ClusterBinding.getReadConnectionSource(ClusterBinding.java:63)
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:166)
    at com.mongodb.operation.FindOperation.execute(FindOperation.java:394)
    at com.mongodb.operation.FindOperation.execute(FindOperation.java:57)
    at com.mongodb.Mongo.execute(Mongo.java:736)
    at com.mongodb.Mongo$2.execute(Mongo.java:723)
    at com.mongodb.DBCursor.initializeCursor(DBCursor.java:815)
    at com.mongodb.DBCursor.hasNext(DBCursor.java:149)
    at com.mongodb.hadoop.input.MongoRecordReader.nextKeyValue(MongoRecordReader.java:78)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:163)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:116)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:111)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
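
From the INFO lines interleaved with the traceback ("Closed connection ... because the pool has been closed"), the Mongo connection pool is closed while executor tasks are still iterating their cursors, which matches the "java.lang.IllegalStateException: state should be: open" raised inside the Java driver. To rule out mongod itself, here is a plain pymongo read of the same collection (a minimal sketch; it assumes pymongo is installed and reuses the URI from the script above):

from pymongo import MongoClient

# Standalone sanity check: scan the same collection without Spark, so a
# failure here would implicate mongod rather than the connector stack.
client = MongoClient("mongodb://localhost:27017")
collection = client["myDB"]["myCollection"]

count = 0
for _ in collection.find():
    count += 1
print("documents readable via pymongo:", count)

client.close()

If this loop succeeds, the failure is specific to the mongo-hadoop/pymongo_spark path rather than to MongoDB.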

Does anyone know what I might be missing? Thanks!

