为了得到坐标,我必须比较距离。因此,我用sc.TEXT文件()做笛卡尔积。文本文件中大约有2.000.000行,因此要比较2.000.000 x 2.000.000的坐标。在
我用大约2000个坐标测试了代码,它在几秒钟内运行良好。但使用大文件似乎在某一点上停止了,我不知道为什么。代码如下所示:
def concat(x,y):
if(isinstance(y, list)&(isinstance(x,list))):
return x + y
if(isinstance(x,list)&isinstance(y,tuple)):
return x + [y]
if(isinstance(x,tuple)&isinstance(y,list)):
return [x] + y
else: return [x,y]
def haversian_dist(tuple):
lat1 = float(tuple[0][0])
lat2 = float(tuple[1][0])
lon1 = float(tuple[0][2])
lon2 = float(tuple[1][2])
p = 0.017453292519943295
a = 0.5 - cos((lat2 - lat1) * p)/2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p)) / 2
print(tuple[0][1])
return (int(float(tuple[0][1])), (int(float(tuple[1][1])),12742 * asin(sqrt(a))))
def sort_val(tuple):
dtype = [("globalid", int),("distance",float)]
a = np.array(tuple[1], dtype=dtype)
sorted_mins = np.sort(a, order="distance",kind="mergesort")
return (tuple[0], sorted_mins)
def calc_matrix(sc, path, rangeval, savepath, name):
data = sc.textFile(path)
data = data.map(lambda x: x.split(";"))
data = data.repartition(100).cache()
data.collect()
matrix = data.cartesian(data)
values = matrix.map(haversian_dist)
values = values.reduceByKey(concat)
values = values.map(sort_val)
values = values.map(lambda x: (x[0], x[1][1:int(rangeval)].tolist()))
values = values.map(lambda x: (x[0], [y[0] for y in x[1]]))
dicti = values.collectAsMap()
hp.save_pickle(dicti, savepath, name)
即使是包含大约15000个条目的文件也无法工作。我知道笛卡尔原因O(n^2)运行时。但spark不应该处理这个吗?还是有什么不对劲?唯一的出发点是一条错误消息,但我不知道它是否与实际问题有关:
^{pr2}$
您在代码中使用了
data.collect()
,它基本上将所有数据调用到一台机器中。根据机器内存的不同,2000000行数据可能不太适合。在此外,我还尝试通过执行联接来减少要完成的计算数量,而不是使用
cartesian
。(请注意,我刚刚用numpy生成了随机数,这里的格式可能与您的格式不同。不过,主要想法还是一样的。)相关问题 更多 >
编程相关推荐