<p>这里的问题是你的reduce函数。对于每个键,<code>reduceByKey</code>使用成对的值调用reduce函数,并期望它生成相同类型的组合值。</p>
<p>例如,假设我想执行字数运算。首先,我可以将每个单词映射到一个<code>(word, 1)</code>对,然后我可以<code>reduceByKey(lambda x, y: x + y)</code>来总结每个单词的计数。最后,我剩下的是<code>(word, count)</code>对的RDD。</p>
<p>下面是来自<a href="https://spark.incubator.apache.org/docs/0.8.1/api/pyspark/pyspark.rdd.RDD-class.html#reduceByKey">PySpark API Documentation</a>的示例:</p>
<pre><code>>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
</code></pre>
<p>为了理解示例为什么不起作用,您可以想象reduce函数被应用如下:</p>
<pre><code>reduce(reduce(reduce(firstValue, secondValue), thirdValue), fourthValue) ...
</code></pre>
<p>基于reduce函数,听起来您可能正在尝试实现内置的<a href="https://spark.incubator.apache.org/docs/0.8.1/api/pyspark/pyspark.rdd.RDD-class.html#groupByKey">^{<cd5>}</a>操作,该操作使用其值列表对每个键进行分组。</p>
<p>另外,请看<a href="https://spark.incubator.apache.org/docs/0.8.1/api/pyspark/pyspark.rdd.RDD-class.html#combineByKey">^{<cd6>}</a>,这是<code>reduceByKey()</code>的一个泛化,它允许reduce函数的输入和输出类型不同(<code>reduceByKey</code>是<a href="https://spark.incubator.apache.org/docs/0.8.1/api/pyspark/pyspark.rdd-pysrc.html#RDD.reduceByKey">implemented</a>,用<code>combineByKey</code>表示)</p>