将Twisted `enterprise.adbapi`查询添加到`twistd`守护进程创建的反应器循环中
我在一个Twisted的.tac
插件中使用twisted.enterprise.adbapi
,发现像aConnectionPool.runQuery(sqlQuery)
这样的函数返回的延迟对象(deferred object)不会触发,除非调用reactor.run()
。我想知道如何将查询添加到twistd
创建的反应器循环中,而不是调用reactor.run()
。这是一个通用的过程,还是说这是异步数据库API特有的?
编辑 - 附上代码:
from twisted.application import internet, service
from zope.interface import implements
from twisted.web.iweb import IBodyProducer
from twisted.internet import defer, protocol, reactor
from twisted.internet.defer import succeed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
import json
import base64
from twisted.enterprise import adbapi
class StringProducer(object):
implements(IBodyProducer)
def __init__(self, body):
self.body = body
self.length = len(body)
def startProducing(self, consumer):
consumer.write(self.body)
return succeed(None)
def pauseProducing(self):
pass
def stopProducing(self):
pass
def httpRequest(url, values, headers={}, method='POST'):
agent = Agent(reactor)
d = agent.request(method,
url,
Headers(headers),
StringProducer(values)
)
def handle_response(response):
if response.code == 204:
d = defer.succeed('')
else:
class SimpleReceiver(protocol.Protocol):
def __init__(s, d):
s.buf = ''; s.d = d
def dataReceived(s, data):
s.buf += data
response = json.loads(data)
receipt = response[u'receipt']
if receipt[u'product_id'] == "com.domain_name.app_name.a_product_id":
transactionID = receipt[u'original_transaction_id']
date = receipt[u'original_purchase_date']
purchaseDate = date.strip(' Etc/GMT')
print transactionID
print purchaseDate
dbpool = adbapi.ConnectionPool('MySQLdb', db='mydb', user='user', passwd='passwd')
dOperation = dbpool.runOperation("insert into users(name, original_transaction_id, date_joined) values(%s, %s, %s)", ('testuser', transactionID, purchaseDate))
def finishInsert(dObject, pool):
print 'inserted!'
pool.close()
dOperation.addCallback(finishInsert, dbpool)
def insertError(dObject):
print 'insert error!'
dOperation.addErrback(insertError)
def connectionLost(s, reason):
s.d.callback(s.buf)
d = defer.Deferred()
response.deliverBody(SimpleReceiver(d))
return d
d.addCallback(handle_response)
class StoreServer(protocol.Protocol):
def dataReceived(self, data):
a = data.split(':delimiter:')
if a[0] == 'addToUserList':
receiptBase64 = base64.standard_b64encode(a[1])
jsonReceipt = json.dumps({'receipt-data':receiptBase64})
httpRequest(
"https://buy.itunes.apple.com/verifyReceipt",
jsonReceipt,
{'Content-Type': ['application/x-www-form-urlencoded']}
)
application = service.Application("My Server")
storeFactory = protocol.Factory()
storeFactory.protocol = StoreServer
tcpStoreServer = internet.TCPServer(30000, storeFactory)
tcpStoreServer.setServiceParent(application)
1 个回答
2
你的代码每次发请求的时候都会新建一个 ConnectionPool
。这个新的 ConnectionPool
会自己再创建一个新的线程池来执行查询,并且需要重新建立与数据库的连接。
这就意味着你实际上并没有使用连接池。你只是创建了很多连接,并且每个连接只用一次。而且,错误处理函数 insertError
并没有关闭这个连接池。
这些问题加在一起,意味着你可以同时创建的线程和连接数量没有限制,只有你的系统对内存分配或打开的套接字数量的限制。当你达到这些限制时,情况就会变得很糟糕。
这也意味着每当查询出错时,都会泄漏出一些线程和连接(ConnectionPool
启动时会建立3个线程/连接)。如果错误发生得太多,你将无法再创建新的线程或连接,这样就无法再查询数据库了。虽然你的查询很简单,可能觉得出错的机会不大,但MySQL有时会随机断开客户端的连接(你可能对此有所了解,因为你确实添加了错误处理来报告失败)。
正确使用 ConnectionPool
的方式是创建一个(或者两个,或者其他少量固定数量的)连接池,然后在所有查询中重复使用它。至于这些问题是否和你最初观察到的问题有关,我不太清楚,但这些问题可能是你需要解决的。