Databricks com.microsoft.sqlserver.jdbc.SQLServerException:语法不正确

2024-06-02 06:54:13 发布

您现在的位置:Python中文网 / 问答频道 / 正文

当我尝试执行以下函数时,Apache Spark/Databricks上出现错误:

com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near ')'.

代码是:

Ancestor = ancestor.getAncestorPath(conn,log,processId, entityStageId)
dataPath = Ancestor["ancestorPath"]

# Unpack the JSON config specification when the ancestor row carries one.
import json
rawConfig = Ancestor["dfConfig"]
# rawConfig may be None (presumably a NULL Config column) or an empty string;
# len(None) would raise TypeError, so test truthiness instead. Falling back to
# {} guarantees dfConfig is always defined after this snippet runs.
dfConfig = json.loads(rawConfig) if rawConfig else {}

# Display the Ancestor path for debugging
print(f"Ancestor:{Ancestor}")

问题似乎出在连接 SQL 数据库这一步。

实际的函数如下:

def _sql_int_literal(value, name):
  """Return *value* as an ``int`` that is safe to inline into T-SQL text.

  The config query is built as raw SQL text, so ids are formatted straight
  into the statement. An empty id would otherwise render ``(ProcessID = )``,
  which SQL Server rejects with "Incorrect syntax near ')'"; a non-numeric
  id could inject arbitrary SQL.

  Raises:
    ValueError: if *value* is ``None``, a ``bool``, blank, or not an integer.
  """
  if value is None or isinstance(value, bool):
    raise ValueError(f"{name} must be an integer id, got {value!r}")
  # Accept integral floats (e.g. 12.0) that the old f-string also rendered.
  if isinstance(value, float) and value.is_integer():
    return int(value)
  try:
    return int(str(value).strip())
  except ValueError:
    raise ValueError(f"{name} must be an integer id, got {value!r}") from None


def getAncestorPath(connectionInst: conn.connect, log: logs.Logging,  processId, entityStageId = "-1"):
  """Fetch the ancestor lake path and config for a process from the config DB.

  Reads ``Config.GetAncestorSlicePath`` for rows whose ``ProcessID`` equals
  *processId*, or whose ``ProcessID`` is NULL and ``EntityStageID`` equals
  *entityStageId*, and keeps the first row returned.

  Args:
    connectionInst: object whose ``readFromDb(processId, query)`` returns a
      Spark DataFrame.
    log: logger exposing ``writeToLogs``.
    processId: integer process id (an int or a string of digits).
    entityStageId: integer id matched against ``EntityStageID`` on rows whose
      ``ProcessID`` is NULL; defaults to ``"-1"``.

  Returns:
    dict with ``"ancestorPath"`` (``/mnt/lake/<LakePath>``) and ``"dfConfig"``
    (the row's raw ``Config`` value).

  Raises:
    ValueError: if an id is not an integer, or no matching row is found.
  """
  log.writeToLogs(processId, logs.LogType.Info, logs.EventType.GetAncestorPath, logs.LogMessage.GetAncestorPath)

  # Validate before building SQL: an empty id yields "... = )" and the
  # server-side error "Incorrect syntax near ')'".
  try:
    processSqlId = _sql_int_literal(processId, "processId")
    entityStageSqlId = _sql_int_literal(entityStageId, "entityStageId")
  except ValueError:
    log.writeToLogs(processId, logs.LogType.Error, logs.EventType.FailGetAncestorPath, logs.LogMessage.FailGetAncestorPath, errorType = logs.ErrorType.FailGetAncestorPath)
    raise

  pathQuery = (
    "SELECT * FROM Config.GetAncestorSlicePath "
    f"WHERE (ProcessID = {processSqlId}) "
    f"OR (ProcessID IS NULL AND EntityStageID = {entityStageSqlId})"
  )

  # NOTE(review): the query has no ORDER BY, so when both a process row and an
  # entity-stage row match, which one limit(1) keeps is not guaranteed.
  ancestorRows = connectionInst.readFromDb(processId, pathQuery).limit(1).collect()

  if not ancestorRows:
    log.writeToLogs(processId, logs.LogType.Error, logs.EventType.NoAncestorPath, logs.LogMessage.NoAncestorPath, errorType = logs.ErrorType.NoAncestorPath)
    log.writeToLogs(processId, logs.LogType.Error, logs.EventType.FailGetAncestorPath, logs.LogMessage.FailGetAncestorPath, errorType = logs.ErrorType.FailGetAncestorPath)
    raise ValueError(logs.LogMessage.NoAncestorPath.value)

  row = ancestorRows[0]
  ancestorPath = f"/mnt/lake/{row.LakePath}"
  dfConfig = row.Config

  log.writeToLogs(processId, logs.LogType.Info, logs.EventType.SuccessGetAncestorPath, logs.LogMessage.SuccessGetAncestorPath)
  return {"ancestorPath": ancestorPath, "dfConfig": dfConfig}

错误跟踪如下所示:

Py4JJavaError                             Traceback (most recent call last)
/databricks/python/lib/python3.8/site-packages/hydr8/utils/connection.py in readFromDb(self, processId, query)
     22     try:
---> 23         jdbcDF = (self.connectionProperties.spark.read
     24         .format("jdbc")

/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    209         else:
--> 210             return self._df(self._jreader.load())
    211 

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    109         try:
--> 110             return f(*a, **kw)
    111         except py4j.protocol.Py4JJavaError as e:

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".

如前所述,问题出现在尝试连接 SQL 数据库时。用于连接数据库的类如下所示:

通过 JDBC 读取 SQL DB 的连接类:

class connect:
  """Read from an Azure SQL database over JDBC using the configured Spark session.

  Attributes:
    connectionProperties: object providing ``spark`` and
      ``dbConnectionProperties`` (DBServer, DBDatabase, DBUser, DBPword).
    log: logger exposing ``writeToLogs``.
  """

  def __init__(self, conprops: conn.ConnectionProperties, logs):
    self.connectionProperties = conprops
    self.log = logs

  def readFromDb(self, processId, query):
    """Run *query* against the configured database and return a Spark DataFrame.

    Spark's JDBC ``query`` option wraps the text in parentheses as a subquery
    (roughly ``SELECT * FROM (<query>) SPARK_GEN_SUBQ_n``). A trailing ``;``
    would therefore be a syntax error, so it is stripped here.

    Args:
      processId: id passed through to the failure log entry.
      query: T-SQL SELECT statement.

    Returns:
      The DataFrame produced by ``spark.read ... .load()``.

    Raises:
      Exception: wrapping any error raised while building or loading the
        reader; the original error is chained as ``__cause__``.
    """
    try:
      dbProps = self.connectionProperties.dbConnectionProperties
      subquery = query.strip().rstrip(";").rstrip()
      jdbcDF = (self.connectionProperties.spark.read
        .format("jdbc")
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .option("url", f"jdbc:sqlserver://{dbProps.DBServer}.database.windows.net;database={dbProps.DBDatabase}")
        .option("user", dbProps.DBUser)
        .option("query", subquery)
        .option("password", dbProps.DBPword)
        .load()
      )
    except Exception as e:
      # writeToLogs is a bound method: do NOT pass self again (doing so shifted
      # every argument and made this handler raise, hiding the real error).
      # NOTE(review): `logging` must be the project's logging module (it needs
      # LogType/EventType/LogMessage/ErrorType), not the stdlib -- confirm import.
      self.log.writeToLogs(processId, logging.LogType.Error, logging.EventType.FailReadFromDb, logging.LogMessage.FailReadFromDb, errorType = logging.ErrorType.FailReadFromDb)
      raise Exception(f"{logging.LogMessage.FailReadFromDb.value} ERROR: {e}") from e
    return jdbcDF

欢迎提出任何想法。


Tags: self, log, return, logging, logs, ancestor, jdbc, logmessage