为什么zip在pyspark中截断数据?
我在使用zip的时候遇到了一些奇怪的情况;我基本上是想要一个包含键值对的RDD,其中值只是一个索引,比如我初始化了一个叫'f'的RDD:
f = sc.parallelize(tokenizer('a fox jumped over the rabbit')).flatMap(lambda x: ngrams(x))
f.count()
52
然后我做了:
ind = sc.parallelize(range(f.count()))
ind.count()
52
但是
f_ind = f.zip(ind)
f_ind.count()
48
我不明白为什么有些元素会丢失呢?
1 个回答
1
问题在于,SparkRDD
的 zip
操作要求两个 RDD 的元素数量必须相同,并且每个分区中的元素数量也要相同。这个最后的要求在我上面的例子中被违反了。似乎没有办法解决这个问题(但可以参考例如 http://www.adamcrume.com/blog/archive/2014/02/19/fixing-sparks-rdd-zip)。