在我的代码中,我有两个假设的任务:一个从生成器获取url并使用Twisted的cooperatior批量下载它们,另一个获取下载的源并异步解析它。我试图将所有的fetch和parse任务封装到一个延迟对象中,该对象在下载所有页面和解析所有源代码时进行回调。在
我想出了以下解决方案:
from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage
BATCH_SIZE = 5
def main_task():
result = defer.Deferred()
state = {'count': 0, 'done': False}
def on_parse_finish(r):
state['count'] -= 1
if state['done'] and state['count'] == 0:
result.callback(True)
def process(source):
deferred = parse(source)
state['count'] += 1
deferred.addCallback(on_parse_finish)
def fetch_urls():
for url in get_urls():
deferred = getPage(url)
deferred.addCallback(process)
yield deferred
def on_finish(r):
state['done'] = True
deferreds = []
coop = task.Cooperator()
urls = fetch_urls()
for _ in xrange(BATCH_SIZE):
deferreds.append(coop.coiterate(urls))
main_tasks = defer.DeferredList(deferreds)
main_tasks.addCallback(on_finish)
return defer.DeferredList([main_tasks, result])
# `main_task` is meant to be used with `blockingCallFromThread`
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)
代码是有效的,但我觉得我要么是遗漏了一些显而易见的东西,要么是对一个简单的扭曲模式一无所知,这种模式会让这件事简单得多。有没有更好的方法来返回一个在所有获取和解析完成后回调的延迟的方法?在
正如目前所写的,在我看来,这段代码将有有限数量的并行下载,但有无限数量的并行解析作业。这是故意的吗?我将假设“否”,因为如果您的网络碰巧很快,而您的解析器恰好很慢,当url的数量接近无穷大时,您的内存使用率也将是无限的:)。在
因此,这里有一个并行性有限但通过下载按顺序执行解析的东西:
这是因为
parse
(显然)返回一个Deferred
,将其作为回调添加到getPage
返回的回调会导致Deferred
在parse
完成其业务之前不会调用coiterate
添加的回调。在既然您询问了惯用的扭曲代码,我还自由地对它进行了一点现代化(使用
task.react
而不是手动运行reactor,通过内联表达式使事情更简洁等等)。在如果您真的希望有更多的并行解析而不是并行获取,那么类似这样的方法可能会更好地工作:
^{pr2}$您可以看到},因为获取应该可以继续进行。在
parseWhenReady
返回从acquire
返回的Deferred
,因此并行解析一开始就将继续并行提取,因此即使解析器过载,您也不会继续不加区别地获取数据。但是,parallelParse
谨慎地避免返回parse
或release
返回的{(请注意,由于您最初的示例不可运行,因此我根本没有测试过这两个示例中的任何一个。希望即使有bug,意图也很清楚。)
相关问题 更多 >
编程相关推荐