Simple mapPartitions job in (Py)Spark


Here is my code:

# map function
def echo(lines):
    if lines:
        for i, line in enumerate(lines):
            print i, "=>", line

# load the data
idsFile = "ids.txt"  # Should be some file on your system
linesData = sc.textFile(idsFile).cache()

# clean it
cleanLinesData = linesData.map(lambda line: line.strip())
filteredLinesData = cleanLinesData.filter(lambda line: True if line else False)

# setup task
answers = filteredLinesData.mapPartitions(echo, 2)  # split file into 2 pieces

The file ids.txt:

1,2,3,4,5,1
6,4,8,3,2
9,9,9,9,9,9
100000000,1
10,10,10
1,2,4,2

To run it, I launch:

$ IPYTHON=1 pyspark --master local[2]

Then I %cpaste the code in (is there a better way to do this?).

If I just try take() and look at the values, I get sensible output:

In[2]: filteredLinesData.take(6)
Out[2]:
[u'1,2,3,4,5,1',
 u'6,4,8,3,2',
 u'9,9,9,9,9,9',
 u'100000000,1',
 u'10,10,10',
 u'1,2,4,2']

But when I try to actually execute the mapPartitions() job I set up, it fails:

In [3]: executed = answers.collect()
14/09/12 11:18:22 INFO SparkContext: Starting job: collect at <ipython-input-3-6461aec48699>:1
14/09/12 11:18:22 INFO DAGScheduler: Got job 2 (collect at <ipython-input-3-6461aec48699>:1) with 2 output partitions (allowLocal=false)
14/09/12 11:18:22 INFO DAGScheduler: Final stage: Stage 2(collect at <ipython-input-3-6461aec48699>:1)
14/09/12 11:18:22 INFO DAGScheduler: Parents of final stage: List()
14/09/12 11:18:22 INFO DAGScheduler: Missing parents: List()
14/09/12 11:18:22 INFO DAGScheduler: Submitting Stage 2 (PythonRDD[3] at RDD at PythonRDD.scala:37), which has no missing parents
14/09/12 11:18:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2 (PythonRDD[3] at RDD at PythonRDD.scala:37)
14/09/12 11:18:22 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
14/09/12 11:18:22 INFO TaskSetManager: Starting task 2.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/09/12 11:18:22 INFO TaskSetManager: Serialized task 2.0:0 as 3112 bytes in 1 ms
14/09/12 11:18:22 INFO TaskSetManager: Starting task 2.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/09/12 11:18:22 INFO TaskSetManager: Serialized task 2.0:1 as 3112 bytes in 0 ms
14/09/12 11:18:22 INFO Executor: Running task ID 0
14/09/12 11:18:22 INFO Executor: Running task ID 1
14/09/12 11:18:22 INFO BlockManager: Found block broadcast_0 locally
14/09/12 11:18:22 INFO BlockManager: Found block broadcast_0 locally
14/09/12 11:18:22 INFO CacheManager: Partition rdd_1_1 not found, computing it
14/09/12 11:18:22 INFO CacheManager: Partition rdd_1_0 not found, computing it
14/09/12 11:18:22 INFO HadoopRDD: Input split: file:/Users/will/Code/spark/sumlines/ids.txt:31+32
14/09/12 11:18:22 INFO HadoopRDD: Input split: file:/Users/will/Code/spark/sumlines/ids.txt:0+31
14/09/12 11:18:22 INFO MemoryStore: ensureFreeSpace(288) called with curMem=133256, maxMem=308910489
14/09/12 11:18:22 INFO MemoryStore: Block rdd_1_1 stored as values to memory (estimated size 288.0 B, free 294.5 MB)
14/09/12 11:18:22 INFO MemoryStore: ensureFreeSpace(304) called with curMem=133544, maxMem=308910489
14/09/12 11:18:22 INFO MemoryStore: Block rdd_1_0 stored as values to memory (estimated size 304.0 B, free 294.5 MB)
14/09/12 11:18:22 INFO BlockManagerInfo: Added rdd_1_0 in memory on 18.111.61.9:58306 (size: 304.0 B, free: 294.6 MB)
14/09/12 11:18:22 INFO BlockManagerInfo: Added rdd_1_1 in memory on 18.111.61.9:58306 (size: 288.0 B, free: 294.6 MB)
14/09/12 11:18:22 INFO BlockManagerMaster: Updated info of block rdd_1_0
14/09/12 11:18:22 INFO BlockManagerMaster: Updated info of block rdd_1_1
0 => 1,2,3,4,5,1
1 => 6,4,8,3,2
2 => 9,9,9,9,9,9
0 => 100000000,1
1 => PySpark worker failed with exception:10,10,10

2 => 1,2,4,2
PySpark worker failed with exception:
Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable

Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable

14/09/12 11:18:22 ERROR Executor: Exception in task ID 1
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
14/09/12 11:18:22 ERROR Executor: Exception in task ID 0
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
14/09/12 11:18:22 WARN TaskSetManager: Lost TID 0 (task 2.0:0)
14/09/12 11:18:22 WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
14/09/12 11:18:22 ERROR TaskSetManager: Task 2.0:0 failed 1 times; aborting job
14/09/12 11:18:22 INFO TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable
 [duplicate 1]
14/09/12 11:18:22 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
14/09/12 11:18:22 INFO DAGScheduler: Failed to run collect at <ipython-input-3-6461aec48699>:1
14/09/12 11:18:22 INFO TaskSchedulerImpl: Cancelling stage 2
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-3-6461aec48699> in <module>()
----> 1 executed = answers.collect()

/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/rdd.pyc in collect(self)
    581         """
    582         with _JavaStackTrace(self.context) as st:
--> 583           bytesInJava = self._jrdd.collect().iterator()
    584         return list(self._collect_iterator_through_file(bytesInJava))
    585 

/usr/bin/spark-1.0.0-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    535         answer = self.gateway_client.send_command(command)
    536         return_value = get_return_value(answer, self.gateway_client,
--> 537                 self.target_id, self.name)
    538 
    539         for temp_arg in temp_args:

/usr/bin/spark-1.0.0-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o38.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable

        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        org.apache.spark.scheduler.Task.run(Task.scala:51)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Clearly it is doing the right thing with the input, but for some reason, at the end, Spark tries to do something with what the mapPartitions() function returned, and that triggers the TypeError: 'NoneType' object is not iterable.

Any idea what I'm doing wrong? I'm completely new to Spark.


2 Answers

The problem here is that mapPartitions expects a function that returns an iterable object, such as a list or a generator. Your echo function implicitly returns None, which is why PySpark complains that a 'NoneType' object is not iterable.

As Jonathan suggested, you could use this function (unmodified, in fact) with foreachPartition. I believe that will print the output you want when running PySpark in local mode, but it's probably not what you want when deploying on a cluster: the output of the print statements will end up in the worker logs rather than being displayed on the driver.

Instead, I would modify the echo function to yield (i, "=>", line); the function's return type then becomes a generator.
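
A minimal sketch of that change, assuming the same filteredLinesData RDD built in the question (Python 2 syntax, to match the original code):

# generator version of echo: mapPartitions now gets an iterable back,
# so an action such as collect() can consume it
def echo(lines):
    for i, line in enumerate(lines):
        yield (i, "=>", line)   # one tuple per line instead of an implicit None

answers = filteredLinesData.mapPartitions(echo)
print answers.collect()   # e.g. [(0, '=>', u'1,2,3,4,5,1'), (1, '=>', u'6,4,8,3,2'), ...]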

There are a few issues here:

First, to run a script inside IPython you can use execfile() or %run.
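
For example (this assumes the code above has been saved to a file named echo_job.py; the file name is just illustrative):

# inside the pyspark IPython shell, where sc already exists:
execfile("echo_job.py")    # executes the file in the current namespace, so it can see sc
# or
%run -i echo_job.py        # -i runs it in the interactive namespace rather than a fresh one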

Second, mapPartitions doesn't take a number-of-partitions parameter; perhaps it did at some point? You can set the number of partitions explicitly with parallelize.
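
For instance, a rough sketch of two standard ways to control the partition count explicitly (variable names are just illustrative):

# when reading a file, textFile accepts a minimum number of partitions as its second argument
linesData = sc.textFile(idsFile, 2).cache()

# when building an RDD from a Python collection, parallelize accepts a slice count
numbersRDD = sc.parallelize([1, 2, 3, 4, 5, 6], 2)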

If you run it that way, you will get the output you expect, but also an error of the form:

TypeError: 'NoneType' object is not iterable

That's because mapPartitions is a transformation: it expects a function that takes a partition of an RDD and returns a new partition of an RDD. You are producing output as a side effect, but not returning a new RDD partition. In other words, you are looking for an action, not a transformation. foreach works on each element; foreachPartition works per partition, but it needs a generator that doesn't return any values:

# per-partition function
def echo(lines):
    if lines:
        for i, line in enumerate(lines):
            print i, "=>", line
    yield None

# load the data
idsFile = "ids.txt"  # Should be some file on your system
linesData = sc.textFile(idsFile).cache()

# clean it
cleanLinesData = linesData.map(lambda line: line.strip())
filteredLinesData = cleanLinesData.filter(lambda line: True if line else False)

# run the action
answers = filteredLinesData.foreachPartition(echo)
