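Making terse Python lambda code in PySpark easier to understand
I have successfully run the pyspark linear regression example with Anaconda on a test cluster, and it works nicely.
Next, I would like to simplify the code so that it is easier for our analysts to use. Specifically, I want to turn the lambda function below into a regular function, which would be easier to follow at our current level of Python. I have tried several times, but the combination of map, lambda, and numpy.array confuses me.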
data = sc.textFile("hdfs://nameservice1:8020/spark_input/linear_regression/lpsa.data")
parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')]))
The whole program is below. Any help is much appreciated.
#!/opt/tools/anaconda/bin python
from pyspark import SparkConf, SparkContext
from pyspark.mllib.regression import LinearRegressionWithSGD
from numpy import array
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("Python - Linear Regression Test")
conf.set("spark.executor.memory", "1g")
sc = SparkContext(conf = conf)
# Load and parse the data
data = sc.textFile("hdfs://nameservice1:8020/spark_input/linear_regression/lpsa.data")
parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')]))
# Build the model
numIterations = 50
model = LinearRegressionWithSGD.train(parsedData, numIterations)
# Evaluate model on training examples and compute training error
valuesAndPreds = parsedData.map(lambda point: (point.item(0), model.predict(point.take(range(1, point.size)))))
MSE = valuesAndPreds.map(lambda (v, p): (v-p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("training Mean Squared Error = " + str(MSE))
2 Answers
0
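Amadan's answer is correct as far as plain Python goes, which is what the question originally asked. When working with RDDs (Resilient Distributed Datasets) in Spark, however, the implementation looks slightly different, because we call Spark's map method on the RDD rather than Python's built-in map: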
# Declare functions at startup:
if __name__ == "__main__":
    def line_to_float_array(line):
        # "a,b c" -> ["a", "b", "c"]
        string_array = line.replace(',', ' ').split(' ')
        # list() keeps this working on Python 3, where map returns an iterator
        float_array = list(map(float, string_array))
        return array(float_array)
    #
    #
    sc = SparkContext(conf = conf)
    # Load and parse the data
    data = sc.textFile("hdfs://nameservice1:8020/sparkjeb/lpsa.data")
    # Pass the function itself (no parentheses); Spark calls it once per line
    parsedData = data.map(line_to_float_array)
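The same approach should carry over to the other lambdas in the question's program. The sketch below names the two functions used in the MSE step and runs them on plain Python data so it can be tried without a cluster. The names `squared_error` and `add` and the sample pairs are made up for illustration. Note that `lambda (v, p): ...` relies on tuple-parameter unpacking, which only exists in Python 2, so the named version unpacks the pair inside the function body instead.

```python
from functools import reduce


def squared_error(value_and_pred):
    """Stands in for `lambda (v, p): (v - p)**2`."""
    value, pred = value_and_pred  # unpack the (actual, predicted) pair
    return (value - pred) ** 2


def add(x, y):
    """Stands in for `lambda x, y: x + y`."""
    return x + y


# Made-up (actual, predicted) pairs playing the role of valuesAndPreds
values_and_preds = [(1.0, 0.5), (2.0, 2.0), (3.0, 4.0)]

errors = list(map(squared_error, values_and_preds))
mse = reduce(add, errors) / len(values_and_preds)
print("training Mean Squared Error = " + str(mse))
```

On the RDD itself, the corresponding line would presumably read `MSE = valuesAndPreds.map(squared_error).reduce(add) / valuesAndPreds.count()`, using the same `map`, `reduce`, and `count` calls as the original program.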
1
def line_to_array(line):
    space_separated_line = line.replace(',', ' ')
    string_array = space_separated_line.split(' ')
    # list() keeps this working on Python 3, where map returns an iterator
    float_array = list(map(float, string_array))
    return array(float_array)
parsedData = map(line_to_array, data)
Or, equivalently,
def line_to_array(line):
    space_separated_line = line.replace(',', ' ')
    string_array = space_separated_line.split(' ')
    float_array = [float(x) for x in string_array]
    return array(float_array)
parsedData = [line_to_array(line) for line in data]
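To check that the step-by-step function really does the same thing as the original one-liner, the two can be compared on a couple of sample lines. This is only a sketch: the sample rows are invented, the helper name `line_to_floats` is hypothetical, and the `numpy.array(...)` wrapper is left out so that the snippet runs with nothing but the standard library.

```python
def line_to_floats(line):
    """Parse one 'label,f1 f2 f3' line into a list of floats."""
    space_separated_line = line.replace(',', ' ')  # "1.0,2.0 3.0" -> "1.0 2.0 3.0"
    string_list = space_separated_line.split(' ')  # -> ["1.0", "2.0", "3.0"]
    return [float(x) for x in string_list]         # -> [1.0, 2.0, 3.0]


# The question's one-liner, minus the numpy.array() wrapper
as_lambda = lambda line: [float(x) for x in line.replace(',', ' ').split(' ')]

# Invented rows in the "label,feature feature ..." layout
sample_lines = ["-0.43,-1.63 -2.0 0.5", "1.0,2.0 3.0 4.0"]

parsed = [line_to_floats(line) for line in sample_lines]
same = parsed == [as_lambda(line) for line in sample_lines]
print(same)
```

Wrapping the returned list in `array(...)` gives the version from the answer. One thing to watch: `split(' ')` produces empty strings when a line contains two spaces in a row, and `float('')` raises a `ValueError`, so calling `split()` with no argument may be the safer choice for messier input.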