晚上好,亲爱的stackoverflow社区
我有以下问题。
我和cloudera为spark建立了一个集群。
有一个Clustermanager和三个workernodes,它们不是本地的。
我希望集群对一些数据执行python程序。
from pyspark import SparkConf, SparkContext
import matplotlib.pyplot as plt
import numpy as np
from time import time as t
def mapper(point, data):
counter = 0
for elem in data:
dominate = False
for i in range(len(elem)):
if point[i] < elem[i]:
dominate = True
if dominate:
counter += 1
return (point,counter)
if __name__=="__main__":
xx = np.array([-0.51, 51.2])
yy = np.array([0.33, 51.6])
means = [xx.mean(), yy.mean()]
stds = [xx.std() / 3, yy.std() / 3]
corr = 0.8 # correlation
covs = [[stds[0]**2 , stds[0]*stds[1]*corr],
[stds[0]*stds[1]*corr, stds[1]**2]]
m = np.random.multivariate_normal(means, covs, 1000).T
data = list(zip(m[0],m[1]))
conf = SparkConf().setAppName("Naive_Spark")
sc = SparkContext(conf=conf)
data_rdd = sc.parallelize(data).partitionBy(3).persist()
start = t()
mapped = data_rdd.map(lambda x: mapper(x, data)).filter(lambda x: x[1] == len(data)-1).collect()
print(mapped)
time = str(t()-start)
print(mapped)
with open('/home/.../Schreibtisch/Naive.txt','a') as f:
f.write('Spark: ' + str(mapped) + ' in ' + time + ' ms\n\n')
sc.stop()
plt.scatter(*zip(*data))
plt.show()
目前,我在python程序中创建数据,并且在本地运行良好。
通常我会将我的代码和一些文本文件中的数据发送到我的集群,并使用spark submit执行。
我的问题是,如何分割数据,使三个节点分别处理其中的一部分。
问候
山姆
只是:
相关问题 更多 >
编程相关推荐