在EMR上运行pyspark脚本
我现在使用EC2的集群来自动化我的Apache Spark Pyspark脚本,利用Spark预先配置好的./ec2目录。为了实现自动化和调度,我想用Boto EMR模块把脚本发送到集群上。
我已经成功在EMR集群上安装了Spark,并且可以通过我本地机器上的pyspark版本来启动脚本,设置主节点如下:
$: MASTER=spark://<insert EMR master node of cluster here> ./bin/pyspark <myscriptname.py>
不过,这样做需要我在本地运行这个脚本,因此我无法充分利用Boto的功能,比如1) 启动集群 2) 添加脚本步骤 3) 停止集群。我看到有些例子使用了script-runner.sh和emr的“步骤”命令来运行spark-shell(scala),但我觉得用Python模块(pyspark)应该有更简单的方法。非常感谢!
3 个回答
你需要把部署模式改成集群模式(而不是客户端模式),这样才能从S3访问脚本。
这可能会对你有帮助,尽管它没有使用boto。
可以使用aws cli来创建集群并向其中添加步骤(spark作业)。
1) 创建集群:
aws emr create-cluster --name "Spark cluster" --ami-version 3.8 --applications Name=Spark --ec2-attributes KeyName=ir --log-uri s3://Path/logs --instance-type m3.xlarge --instance-count 1 --use-default-roles
2) 添加步骤(spark作业)。注意,你的python脚本应该存放在主节点上(在这个例子中,它位于/home/hadoop/spark)。
aws emr add-steps --cluster-id j-xxxxxxx --steps Name=Spark,Jar=s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--deploy-mode,client,/home/hadoop/spark/myscript.py],ActionOnFailure=CONTINUE
你还可以把两个步骤合并成一个,创建集群/运行作业并终止集群。
几点说明:1) 我尝试了多种方法从S3读取脚本,但都没有成功 :(
所以我最后还是用boto或aws cli把它复制到节点上。2) 由于我是在emr的一个节点上进行测试,步骤中的部署模式是客户端,所以你应该把它改成集群模式。
这里有一个很好的例子,展示了如何配置。你可以查看“一个快速示例”部分,里面有Python代码。
不过,为了让emr-4.7.2正常工作,我做了一些调整,所以这里有一个对我有效的AWS CLI命令:
aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,s3a://your-source-bucket/code/pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE
下面是pythonjob.py
文件的内容:
from __future__ import print_function
from pyspark import SparkContext
import sys
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: testjob ", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="MyTestJob")
dataTextAll = sc.textFile(sys.argv[1])
dataRDD = dataTextAll.map(lambda x: x.split(",")).map(lambda y: (str(y[0]), float(y[1]))).reduceByKey(lambda a, b: a + b)
dataRDD.saveAsTextFile(sys.argv[2])
sc.stop()
这个程序会从S3读取data.csv
文件,分割每一行,把第一列的值转成字符串,第二列的值转成浮点数,然后根据第一列的值进行分组,计算第二列的总和,最后把结果写回S3。
几点说明: