Python - 多个同时线程池
我正在用Python写一个网页抓取程序,使用的是httplib2和lxml(没错,我知道可以用scrapy,但我们先不提这个……)。这个抓取程序需要处理大约15000个页面,提取出大约40万个项目。我已经把提取项目的代码优化得几乎瞬间完成了,但从服务器下载页面的部分还是非常慢。我想通过并发来解决这个问题。不过,我不能每次都依赖于每个页面都需要被解析。我尝试过使用一个线程池(类似于multiprocessing.pool,但用的是线程——这应该没问题,因为这是一个I/O密集型的过程),但我想不出一个优雅(或者有效)的办法来让所有线程在最后一个索引项的日期大于我们正在处理的项时停止。目前,我正在尝试用两个线程池的实例——一个用来下载每个页面,另一个用来解析这些页面。下面是一个简化的代码示例:
#! /usr/bin/env python2
import httplib2
from Queue import PriorityQueue
from multiprocessing.pool import ThreadPool
from lxml.html import fromstring
pages = [x for x in range(1000)]
page_queue = PriorityQueue(1000)
url = "http://www.google.com"
def get_page(page):
#Grabs google.com
h = httplib2.Http(".cache")
resp, content = h.request(url, "GET")
tree = fromstring(str(content), base_url=url)
page_queue.put((page, tree))
print page_queue.qsize()
def parse_page():
page_num, page = page_queue.get()
print "Parsing page #" + str(page_num)
#do more stuff with the page here
page_queue.task_done()
if __name__ == "__main__":
collect_pool = ThreadPool()
collect_pool.map_async(get_page, pages)
collect_pool.close()
parse_pool = ThreadPool()
parse_pool.apply_async(parse_page)
parse_pool.close()
parse_pool.join()
collect_pool.join()
page_queue.join()
不过,运行这段代码并没有达到我预期的效果——我希望能启动两个线程池:一个填充队列,另一个从队列中取出内容进行解析。它开始运行收集线程池并完成,然后再开始解析线程池(我猜是这样,但我没有让代码运行足够长的时间来确认解析线程池是否真的在运行——关键是,似乎只有收集线程池在运行)。我很确定我在调用join()的顺序上搞错了,但我实在想不出应该是什么顺序。我的问题基本上是:我这样做对吗?如果对,那我到底哪里出错了?如果不对,你有什么建议?
1 个回答
首先,从整体上看,你的设计似乎是正确的。使用线程池来收集网页是合理的,因为httlib2模块是同步的。如果是异步库,一个线程就够了;需要注意的是,即使使用httplib2和线程池,任何时候最多也只有一个收集线程在运行,因为有个叫做全局解释器锁(GIL)的东西在限制。
解析池的使用是因为lxml模块是用C/C++写的(假设在解析网页时,全局解释器锁会被释放——这一点可以在lxml的文档或代码中确认!)。如果这个假设不成立,那么使用专门的解析池就没有性能提升,因为只有一个线程能获取到GIL。在这种情况下,使用进程池会更好。
我对线程池的实现不太熟悉,但我猜它和多进程模块中的Pool类类似。基于这个理解,问题似乎在于你只为解析池创建了一个工作项,而在parse_page处理完第一个网页后,它就不再尝试从解析池中取出更多的网页了。而且,也没有提交额外的工作项到这个池中,所以处理就停了,调用parse_pool.close()后,空的池中的线程就结束了。
解决方案是去掉page_queue。get_page()函数应该在收集每个网页时,通过调用apply_async()将工作项放入解析池,而不是把它们放进page_queue。
主线程应该等到collect_queue为空(也就是collect_pool.join()调用返回后),然后再关闭解析池(因为可以确定不会再有工作提交给解析器)。接着,它应该通过调用parse_pool.join()等待解析池变空,然后再退出。
此外,你需要增加connect_pool中的线程数量,以便同时处理更多的HTTP请求。线程池中的默认线程数量是CPU的数量;目前你不能发出超过这个数量的请求。你可以尝试增加到几千或几万的值;观察线程池的CPU使用情况;它不应该接近1个CPU的使用率。