Cannot connect to Cassandra from PySpark

Posted 2024-04-25 12:58:52


I am trying to connect to Cassandra from PySpark and run some queries. Here are all the steps I took:

First I installed Spark:

wget http://www.apache.org/dyn/closer.lua/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz

Then I extracted the archive:

tar -xzf spark-1.6.1-bin-hadoop2.6.tgz

Then I ran this command:

./bin/pyspark

And I got this:

16:48 $ ./bin/pyspark
Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/05/02 16:50:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/02 16:50:33 WARN Utils: Your hostname, rleitao-H81M-HD3 resolves to a loopback address: 127.0.1.1; using 192.168.1.26 instead (on interface eth0)
17/05/02 16:50:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/05/02 16:50:36 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
>>> 

Then:

from pyspark.sql import SQLContext
sql = SQLContext(sc)
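
Since the banner above reports a Spark 2.x build with "SparkSession available as 'spark'", the SQLContext wrapper is optional; the session's own reader exposes the same API, as this one-line sketch shows:

# Spark 2.x alternative: the shell's built-in SparkSession already exposes
# the same DataFrameReader, so SQLContext(sc) is only needed for legacy code.
df_reader = spark.read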

Then:

df = sql.read.format("org.apache.spark.sql.cassandra").\
option("spark.cassandra.connection.host", "ec2-IPV4-Adress.REGION.compute.amazonaws.com").\
option("spark.cassandra.auth.username", "user"). \
option("spark.cassandra.auth.password", "pass"). \
option(keyspace="mykeyspace", table="mytable").load()

And then, oops, I got a huge error:

    >>> df = sql.read.format("org.apache.spark.sql.cassandra").\
    ... option("spark.cassandra.connection.host", "ec2-IPV4-adress.REGION.compute.amazonaws.com").\
    ... option("spark.cassandra.auth.username", "user"). \
    ... option("spark.cassandra.auth.password", "pass"). \
    ... option(keyspace="mykeyspace", table="mytable").load()
    17/05/02 16:47:43 ERROR Schema: Failed initialising database.
    Unable to open a test connection to the given database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, username = APP. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
    java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@fa39daf, see the next exception for details.
        at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
        at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
        at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source)
        at org.apache.derby.impl.jdbc.EmbedConnection.bootDatabase(Unknown Source)
        at org.apache.derby.impl.jdbc.EmbedConnection.<init>(Unknown Source)
        at org.apache.derby.jdbc.InternalDriver$1.run(Unknown Source)
        at org.apache.derby.jdbc.InternalDriver$1.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at org.apache.derby.jdbc.InternalDriver.getNewEmbedConnection(Unknown Source)
        at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
        at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
        at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source)
        at java.sql.DriverManager.getConnection(DriverManager.java:664)
        at java.sql.DriverManager.getConnection(DriverManager.java:208)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
        at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
        at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
        at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
        at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:365)
        at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:394)
        at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:291)
        at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:258)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
        at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:57)
        at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:66)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:593)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:571)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
        at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
        at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
        at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
        at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
        at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234)
        at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
        at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:166)
        at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
        at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:192)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        ... 108 more
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/souadmabrouk/Bureau/Souad/project/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/context.py", line 464, in read
        return DataFrameReader(self)
      File "/home/souadmabrouk/Bureau/Souad/project/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/readwriter.py", line 70, in __init__
        self._jreader = spark._ssql_ctx.read()
      File "/home/souadmabrouk/Bureau/Souad/project/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/home/souadmabrouk/Bureau/Souad/project/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 79, in deco
        raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':"
    >>>

How can I use the Cassandra connector here? I could not find clear documentation for it. By the way, the Cassandra cluster is on AWS.

Any help would be appreciated.


2 Answers

Here is what worked for me:

./bin/pyspark --master local[*] --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.2 --conf spark.cassandra.connection.host=host.name --conf spark.cassandra.auth.username=cassandraname --conf spark.cassandra.auth.password=cassandrapwd

>>> df = spark.read.format("org.apache.spark.sql.cassandra")\
   .options(table="tablename", keyspace="keyspacename").load()

>>> df.show()
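
The same connector settings can also be applied in code rather than on the command line. A minimal standalone-script sketch, reusing the placeholder host and credentials from the command above (spark.jars.packages only takes effect before the driver JVM starts, so this belongs in a script, not an already-running shell):

# Standalone-script sketch: the same settings the --packages/--conf flags
# pass above, applied while building the SparkSession. Host, user name,
# and password are the placeholders from this answer.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("cassandra-example")
         .config("spark.jars.packages",
                 "com.datastax.spark:spark-cassandra-connector_2.11:2.3.2")
         .config("spark.cassandra.connection.host", "host.name")
         .config("spark.cassandra.auth.username", "cassandraname")
         .config("spark.cassandra.auth.password", "cassandrapwd")
         .getOrCreate())

df = (spark.read.format("org.apache.spark.sql.cassandra")
      .options(table="tablename", keyspace="keyspacename")
      .load())
df.show()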
  1. When launching pyspark, include the connector package:
    ./bin/pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.2
  2. In your code, build a dict with the connection config:
    hosts = {"spark.cassandra.connection.host": 'host_dns_or_ip_1,host_dns_or_ip_2,host_dns_or_ip_3'}
  3. In your code, create the DataFrame using that connection config (a combined sketch follows this list):
    data_frame = sqlContext.read.format("org.apache.spark.sql.cassandra").options(**hosts).load(keyspace="your_keyspace", table="your_table")
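
For reference, keyword arguments passed to load() are forwarded as reader options, so step 3 is equivalent to chaining .options(keyspace=..., table=...).load(). By contrast, option() (singular) takes a single key/value pair, which is why the option(keyspace="mykeyspace", table="mytable") call in the question would fail with a TypeError. A combined sketch of the three steps, using the same placeholder names:

# Combined sketch of steps 1-3 (launch pyspark with --packages as above first).
# Host names, keyspace, and table are the placeholders from this answer;
# sc is the SparkContext that the pyspark shell provides.
from pyspark.sql import SQLContext

hosts = {"spark.cassandra.connection.host":
         "host_dns_or_ip_1,host_dns_or_ip_2,host_dns_or_ip_3"}

sqlContext = SQLContext(sc)
data_frame = (sqlContext.read.format("org.apache.spark.sql.cassandra")
              .options(**hosts)
              .options(keyspace="your_keyspace", table="your_table")
              .load())
data_frame.printSchema()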
