将Twisted `enterprise.adbapi`查询添加到`twistd`守护进程创建的反应器循环中

4 投票
1 回答
1278 浏览
提问于 2025-04-17 04:56

我在一个Twisted的.tac插件中使用twisted.enterprise.adbapi,发现像aConnectionPool.runQuery(sqlQuery)这样的函数返回的延迟对象(deferred object)不会触发,除非调用reactor.run()。我想知道如何将查询添加到twistd创建的反应器循环中,而不是调用reactor.run()。这是一个通用的过程,还是说这是异步数据库API特有的?

编辑 - 附上代码:

from twisted.application import internet, service
from zope.interface import implements
from twisted.web.iweb import IBodyProducer

from twisted.internet import defer, protocol, reactor
from twisted.internet.defer import succeed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

import json
import base64
from twisted.enterprise import adbapi

class StringProducer(object):
    implements(IBodyProducer)

    def __init__(self, body):
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):
        pass

    def stopProducing(self):
        pass

def httpRequest(url, values, headers={}, method='POST'):

    agent = Agent(reactor)
    d = agent.request(method,
                      url,
                      Headers(headers),
                      StringProducer(values)
                      )

    def handle_response(response):
        if response.code == 204:
            d = defer.succeed('')
        else:
            class SimpleReceiver(protocol.Protocol):
                def __init__(s, d):
                    s.buf = ''; s.d = d
                def dataReceived(s, data):
                    s.buf += data
                    response = json.loads(data)

                    receipt = response[u'receipt']
                    if receipt[u'product_id'] == "com.domain_name.app_name.a_product_id":
                        transactionID = receipt[u'original_transaction_id']
                        date = receipt[u'original_purchase_date']
                        purchaseDate = date.strip(' Etc/GMT')
                        print transactionID
                        print purchaseDate

                        dbpool = adbapi.ConnectionPool('MySQLdb', db='mydb', user='user',  passwd='passwd')
                        dOperation = dbpool.runOperation("insert into users(name, original_transaction_id, date_joined) values(%s, %s, %s)", ('testuser', transactionID, purchaseDate))

                        def finishInsert(dObject, pool):
                            print 'inserted!'
                            pool.close()
                        dOperation.addCallback(finishInsert, dbpool)

                        def insertError(dObject):
                            print 'insert error!'
                        dOperation.addErrback(insertError)

                def connectionLost(s, reason):
                    s.d.callback(s.buf)

            d = defer.Deferred()
            response.deliverBody(SimpleReceiver(d))
        return d

    d.addCallback(handle_response)

class StoreServer(protocol.Protocol):

    def dataReceived(self, data):
        a = data.split(':delimiter:')

        if a[0] == 'addToUserList':
            receiptBase64 = base64.standard_b64encode(a[1])
            jsonReceipt = json.dumps({'receipt-data':receiptBase64})

            httpRequest(
                        "https://buy.itunes.apple.com/verifyReceipt",
                        jsonReceipt,
                        {'Content-Type': ['application/x-www-form-urlencoded']}
                        )

application = service.Application("My Server")
storeFactory = protocol.Factory()
storeFactory.protocol = StoreServer
tcpStoreServer = internet.TCPServer(30000, storeFactory)
tcpStoreServer.setServiceParent(application)

1 个回答

2

你的代码每次发请求的时候都会新建一个 ConnectionPool。这个新的 ConnectionPool 会自己再创建一个新的线程池来执行查询,并且需要重新建立与数据库的连接。

这就意味着你实际上并没有使用连接池。你只是创建了很多连接,并且每个连接只用一次。而且,错误处理函数 insertError 并没有关闭这个连接池。

这些问题加在一起,意味着你可以同时创建的线程和连接数量没有限制,只有你的系统对内存分配或打开的套接字数量的限制。当你达到这些限制时,情况就会变得很糟糕。

这也意味着每当查询出错时,都会泄漏出一些线程和连接(ConnectionPool 启动时会建立3个线程/连接)。如果错误发生得太多,你将无法再创建新的线程或连接,这样就无法再查询数据库了。虽然你的查询很简单,可能觉得出错的机会不大,但MySQL有时会随机断开客户端的连接(你可能对此有所了解,因为你确实添加了错误处理来报告失败)。

正确使用 ConnectionPool 的方式是创建一个(或者两个,或者其他少量固定数量的)连接池,然后在所有查询中重复使用它。至于这些问题是否和你最初观察到的问题有关,我不太清楚,但这些问题可能是你需要解决的。

撰写回答