将Jar添加到独立pysp

2024-04-29 04:37:36 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在启动一个pyspark程序:

$ export SPARK_HOME=
$ export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.9-src.zip
$ python

以及py代码:

from pyspark import SparkContext, SparkConf

SparkConf().setAppName("Example").setMaster("local[2]")
sc = SparkContext(conf=conf)

如何添加jar依赖项,如Databricks csv jar?使用命令行,我可以添加如下包:

$ pyspark/spark-submit --packages com.databricks:spark-csv_2.10:1.3.0 

但我没有用这些。这个程序是一个更大的工作流的一部分,它没有使用spark submit我应该能够运行我的./foo.py程序,它应该可以正常工作。

  • 我知道你可以为extraClassPath设置spark属性,但是你必须将JAR文件复制到每个节点?
  • 尝试过conf.set(“spark.jars”,“jar1,jar2”),但在py4j CNF异常中也不起作用

Tags: csvpy程序homeconfexportsparkpyspark
3条回答

这里有很多方法(设置ENV vars、添加到$SPARK_HOME/conf/SPARK-defaults.conf等等),一些答案已经涵盖了这些。我想为那些特别使用Jupyter笔记本和在笔记本中创建Spark会话的用户添加一个附加的答案。以下是最适合我的解决方案(在我的情况下,我希望加载Kafka包):

spark = SparkSession.builder.appName('my_awesome')\
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0')\
    .getOrCreate()

使用这一行代码,我不需要做任何其他事情(没有env或conf文件更改)。

2019-10-30更新: 上面这一行代码仍然运行良好,但我想为看到这个答案的新用户指出以下几点:

  • 您需要在结尾处更改版本以匹配您的Spark版本,因此对于Spark 2.4.4,您需要:org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
  • 这个jar spark-sql-kafka-0-10_2.12的最新版本对我来说即将崩溃(Mac膝上型电脑),因此如果在调用“readStream”时发生崩溃,请恢复到2.11。

可以使用$SPARK_HOME/conf/spark-defaults.conf中的^{}(设置^{}也应该有效)属性传递任何依赖项。它应该是一个逗号分隔的坐标列表。

在启动JVM和this happens during ^{} initialization之前,必须设置包或类路径属性。这意味着这里不能使用SparkConf.set方法。

另一种方法是在初始化SparkConf对象之前设置PYSPARK_SUBMIT_ARGS环境变量:

import os
from pyspark import SparkConf

SUBMIT_ARGS = "--packages com.databricks:spark-csv_2.11:1.2.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

conf = SparkConf()
sc = SparkContext(conf=conf)

对于不同的jar(“MongoDB连接器for Spark”,mongo-spark-connector)我遇到了类似的问题,但需要注意的是,我通过pysparkcondaconda install pyspark)中安装了Spark。因此,对特定于Spark的答案的所有帮助都不是完全有用的。对于那些使用conda安装的用户,下面是我拼凑的过程:

1)找到pyspark/jars的位置。我的在这条路上:~/anaconda2/pkgs/pyspark-2.3.0-py27_0/lib/python2.7/site-packages/pyspark/jars

2)Downloadjar文件放入步骤1中找到的路径,从this location

3)现在您应该能够运行这样的代码(代码取自MongoDB official tutorial,使用Briford Wylie's answer above):

from pyspark.sql import SparkSession

my_spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/spark.test_pyspark_mbd_conn") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/spark.test_pyspark_mbd_conn") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.2') \
    .getOrCreate()

免责声明:

1)我不知道这个答案是否合适,请告诉我一个更好的地方,我会搬家的。

2)如果您认为我有错误或对上述过程有改进,请发表意见,我将进行修改。

相关问题 更多 >