PySpark中的cogroup
这个教程建议这样做:
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(x.cogroup(y).collect())
[('a', ([1], [2])), ('b', ([4], []))]
但是,当我运行这个时,我得到了以下输出:
('a', (<pyspark.resultiterable.ResultIterable object at 0x1d8b190>, <pyspark.resultiterable.ResultIterable object at 0x1d8b150>))
('b', (<pyspark.resultiterable.ResultIterable object at 0x1d8b210>, <pyspark.resultiterable.ResultIterable object at 0x1d8b1d0>))
这个有三层嵌套,如果我把输出存储在'r'里,然后这样做:
for i in r:
for j in i[1]:
print list(j)
我得到了正确的组合数字:
1) 为什么在PySpark中,Cogroup不返回像rightjoin/leftouterjoin那样的数字?
2) 为什么我在自己的PySpark环境中无法复制这个例子?
1 个回答
1
简单来说,这是因为cogroup就是这个意思。
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
在Spark中,连接操作实际上是通过cogroup来实现的,简单来说,连接操作就是把cogroup返回的可迭代对象拆分成元组。下面是Spark中连接操作的实现。
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
for (v <- vs; w <- ws) yield (v, w)
}
}
至于解释器输出的细微差别(记住,输出是一样的,只是pyspark的可迭代对象不显示内容),我不能确定,除非我看到教程。教程可能为了让输出更清晰,即使实际上并不是这样显示的。还有,我在scala的命令行中运行了类似的脚本,它显示了所有的输出。
Array((a,(ArrayBuffer(1),ArrayBuffer(2))), (b,(ArrayBuffer(4),ArrayBuffer())))