Spark的reduceByKey接收到自己的输出作为后续调用的输入
我正在写一个PySpark应用程序,用来计算n维空间中点之间的成对距离。我有一个flatMap
步骤,它读取一个点,并计算这个点在成对相似度矩阵中应该放置的各种“块”(这比简单的O(n^2)计算稍微高效一点;如果你感兴趣,可以看看这篇论文的第5.2节,这给了我灵感)。
包含数据点的文本文件格式如下:
x1_1,x1_2,x1_3,...,x1_n
x2_1,x2_2,x2_3,...,x2_n
...
xm_1,xm_2,xm_3,...,xm_n
这是我的驱动程序:
rawdata = np.loadtxt(args['input'], dtype = np.str, delimiter = "\n")
indexed = np.vstack([np.arange(rawdata.shape[0]), rawdata]).T
D = sc.parallelize(indexed)
# Broadcast variables.
BLOCKING_FACTOR = sc.broadcast(sc.defaultParallelism)
SIZE = sc.broadcast(rawdata.shape[0])
retval = D.flatMap(parse_line).reduceByKey(pairwise_blocks).collect()
它将整个文本文件读入内存,然后为每一行建立索引,最后通过flatMap
处理这些索引数据。这里是parse_line
方法:
def parse_line(line):
index, data = line
index = int(index)
v = SIZE.value
h = BLOCKING_FACTOR.value
edgelength = int(numpy.ceil(v / h))
J = int(index / edgelength)
rows = [((((I + 1) * I) / 2) + J, [0, index, data]) for I in range(0, J)]
I = int(index / edgelength)
cols = [((((I + 1) * I) / 2) + J, [1, index, data]) for J in range(I, h)]
return rows + cols
这些(k, v)对的形式是,每个键都是一个整数——在最终的n乘n成对相似度矩阵中是一个独特的块——而值是一个包含两个整数和一个字符串的3元素列表。
问题来了:当reduceByKey
调用pairwise_blocks
方法时,早期迭代的输出会作为输入反馈到后续的调用中。具体来说:
def pairwise_blocks(x1, x2):
print x1 # for debugging; see below comments
x = np.array(map(float, x1[2].split(",")))
y = np.array(map(float, x2[2].split(",")))
return [-1, la.norm(x - y)]
前几次调用返回[-1, dist]
是正常的,但很快我就遇到了以下异常(前两行是上面方法的调试输出结果):
[1, 1, '-8.366703221982483285e+00,-3.082631504065840300e+00']
[-1, 6.4988099869742415]
PySpark worker failed with exception:
Traceback (most recent call last):
File "/home/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/worker.py", line 77, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py", line 283, in func
def func(s, iterator): return f(iterator)
File "/home/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py", line 1134, in _mergeCombiners
combiners[k] = mergeCombiners(combiners[k], v)
File "/home/Programming/PySpark-Affinities/cartesian.py", line 60, in pairwise_blocks
x = np.array(map(float, x1[2].split(",")))
IndexError: list index out of range
最近打印的调试输出形式是[-1, dist]
。你知道为什么会发生这种情况吗?我是不是漏掉了某个配置步骤或者某些文档中说明了这种行为?我不是map-reduce的高手,但我用过它,之前从没考虑过这种行为,特别是我在使用reduceByKey
的时候。
编辑:作为补充,为了帮助调试,在reduceByKey
步骤中,有没有办法访问当前调用正在减少的键?
非常感谢!
1 个回答
0
我想说的是,这个问题让我意识到我在学习Spark方面还有很多需要掌握的地方;实际上,Reducer的表现是完全正确的。我还在努力摆脱一些Hadoop的经验,因为在Hadoop中,Reducer的工作方式实际上是将groupByKey
和reduceByKey
这两个步骤合并成一步来做。而我对此感到非常抱歉。