当我尝试执行以下函数时,Apache Spark/Databricks上出现错误:
com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near ')'.
代码是:
Ancestor = ancestor.getAncestorPath(conn,log,processId, entityStageId)
dataPath = Ancestor["ancestorPath"]
#Unpack the config specifications
import json
if len(Ancestor["dfConfig"]) > 0:
dfConfig = json.loads(Ancestor["dfConfig"])
#Display the Ancestor path for debugging
print(f"Ancestor:{Ancestor}")
问题似乎是连接到SQLDB
实际功能如下:
def getAncestorPath(connectionInst: conn.connect, log: logs.Logging, processId, entityStageId = "-1"):
log.writeToLogs(processId,logs.LogType.Info, logs.EventType.GetAncestorPath, logs.LogMessage.GetAncestorPath)
pathQuery = f"SELECT * FROM Config.GetAncestorSlicePath WHERE (ProcessID = {processId}) OR (ProcessID IS NULL AND EntityStageID = {entityStageId})"
AncestorPath = ""
dfConfig = ""
entityPath = connectionInst.readFromDb(processId,pathQuery)
ancestorPathToList = entityPath.limit(1).rdd.collect()
if len(ancestorPathToList) is 0:
log.writeToLogs(processId,logs.LogType.Error, logs.EventType.NoAncestorPath, logs.LogMessage.NoAncestorPath, errorType = logs.ErrorType.NoAncestorPath)
log.writeToLogs(processId,logs.LogType.Error, logs.EventType.FailGetAncestorPath, logs.LogMessage.FailGetAncestorPath, errorType = logs.ErrorType.FailGetAncestorPath)
raise ValueError(logs.LogMessage.NoAncestorPath.value)
for row in ancestorPathToList:
AncestorPath = f"/mnt/lake/{row.LakePath}"
dfConfig = row.Config
log.writeToLogs(processId,logs.LogType.Info, logs.EventType.SuccessGetAncestorPath, logs.LogMessage.SuccessGetAncestorPath)
return {"ancestorPath":AncestorPath,"dfConfig":dfConfig}
错误跟踪如下所示:
Py4JJavaError Traceback (most recent call last)
/databricks/python/lib/python3.8/site-packages/hydr8/utils/connection.py in readFromDb(self, processId, query)
22 try:
---> 23 jdbcDF = (self.connectionProperties.spark.read
24 .format("jdbc")
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
209 else:
--> 210 return self._df(self._jreader.load())
211
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
109 try:
--> 110 return f(*a, **kw)
111 except py4j.protocol.Py4JJavaError as e:
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
如前所述,在尝试连接到SQL数据库时出现问题。用于连接数据库的函数如下所示:
class connect:
def __init__(self, conprops: conn.ConnectionProperties, logs):
self.connectionProperties = conprops
self.log = logs
def readFromDb(self,processId, query):
try:
jdbcDF = (self.connectionProperties.spark.read
.format("jdbc")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", f"jdbc:sqlserver://{self.connectionProperties.dbConnectionProperties.DBServer}.database.windows.net;database={self.connectionProperties.dbConnectionProperties.DBDatabase}")
.option("user", self.connectionProperties.dbConnectionProperties.DBUser)
.option("query", query)
.option("password", self.connectionProperties.dbConnectionProperties.DBPword)
.load()
)
return jdbcDF
except Exception as e:
self.log.writeToLogs(self,processId,logging.LogType.Error, logging.EventType.FailReadFromDb, logging.LogMessage.FailReadFromDb, errorType = logging.ErrorType.FailReadFromDb)
raise Exception(f"{logging.LogMessage.FailReadFromDb.value} ERROR: {e}")
except:
self.log.writeToLogs(self,processId,logging.LogType.Error, logging.EventType.FailReadFromDb, logging.LogMessage.FailReadFromDb, errorType = logging.ErrorType.FailReadFromDb)
raise Exception(f"{logging.LogMessage.FailReadFromDb.value}")
欢迎有任何想法
目前没有回答
相关问题 更多 >
编程相关推荐