如何在Sparkclus上使用Python分发数据

2024-04-25 01:54:07 发布

您现在位置:Python中文网/ 问答频道 /正文

晚上好,亲爱的stackoverflow社区
我有以下问题。
我和cloudera为spark建立了一个集群。
有一个Clustermanager和三个workernodes,它们不是本地的。
我希望集群对一些数据执行python程序。

from pyspark import SparkConf, SparkContext
import matplotlib.pyplot as plt
import numpy as np
from time import time as t

def mapper(point, data):
    counter = 0
    for elem in data:
        dominate = False
        for i in range(len(elem)):
            if point[i] < elem[i]:
                dominate = True
        if dominate:
            counter += 1
    return (point,counter)


if __name__=="__main__":

    xx = np.array([-0.51, 51.2])
    yy = np.array([0.33, 51.6])
    means = [xx.mean(), yy.mean()]  
    stds = [xx.std() / 3, yy.std() / 3]
    corr = 0.8    # correlation
    covs = [[stds[0]**2          , stds[0]*stds[1]*corr], 
           [stds[0]*stds[1]*corr,           stds[1]**2]] 

    m = np.random.multivariate_normal(means, covs, 1000).T
    data = list(zip(m[0],m[1]))

    conf = SparkConf().setAppName("Naive_Spark")
    sc = SparkContext(conf=conf)
    data_rdd = sc.parallelize(data).partitionBy(3).persist()

    start = t()

    mapped = data_rdd.map(lambda x: mapper(x, data)).filter(lambda x: x[1] == len(data)-1).collect()    
    print(mapped)
    time = str(t()-start)
    print(mapped)
    with open('/home/.../Schreibtisch/Naive.txt','a') as f:
        f.write('Spark: ' + str(mapped) + ' in ' + time + ' ms\n\n')
    sc.stop()
    plt.scatter(*zip(*data))
    plt.show()

目前,我在python程序中创建数据,并且在本地运行良好。
通常我会将我的代码和一些文本文件中的数据发送到我的集群,并使用spark submit执行。
我的问题是,如何分割数据,使三个节点分别处理其中的一部分。

问候
山姆


Tags: 数据inimportdatatimeasnpcounter