Spark的reduceByKey接收到自己的输出作为后续调用的输入

0 投票
1 回答
1084 浏览
提问于 2025-04-18 14:16

我正在写一个PySpark应用程序,用来计算n维空间中点之间的成对距离。我有一个flatMap步骤,它读取一个点,并计算这个点在成对相似度矩阵中应该放置的各种“块”(这比简单的O(n^2)计算稍微高效一点;如果你感兴趣,可以看看这篇论文的第5.2节,这给了我灵感)。

包含数据点的文本文件格式如下:

x1_1,x1_2,x1_3,...,x1_n
x2_1,x2_2,x2_3,...,x2_n
...
xm_1,xm_2,xm_3,...,xm_n

这是我的驱动程序:

rawdata = np.loadtxt(args['input'], dtype = np.str, delimiter = "\n")
indexed = np.vstack([np.arange(rawdata.shape[0]), rawdata]).T
D = sc.parallelize(indexed)

# Broadcast variables.
BLOCKING_FACTOR = sc.broadcast(sc.defaultParallelism)
SIZE = sc.broadcast(rawdata.shape[0])

retval = D.flatMap(parse_line).reduceByKey(pairwise_blocks).collect()

它将整个文本文件读入内存,然后为每一行建立索引,最后通过flatMap处理这些索引数据。这里是parse_line方法:

def parse_line(line):
    index, data = line
    index = int(index)

    v = SIZE.value
    h = BLOCKING_FACTOR.value

    edgelength = int(numpy.ceil(v / h))

    J = int(index / edgelength)
    rows = [((((I + 1) * I) / 2) + J, [0, index, data]) for I in range(0, J)]

    I = int(index / edgelength)
    cols = [((((I + 1) * I) / 2) + J, [1, index, data]) for J in range(I, h)]

    return rows + cols

这些(k, v)对的形式是,每个键都是一个整数——在最终的n乘n成对相似度矩阵中是一个独特的块——而值是一个包含两个整数和一个字符串的3元素列表。

问题来了:当reduceByKey调用pairwise_blocks方法时,早期迭代的输出会作为输入反馈到后续的调用中。具体来说:

def pairwise_blocks(x1, x2):
    print x1 # for debugging; see below comments
    x = np.array(map(float, x1[2].split(",")))
    y = np.array(map(float, x2[2].split(",")))
    return [-1, la.norm(x - y)]

前几次调用返回[-1, dist]是正常的,但很快我就遇到了以下异常(前两行是上面方法的调试输出结果):

[1, 1, '-8.366703221982483285e+00,-3.082631504065840300e+00']
[-1, 6.4988099869742415]
PySpark worker failed with exception:
Traceback (most recent call last):
    File "/home/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/worker.py", line 77, in main
        serializer.dump_stream(func(split_index, iterator), outfile)
    File "/home/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py", line 283, in func
        def func(s, iterator): return f(iterator)
    File "/home/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py", line 1134, in _mergeCombiners
        combiners[k] = mergeCombiners(combiners[k], v)
    File "/home/Programming/PySpark-Affinities/cartesian.py", line 60, in pairwise_blocks
        x = np.array(map(float, x1[2].split(",")))
    IndexError: list index out of range

最近打印的调试输出形式是[-1, dist]你知道为什么会发生这种情况吗?我是不是漏掉了某个配置步骤或者某些文档中说明了这种行为?我不是map-reduce的高手,但我用过它,之前从没考虑过这种行为,特别是我在使用reduceByKey的时候。

编辑:作为补充,为了帮助调试,在reduceByKey步骤中,有没有办法访问当前调用正在减少的键?

非常感谢!

1 个回答

0

我想说的是,这个问题让我意识到我在学习Spark方面还有很多需要掌握的地方;实际上,Reducer的表现是完全正确的。我还在努力摆脱一些Hadoop的经验,因为在Hadoop中,Reducer的工作方式实际上是将groupByKeyreduceByKey这两个步骤合并成一步来做。而我对此感到非常抱歉。

撰写回答