Python Twisted 代理超时

4 投票
1 回答
4173 浏览
提问于 2025-04-17 16:31

我对Twisted这个框架还很陌生,正在尝试制作一个异步客户端,用来获取一些网址,并把每个网址的结果保存到不同的文件里。当我用大约10个服务器运行程序时,反应器循环正常结束,程序也能顺利退出。但是当我用比如Alexa前2500的网站运行程序时,程序开始获取网址,但之后就不再结束了。我设置了超时,但似乎没有效果,我觉得可能有某个打开的连接没有触发任何回调,无论是出错还是成功。我的目标是,一旦程序获取了网页,或者每个连接的超时到了,程序就应该结束,并关闭所有活跃的文件描述符。

抱歉,代码的缩进在复制粘贴时没有保留,现在我检查过了,已经修复了。代码是最基本的示例,注意我遇到的问题是,当我用大量网站进行爬取时,反应器没有停止。

#!/usr/bin/env python

from pprint import pformat
from twisted.internet import reactor
import twisted.internet.defer
import sys
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

class PrinterClient(Protocol):
    def __init__(self, whenFinished, output):
         self.whenFinished = whenFinished
         self.output = output

    def dataReceived(self, bytes):
         #print '##### Received #####\n%s' % (bytes,)
         self.output.write('%s' % (bytes,))

    def connectionLost(self, reason):
        print 'Finished:', reason.getErrorMessage()
        self.output.write('Finished: %s \n'%(reason.getErrorMessage()))
        self.output.write('#########end########%s\n'%(reason.getErrorMessage()))
        self.whenFinished.callback(None)

def handleResponse(r, output, url):
    output.write('############start############\n')
    output.write('%s\n'%(url))
    #print "version=%s\ncode=%s\nphrase='%s'" % (r.version, r.code, r.phrase)
    output.write("version=%s\ncode=%s\nphrase='%s'"\
             %(r.version, r.code, r.phrase))
    for k, v in r.headers.getAllRawHeaders():
        #print "%s: %s" % (k, '\n  '.join(v))
        output.write("%s: %s\n" % (k, '\n  '.join(v)))
    whenFinished = twisted.internet.defer.Deferred()
    r.deliverBody(PrinterClient(whenFinished, output))
    return whenFinished

def handleError(reason):
    print reason
    #reason.printTraceback()
    #reactor.stop()

def getPage(url, output):
    print "Requesting %s" % (url,)
    d = Agent(reactor).request('GET',
                       url,
    Headers({'User-Agent': ['Mozilla/4.0 (Windows XP 5.1) Java/1.6.0_26']}),
                       None)
    d._connectTimeout = 10
    d.addCallback(handleResponse, output, url)
    d.addErrback(handleError)
    return d

if __name__ == '__main__':
    semaphore = twisted.internet.defer.DeferredSemaphore(500)
    dl = list()
    ipset = set()
    queryset =  set(['http://www.google.com','http://www.google1.com','http://www.google2.com', "up to 2500 sites"])
    filemap = {}
    for q in queryset:
        fpos = q.split('http://')[1].split(':')[0]
        dl.append(semaphore.run(getPage, q, filemap[fpos]))
    dl = twisted.internet.defer.DeferredList(dl)
    dl.addCallbacks(lambda x: reactor.stop(), handleError)
    reactor.run()
    for k in filemap:
        filemap[k].close()

谢谢。

Jeppo

1 个回答

8

你的超时代码至少有两个问题。

首先,你设置的唯一超时是 _connectTimeout,而且你是在 Agent.request 返回的 Deferred 上设置的。这个属性其实没什么意义,Agent 的实现和 Twisted 的任何部分都不会理会它。我想你是想把这个属性设置在 Agent 实例上,这样才会有一些效果。不过,这个属性是私有的,不是让你直接使用的。你应该在创建 Agent 的时候传入 connectTimeout=10

其次,这个超时只影响 TCP 连接的建立时间。设置为 10 意味着,如果在 10 秒内无法建立与某个特定 URL 的 HTTP 服务器的 TCP 连接,请求就会因为超时而失败。不过,如果在 10 秒内成功建立了连接,那么这个超时就没有其他意义了。如果服务器花了 10 个小时才给你回复,Agent 就会在那里等 10 个小时。你需要一个额外的超时,也就是整个请求的超时。

这需要单独实现,可以使用 reactor.callLater 和可能的 Deferred.cancel。例如,

...
d = agent.request(...)
timeoutCall = reactor.callLater(60, d.cancel)
def completed(passthrough):
    if timeoutCall.active():
        timeoutCall.cancel()
    return passthrough
d.addBoth(completed)
...

撰写回答