在Twisted中使用反应器不断扫描数据库表时出现以下错误
我在成功运行了几个小时后,遇到了以下错误。
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.6/dist-packages/twisted/python/threadpool.py", line 210, in _worker
    result = context.call(ctx, function, *args, **kwargs)
  File "/usr/lib/python2.6/dist-packages/twisted/python/context.py", line 59, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/lib/python2.6/dist-packages/twisted/python/context.py", line 37, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "/usr/lib/python2.6/dist-packages/twisted/enterprise/adbapi.py", line 436, in _runInteraction
    conn.rollback()
  File "/usr/lib/python2.6/dist-packages/twisted/enterprise/adbapi.py", line 52, in rollback
    self._connection.rollback()
_mysql_exceptions.OperationalError: (2006, 'MySQL server has gone away')
我的代码大概是这样的……
from twisted.internet import reactor, defer,threads
from twisted.enterprise import adbapi
# Module-level connection pool used by Scanner.objCursor. "MySQLdb" names the
# DB-API driver module; the remaining positional arguments are presumably the
# driver's connect() arguments (host, user, password, database) -- confirm
# against the MySQLdb version in use.
dbpool = adbapi.ConnectionPool("MySQLdb", '192.168.1.102','test', 'test', 'test')
class Scanner:
    """Repeatedly read ``tool_master`` through the module-level ``dbpool``."""

    def _execQuery(self, txn):
        """Run the SELECT on the cursor ``txn`` and return every fetched row."""
        txn.execute("SELECT tool_id,tool_name FROM tool_master")
        return txn.fetchall()

    def objCursor(self):
        """Pass ``_execQuery`` to ``dbpool.runInteraction`` and return its result."""
        return dbpool.runInteraction(self._execQuery)

    def printResult(self, result):
        """Print the rows, then schedule ``deferExecute`` again in 3 seconds."""
        print("%s %s" % ("resssssssssssssssssss", result))
        reactor.callLater(3, self.deferExecute)

    def deferExecute(self):
        """Start one query and register ``printResult`` as its callback."""
        pending = self.objCursor()
        pending.addCallback(self.printResult)
# Creates a Scanner and discards it immediately; nothing keeps a reference.
Scanner()


class MyApp(object):
    """Container whose class body starts the scanner and runs the reactor."""

    # These statements live in the class body, so they execute once while the
    # class is being defined, not when MyApp is instantiated.
    reactor.callInThread(Scanner().deferExecute)
    reactor.run()


# Only reached after reactor.run() above has returned.
MyApp()
有没有人能告诉我为什么会出现这个错误?
3 个回答
0
你可能还想看看这个代码片段,它提供了一个连接池的子类,可以在"MySQL服务器已断开连接"的情况下重新连接。
http://www.gelens.org/2009/09/13/twisted-connectionpool-revisited/
2
在Matt的建议和帮助下,我写出了以下代码,现在运行得很顺利:
#!/usr/bin/python
# Using the "dbmodule" from the previous example, create a ConnectionPool
from twisted.internet import reactor
from twisted.enterprise import adbapi
from twisted.internet import reactor, defer,threads
from twisted.python.threadpool import ThreadPool
import itertools
from twisted.internet.threads import deferToThread
from twisted.internet import reactor, defer, task
from tools.printTime import *
from tools.getVersion import *
from sh_log import *
# Thread-pool size that is suggested to the reactor two lines below.
concurrent = 30
# Counter starting at 1; it is not referenced anywhere in the visible code.
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)
# Module-level state shared by the classes below.
# Directory that tool script filenames are joined onto (see ToolsBuilder.build).
path="tools"
# Shared logger used through lo.log_date() and lo.wfile(); Log presumably
# comes from the `sh_log` star import -- confirm.
lo=Log()
class ToolsBuilder:
    """Load a tool's script from disk and compile it into a code object."""

    def build(self, txn, tool, asset_id):
        """Return the compiled script registered for ``tool``, or None.

        Args:
            txn: DB-API cursor handed in by ``runInteraction``.
            tool: ``tool_id`` to look up in ``tool_master``. A falsy value
                returns None without touching the database.
            asset_id: kept for interface compatibility. The asset URL that
                used to be fetched with it was never used, so that query has
                been dropped.

        Returns:
            A code object compiled in ``'exec'`` mode, or None when the tool
            is unknown or its script cannot be read or compiled. Failures are
            reported through ``lo.wfile``.
        """
        if not tool:
            return None
        failure_note = "Error in creating executable tool object......"
        print("\n")
        try:
            # Let the driver quote the value instead of splicing it into SQL.
            txn.execute(
                "select tool_filename from tool_master where tool_id = %s",
                (tool,))
            rows = txn.fetchall()
            if not rows:
                # Unknown tool id: report it the same way as any other failure.
                lo.wfile(failure_note)
                return None
            filename = str(rows[0][0])
            lo.wfile('\n' + lo.log_date() + "::" + filename
                     + " tool object is created......\n")
            # NOTE(review): the filename comes straight from the database and
            # is joined onto `path` unchecked; confirm that table is trusted.
            script_path = path + '/' + filename
            # The context manager closes the file even if read() fails.
            with open(script_path) as handle:
                source = handle.read()
            # Passing the real path makes tracebacks from the tool readable.
            return compile(source, script_path, 'exec')
        except Exception:
            # Deliberately best-effort: a broken tool must not abort the
            # whole interaction, but KeyboardInterrupt/SystemExit still
            # propagate because they are not Exception subclasses.
            lo.wfile(failure_note)
            return None


tb = ToolsBuilder()
class ToolsVectorGenerator:
    """Assemble one execution vector per asset and hand it to ``tvm``."""

    def generate(self, txn, res_set=None):
        """Build and dispatch a vector for every asset in ``res_set``.

        Args:
            txn: DB-API cursor handed in by ``runInteraction``.
            res_set: mapping of asset_id -> list of tool ids. None or an
                empty mapping means there is nothing to do.

        Each dispatched vector is ``[asset_rows, tools]`` where every entry
        of ``tools`` is ``[tool_id, setting_rows, compiled_tool]``. A failure
        while processing one asset is logged through ``lo.wfile`` and leaves
        that asset with an empty vector.
        """
        if not res_set:
            return
        for asset_id in res_set:
            # Start each asset from an empty vector so that an early failure
            # or a falsy asset_id can never re-dispatch the previous asset's
            # vector (or hit an unbound name on the first iteration).
            vector = []
            try:
                if asset_id:
                    vector = self._buildVector(txn, asset_id, res_set[asset_id])
            except Exception:
                lo.wfile("\nError in getting asset,please check your database or network connection......")
            tvm.executeVector(vector)

    def _buildVector(self, txn, asset_id, tool_ids):
        """Return ``[asset_rows, tools]`` for a single asset."""
        print("%s %s" % ("asset_id..............................", asset_id))
        lo.wfile(lo.log_date() + "::"
                 + " \nVector generation for the asset number...:"
                 + str(asset_id))
        tools = []
        for tool in tool_ids:
            if not tool:
                continue
            print("%s %s" % ("tool..............", tool))
            compiled = tb.build(txn, tool, asset_id)
            print("%s %s" % ("temp_tool..........", compiled))
            # Fetch the settings rows for this tool; values are passed as
            # parameters so the driver does the quoting.
            txn.execute(
                "select * from tool_asset_settings where tool_id =%s",
                (tool,))
            settings = txn.fetchall()
            # Column 1 of the first settings row is used as the tool id.
            tools.append([int(settings[0][1]), settings, compiled])
        # Fetch the asset's name rows; they become the vector's first element.
        txn.execute(
            "select asset_name from asset_master where asset_id=%s",
            (asset_id,))
        return [txn.fetchall(), tools]


tvg = ToolsVectorGenerator()
class Tool:
    """Run compiled tool scripts in a worker thread."""

    def exectool(self, tool):
        """Execute ``tool`` (a code object or source string) and return None.

        The script runs in its own namespace, seeded with a copy of this
        module's globals. Module-level helpers therefore stay visible to the
        script, while the script's own top-level names (imports, functions,
        variables) live in one dict. With a bare ``exec`` inside a method
        those names ended up in the method's locals, so a function defined
        by the script could not see the script's own imports or globals.
        Rebinding a name inside the script no longer alters this module.
        """
        # NOTE(review): this executes whatever script ToolsBuilder loaded;
        # only point it at trusted tool files.
        namespace = dict(globals())
        exec(tool, namespace)
        return

    def getResult(self, tool):
        """Run ``exectool(tool)`` via ``deferToThread`` and return its Deferred."""
        return deferToThread(self.exectool, tool)


to = Tool()
class StateMachine:
    """Extend an (asset, tool) work list with each tool's dependency."""

    def setPriority(self, txn, tup):
        """Return ``tup`` preceded by the dependency pairs it is missing.

        Args:
            txn: DB-API cursor handed in by ``runInteraction``.
            tup: sequence of ``(asset_id, tool_id)`` rows.

        Returns:
            A tuple that starts with one ``(asset_id, dependency)`` pair for
            every dependency not already present, followed by all rows of
            ``tup`` in their original order.
        """
        missing = []
        for row in tup:
            # Parameterized so the driver quotes the tool id.
            txn.execute(
                "select tool_dependency from tool_asset_settings where tool_id =%s",
                (row[1],))
            found = txn.fetchall()
            # No settings row, or a NULL dependency: nothing to add.
            if not found or found[0][0] is None:
                continue
            pair = (row[0], found[0][0])
            # Also check `missing`, so two tools sharing one dependency do
            # not queue it twice.
            if pair in tup or pair in missing:
                print("This element is already exist.......")
            else:
                missing.append(pair)
        missing.extend(tup)
        return tuple(missing)


st = StateMachine()
class ToolsVectorExecutionManager(object):
    """Dispatch the compiled tools of one execution vector."""

    def executeVector(self, toolsvector):
        """Schedule every compiled tool in ``toolsvector`` for execution.

        Args:
            toolsvector: ``[asset_rows, tools]`` as built by
                ``ToolsVectorGenerator``; an empty or None vector is ignored.
                Each ``tools`` entry carries its code object at index 2.

        This method is reached from inside a ``runInteraction`` function,
        i.e. from a database worker thread. Twisted APIs other than
        ``callFromThread`` must only be used from the reactor thread, so the
        ``deferToThread`` call is handed over to the reactor instead of
        being made directly here.
        """
        print("%s %s" % ("toolsvector================>", toolsvector))
        if not toolsvector:
            return
        for tools in toolsvector[1]:
            compiled = tools[2]
            if compiled is not None:
                reactor.callFromThread(self._runTool, compiled)

    def _runTool(self, compiled):
        """Start one tool from the reactor thread and report its failure."""
        to.getResult(compiled).addErrback(self._toolFailed)

    def _toolFailed(self, failure):
        """Print a tool's failure instead of leaving the Deferred unhandled."""
        print("Got Error: %r" % (failure,))
        failure.printTraceback()


tvm = ToolsVectorExecutionManager()
class ToolsToExecuteAnalyzer:
    """Poll ``tool_to_execute`` every 3 seconds and dispatch pending tools."""

    def __init__(self, dbpool=None):
        """Remember the connection pool and prepare the polling loop."""
        self.dbpool = dbpool
        self.loopCall = task.LoopingCall(self.myQuery)

    def start(self):
        """Start polling with a 3 second interval."""
        print("Started scanner")
        self.loopCall.start(3)

    def stop(self):
        """Stop polling; safe to call when the loop is not running."""
        print("Stopping scanner")
        # LoopingCall.stop() asserts that the loop is running, which would
        # turn a shutdown before start-up into an AssertionError.
        if self.loopCall.running:
            self.loopCall.stop()

    def myQuery(self):
        """Run one poll in a pool thread and return its Deferred.

        Returning the Deferred is what makes LoopingCall wait for this poll
        to finish before scheduling the next one. Both outcomes are handled
        by the callbacks, so the returned Deferred does not fail.
        """
        d = self.dbpool.runInteraction(self._interact)
        d.addCallbacks(self.printResult, self.printError)
        return d

    def _placeholders(self, values):
        """Return one ``%s`` marker per value, comma separated."""
        return ", ".join(["%s"] * len(values))

    def _interact(self, txn):
        """Claim all pending rows and generate their execution vectors."""
        txn.execute("SELECT tool_asset_id,tool_execute_id FROM tool_to_execute where status='0'")
        pending = txn.fetchall()
        if not pending:
            return
        tool_asset_ids = tuple(int(row[0]) for row in pending)
        tool_execute_ids = tuple(int(row[1]) for row in pending)

        # One parameterized IN (...) covers the single-id case as well, so
        # no SQL is built from a tuple's repr.
        txn.execute(
            "SELECT asset_id,tool_id FROM tool_in_assets WHERE tool_asset_id IN (%s)"
            % self._placeholders(tool_asset_ids),
            tool_asset_ids)
        # Dependency check for the selected tools.
        asset_tool = st.setPriority(txn, txn.fetchall())
        lo.wfile(lo.log_date() + "::priority have been set for the tools......\n")

        # Group tool ids by asset. The key is normalised with int() for both
        # lookup and insert, so a non-int asset id cannot overwrite its group.
        res = {}
        for element in asset_tool:
            res.setdefault(int(element[0]), []).append(int(element[1]))
        if not res:
            return

        # Remove the claimed rows once; the statement does not depend on the
        # asset, so repeating it per asset was redundant.
        txn.execute(
            "delete from tool_to_execute where tool_execute_id in (%s)"
            % self._placeholders(tool_execute_ids),
            tool_execute_ids)
        tvg.generate(txn, res)

    def printResult(self, res):
        """Report that one poll finished."""
        print("In printResult after generate....")

    def printError(self, error):
        """Print the failure of one poll together with its traceback."""
        print("Got Error: %r" % (error,))
        error.printTraceback()
if __name__ == '__main__':
    from twisted.internet import reactor
    # cp_reconnect lets the pool discard a connection the server has dropped
    # and open a new one. The interaction that hits the dead connection still
    # fails, but later polls no longer keep failing with
    # "MySQL server has gone away".
    dbpool = adbapi.ConnectionPool("MySQLdb", 'localhost', 'test', 'test', 'test',
                                   cp_reconnect=True)
    s = ToolsToExecuteAnalyzer(dbpool)
    # Start polling once the reactor is up and stop it before shutdown.
    reactor.callWhenRunning(s.start)
    reactor.addSystemEventTrigger('before', 'shutdown', s.stop)
    reactor.run()
这就是我的全部代码,我只是想知道现在有多少个线程在运行,也就是说每个工具是不是都开了一个新线程?
无论如何,感谢Matt的帮助..:)
2
有人能告诉我为什么我会得到这个错误吗..
因为你做错了。
runInteraction 会运行你提供的函数,并把一个指向事务的游标作为参数传给它;这个事务本身就是在一个线程中运行的。所以你不应该再调用 reactor.callInThread(Scanner().deferExecute)。
- 更好的做法是使用 twisted.internet.task.LoopingCall,这样可以确保上一个调用完成后再进行下一个调用(前提是被调用的函数返回它的 Deferred)。
- 在你的例子中,你只是运行了一个查询,所以你可以直接使用 ConnectionPool.runQuery,而不是 ConnectionPool.runInteraction。
- 使用 errback 函数来报告异常。
我尝试整理了你排版混乱的代码,我想你原来的代码大概是这样的:
from twisted.internet import reactor, defer,threads
from twisted.enterprise import adbapi
# Module-level connection pool used by Scanner.objCursor. "MySQLdb" names the
# DB-API driver module; the remaining positional arguments are presumably the
# driver's connect() arguments (host, user, password, database) -- confirm
# against the MySQLdb version in use.
dbpool = adbapi.ConnectionPool("MySQLdb", '192.168.1.102','test', 'test', 'test')
class Scanner:
    """Repeatedly read ``tool_master`` through the module-level ``dbpool``."""

    def _execQuery(self, txn):
        """Run the SELECT on the cursor ``txn`` and return every fetched row."""
        txn.execute("SELECT tool_id,tool_name FROM tool_master")
        return txn.fetchall()

    def objCursor(self):
        """Pass ``_execQuery`` to ``dbpool.runInteraction`` and return its result."""
        return dbpool.runInteraction(self._execQuery)

    def printResult(self, result):
        """Print the rows, then schedule ``deferExecute`` again in 3 seconds."""
        print("%s %s" % ("resssssssssssssssssss", result))
        reactor.callLater(3, self.deferExecute)

    def deferExecute(self):
        """Start one query and register ``printResult`` as its callback."""
        pending = self.objCursor()
        pending.addCallback(self.printResult)
# Creates a Scanner and discards it immediately; nothing keeps a reference.
Scanner()


class MyApp(object):
    """Container whose class body starts the scanner and runs the reactor."""

    # These statements live in the class body, so they execute once while the
    # class is being defined, not when MyApp is instantiated.
    reactor.callInThread(Scanner().deferExecute)
    reactor.run()


# Only reached after reactor.run() above has returned.
MyApp()
如果你打算写一个 twisted 的 Application,那么把这个 Scanner 类改成继承自 twisted.application.service.Service 会很简单。
from twisted.internet import reactor, defer, task
from twisted.enterprise import adbapi
class Scanner(object):
    """Poll ``tool_master`` every 3 seconds through an adbapi pool."""

    def __init__(self, dbpool=None):
        """Remember the connection pool and prepare the polling loop."""
        self.dbpool = dbpool
        self.loopCall = task.LoopingCall(self.myQuery)

    def start(self):
        """Start polling with a 3 second interval."""
        print("Started scanner")
        self.loopCall.start(3)

    def stop(self):
        """Stop polling; safe to call when the loop is not running."""
        print("Stopping scanner")
        # LoopingCall.stop() asserts that the loop is running, which would
        # turn a shutdown before start-up into an AssertionError.
        if self.loopCall.running:
            self.loopCall.stop()

    def myQuery(self):
        """Run one query in a pool thread and return its Deferred.

        Returning the Deferred is what makes LoopingCall wait for this query
        to finish before scheduling the next one. Both outcomes are handled
        by the callbacks, so the returned Deferred does not fail.
        """
        def interact(txn):
            txn.execute("SELECT tool_id,tool_name FROM tool_master")
            return txn.fetchall()

        d = self.dbpool.runInteraction(interact)
        d.addCallbacks(self.printResult, self.printError)
        return d

    def printResult(self, result):
        """Print the fetched rows."""
        # Wrap in a 1-tuple: the rows are themselves a tuple, and formatting
        # a bare tuple with a single %r raises TypeError unless it happens to
        # contain exactly one row.
        print("Got Result: %r" % (result,))

    def printError(self, error):
        """Print the failure together with its traceback."""
        print("Got Error: %r" % (error,))
        error.printTraceback()
if __name__ == '__main__':
    from twisted.internet import reactor
    # cp_reconnect lets the pool discard a connection the server has dropped
    # and open a new one. The interaction that hits the dead connection still
    # fails, but later polls no longer keep failing with
    # "MySQL server has gone away".
    dbpool = adbapi.ConnectionPool("MySQLdb", '192.168.1.102', 'test', 'test', 'test',
                                   cp_reconnect=True)
    s = Scanner(dbpool)
    # Start polling once the reactor is up and stop it before shutdown.
    reactor.callWhenRunning(s.start)
    reactor.addSystemEventTrigger('before', 'shutdown', s.stop)
    reactor.run()