Spark cartesian product


To get the coordinates I need, I have to compare distances. Therefore, I build a cartesian product of the data from sc.textFile() with itself. The text file has about 2,000,000 lines, so 2,000,000 x 2,000,000 coordinates have to be compared.

I tested the code with about 2,000 coordinates and it ran fine within a few seconds. With the large file, however, it seems to stop at some point, and I don't know why. The code looks like this:

from math import asin, cos, sqrt

import numpy as np


def concat(x, y):
    # combiner for reduceByKey: collect (globalid, distance) tuples into one list
    if isinstance(x, list) and isinstance(y, list):
        return x + y
    if isinstance(x, list) and isinstance(y, tuple):
        return x + [y]
    if isinstance(x, tuple) and isinstance(y, list):
        return [x] + y
    return [x, y]

def haversian_dist(pair):
    # pair = (row1, row2); each row is a ";"-split line with lat at index 0,
    # a global id at index 1 and lon at index 2
    lat1 = float(pair[0][0])
    lat2 = float(pair[1][0])
    lon1 = float(pair[0][2])
    lon2 = float(pair[1][2])
    p = 0.017453292519943295  # pi/180: degrees -> radians
    a = 0.5 - cos((lat2 - lat1) * p)/2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p)) / 2
    # print(pair[0][1])  # debug output; avoid printing inside a map over n^2 pairs
    # 12742 = Earth's diameter in km, so this returns the distance in km
    return (int(float(pair[0][1])), (int(float(pair[1][1])), 12742 * asin(sqrt(a))))

def sort_val(kv):
    # kv = (globalid, [(globalid, distance), ...]); sort the list by distance
    dtype = [("globalid", int), ("distance", float)]
    a = np.array(kv[1], dtype=dtype)
    sorted_mins = np.sort(a, order="distance", kind="mergesort")
    return (kv[0], sorted_mins)


def calc_matrix(sc, path, rangeval, savepath, name):
    data = sc.textFile(path)
    data = data.map(lambda x: x.split(";"))
    data = data.repartition(100).cache()
    data.collect()
    matrix = data.cartesian(data)
    values = matrix.map(haversian_dist)
    values = values.reduceByKey(concat)
    values = values.map(sort_val)
    # skip the self-pair at index 0 and keep the nearest rangeval entries
    values = values.map(lambda x: (x[0], x[1][1:int(rangeval)].tolist()))
    # keep only the globalids of those nearest neighbours
    values = values.map(lambda x: (x[0], [y[0] for y in x[1]]))
    dicti = values.collectAsMap()
    hp.save_pickle(dicti, savepath, name)  # hp: my own pickle helper module
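
For reference, the expression in haversian_dist is the haversine formula: p = 0.017453292519943295 is $\pi/180$ (degrees to radians), and 12742 km is twice the Earth's mean radius of $R = 6371$ km. With $\operatorname{hav}(\theta) = (1 - \cos\theta)/2$, the distance between two points is

$$a = \operatorname{hav}(\varphi_2 - \varphi_1) + \cos\varphi_1\,\cos\varphi_2\,\operatorname{hav}(\lambda_2 - \lambda_1), \qquad d = 2R\,\arcsin\sqrt{a}.$$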

Even a file with about 15,000 entries does not work. I know that cartesian causes O(n^2) runtime (with 2,000,000 lines that is already 4 x 10^12 pairs), but shouldn't Spark be able to handle that? Or is something else wrong? The only lead I have is an error message, but I don't know whether it is related to the actual problem:

[error message omitted]

1 Answer

You use data.collect() in your code, which essentially pulls all of the data onto a single machine. Depending on that machine's memory, 2,000,000 rows of data may well not fit.
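
To illustrate, here is the loading step from your calc_matrix with only that line removed (everything else unchanged):

data = sc.textFile(path)
data = data.map(lambda x: x.split(";"))
data = data.repartition(100).cache()
# data.collect()  # removed: this pulls the entire RDD onto the driver
matrix = data.cartesian(data)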

In addition, I tried to reduce the number of computations to be done by performing joins instead of using cartesian. (Note that I just generated random numbers with numpy here, so the format may differ from yours. The main idea is the same, though.)

import numpy as np
from numpy import arcsin, cos, sqrt

# suppose my data consists of latlong pairs;
# we will use the indices for pairing up values
data = sc.parallelize(np.random.rand(10, 2)).zipWithIndex()
data = data.map(lambda val_idx: (val_idx[1], val_idx[0]))

# generate pairs (e.g. if I have 3 points with indices [0,1,2],
# I only have to compute distances for the pairs (0,1), (0,2) and (1,2))
idxs = range(data.count())
indices = sc.parallelize([(i, j) for i in idxs for j in idxs if i < j])

# haversine func (I took the liberty of editing some parts of it)
def haversian_dist(latlong1, latlong2):
    lat1, lon1 = latlong1
    lat2, lon2 = latlong2
    p = 0.017453292519943295  # pi/180
    def hav(theta):
        return (1 - cos(p * theta)) / 2
    a = hav(lat2 - lat1) + cos(p * lat1) * cos(p * lat2) * hav(lon2 - lon1)
    return 12742 * arcsin(sqrt(a))

# join the coordinates onto each side of the index pairs:
# (i, j) -> (j, (i, latlong_i)) -> ((i, j), (latlong_i, latlong_j))
joined1 = indices.join(data).map(lambda kv: (kv[1][0], (kv[0], kv[1][1])))
joined2 = joined1.join(data).map(
    lambda kv: ((kv[1][0][0], kv[0]), (kv[1][0][1], kv[1][1])))
haversianRDD = joined2.mapValues(lambda pair: haversian_dist(pair[0], pair[1]))
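
If you then want, for each point, the other points sorted by distance (like your sort_val step), one possible continuation from the haversianRDD above is:

# ((i, j), dist) -> (i, (j, dist)) and (j, (i, dist)), then sort per point
by_point = haversianRDD.flatMap(lambda kv: [(kv[0][0], (kv[0][1], kv[1])),
                                            (kv[0][1], (kv[0][0], kv[1]))])
nearest = by_point.groupByKey().mapValues(lambda ds: sorted(ds, key=lambda t: t[1]))
print(nearest.take(1))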
