在EMR上运行pyspark脚本

25 投票
3 回答
36812 浏览
提问于 2025-04-18 04:17

我现在使用EC2的集群来自动化我的Apache Spark Pyspark脚本,利用Spark预先配置好的./ec2目录。为了实现自动化和调度,我想用Boto EMR模块把脚本发送到集群上。

我已经成功在EMR集群上安装了Spark,并且可以通过我本地机器上的pyspark版本来启动脚本,设置主节点如下:

$: MASTER=spark://<insert EMR master node of cluster here> ./bin/pyspark <myscriptname.py>

不过,这样做需要我在本地运行这个脚本,因此我无法充分利用Boto的功能,比如1) 启动集群 2) 添加脚本步骤 3) 停止集群。我看到有些例子使用了script-runner.sh和emr的“步骤”命令来运行spark-shell(scala),但我觉得用Python模块(pyspark)应该有更简单的方法。非常感谢!

3 个回答

3

你需要把部署模式改成集群模式(而不是客户端模式),这样才能从S3访问脚本。

5

这可能会对你有帮助,尽管它没有使用boto。

可以使用aws cli来创建集群并向其中添加步骤(spark作业)。

1) 创建集群:

aws emr create-cluster --name "Spark cluster" --ami-version 3.8 --applications Name=Spark --ec2-attributes KeyName=ir --log-uri s3://Path/logs --instance-type m3.xlarge  --instance-count 1 --use-default-roles 

2) 添加步骤(spark作业)。注意,你的python脚本应该存放在主节点上(在这个例子中,它位于/home/hadoop/spark)。

aws emr add-steps --cluster-id j-xxxxxxx --steps Name=Spark,Jar=s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--deploy-mode,client,/home/hadoop/spark/myscript.py],ActionOnFailure=CONTINUE

你还可以把两个步骤合并成一个,创建集群/运行作业并终止集群。

几点说明:1) 我尝试了多种方法从S3读取脚本,但都没有成功 :(

所以我最后还是用boto或aws cli把它复制到节点上。2) 由于我是在emr的一个节点上进行测试,步骤中的部署模式是客户端,所以你应该把它改成集群模式。

18

这里有一个很好的例子,展示了如何配置。你可以查看“一个快速示例”部分,里面有Python代码。

不过,为了让emr-4.7.2正常工作,我做了一些调整,所以这里有一个对我有效的AWS CLI命令:

aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,s3a://your-source-bucket/code/pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE

下面是pythonjob.py文件的内容:

from __future__ import print_function
from pyspark import SparkContext
import sys
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: testjob  ", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="MyTestJob")
    dataTextAll = sc.textFile(sys.argv[1])
    dataRDD = dataTextAll.map(lambda x: x.split(",")).map(lambda y: (str(y[0]), float(y[1]))).reduceByKey(lambda a, b: a + b)
    dataRDD.saveAsTextFile(sys.argv[2])
    sc.stop()

这个程序会从S3读取data.csv文件,分割每一行,把第一列的值转成字符串,第二列的值转成浮点数,然后根据第一列的值进行分组,计算第二列的总和,最后把结果写回S3。

几点说明:

  • 我决定把spark.yarn.submit.waitAppCompletion=true保留,这样我可以在控制台监控作业的执行情况。
  • 输入和输出路径(分别是sys.argv[1]sys.argv[2])作为作业提交的一部分传递给脚本(在add-steps命令的Args部分)。
  • 请注意,在配置作业时,Hadoop 2.7及以上版本必须使用s3a://的URI,而不是s3n://s3://
  • 如果你的集群在VPC中,并且你打算在EMR作业中进行读写操作,你需要为Amazon S3创建一个VPC端点

撰写回答