如何加速Python中的网页爬虫
我正在做一个学校的项目,想要获取关于电影的数据。我已经写了一个脚本,可以从IMDbPY和Open Movie DB API(omdbapi.com)获取我需要的数据。现在遇到的挑战是,我需要获取22,305部电影的数据,而每次请求大约需要0.7秒。也就是说,我现在的脚本大约需要8个小时才能完成。我在寻找任何可以同时发送多个请求的方法,或者其他能显著加快获取数据速度的建议。
import urllib2
import json
import pandas as pd
import time
import imdb
start_time = time.time() #record time at beginning of script
#used to make imdb.com think we are getting this data from a browser
user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
headers = { 'User-Agent' : user_agent }
#Open Movie Database Query url for IMDb IDs
url = 'http://www.omdbapi.com/?tomatoes=true&i='
#read the ids from the imdb_id csv file
imdb_ids = pd.read_csv('ids.csv')
cols = [u'Plot', u'Rated', u'tomatoImage', u'Title', u'DVD', u'tomatoMeter',
u'Writer', u'tomatoUserRating', u'Production', u'Actors', u'tomatoFresh',
u'Type', u'imdbVotes', u'Website', u'tomatoConsensus', u'Poster', u'tomatoRotten',
u'Director', u'Released', u'tomatoUserReviews', u'Awards', u'Genre', u'tomatoUserMeter',
u'imdbRating', u'Language', u'Country', u'imdbpy_budget', u'BoxOffice', u'Runtime',
u'tomatoReviews', u'imdbID', u'Metascore', u'Response', u'tomatoRating', u'Year',
u'imdbpy_gross']
#create movies dataframe
movies = pd.DataFrame(columns=cols)
i=0
for i in range(len(imdb_ids)-1):
start = time.time()
req = urllib2.Request(url + str(imdb_ids.ix[i,0]), None, headers) #request page
response = urllib2.urlopen(req) #actually call the html request
the_page = response.read() #read the json from the omdbapi query
movie_json = json.loads(the_page) #convert the json to a dict
#get the gross revenue and budget from IMDbPy
data = imdb.IMDb()
movie_id = imdb_ids.ix[i,['imdb_id']]
movie_id = movie_id.to_string()
movie_id = int(movie_id[-7:])
data = data.get_movie_business(movie_id)
data = data['data']
data = data['business']
#get the budget $ amount out of the budget IMDbPy string
try:
budget = data['budget']
budget = budget[0]
budget = budget.replace('$', '')
budget = budget.replace(',', '')
budget = budget.split(' ')
budget = str(budget[0])
except:
None
#get the gross $ amount out of the gross IMDbPy string
try:
budget = data['budget']
budget = budget[0]
budget = budget.replace('$', '')
budget = budget.replace(',', '')
budget = budget.split(' ')
budget = str(budget[0])
#get the gross $ amount out of the gross IMDbPy string
gross = data['gross']
gross = gross[0]
gross = gross.replace('$', '')
gross = gross.replace(',', '')
gross = gross.split(' ')
gross = str(gross[0])
except:
None
#add gross to the movies dict
try:
movie_json[u'imdbpy_gross'] = gross
except:
movie_json[u'imdbpy_gross'] = 0
#add gross to the movies dict
try:
movie_json[u'imdbpy_budget'] = budget
except:
movie_json[u'imdbpy_budget'] = 0
#create new dataframe that can be merged to movies DF
tempDF = pd.DataFrame.from_dict(movie_json, orient='index')
tempDF = tempDF.T
#add the new movie to the movies dataframe
movies = movies.append(tempDF, ignore_index=True)
end = time.time()
time_took = round(end-start, 2)
percentage = round(((i+1) / float(len(imdb_ids))) * 100,1)
print i+1,"of",len(imdb_ids),"(" + str(percentage)+'%)','completed',time_took,'sec'
#increment counter
i+=1
#save the dataframe to a csv file
movies.to_csv('movie_data.csv', index=False)
end_time = time.time()
print round((end_time-start_time)/60,1), "min"
2 个回答
在进行网页抓取时,我们通常会遇到两种瓶颈:
- IO阻塞 - 每当我们发送请求时,需要等待服务器的响应,这可能会导致整个程序停滞。
- CPU阻塞 - 在解析抓取到的内容时,我们的代码可能会受到CPU处理能力的限制。
CPU速度
CPU阻塞比较好解决 - 我们可以启动更多的进程。一般来说,1个CPU核心可以高效地处理1个进程。所以如果我们的抓取程序在一台有12个CPU核心的机器上运行,我们可以启动12个进程,从而实现12倍的速度提升:
from concurrent.futures import ProcessPoolExecutor
def parse(html):
... # CPU intensive parsing
htmls = [...]
with ProcessPoolExecutor() as executor:
for result in executor.map(parse, htmls):
print(result)
Python的ProcessPoolExecutor
可以启动与CPU核心数量相等的最佳线程,并通过这些线程分配任务。
IO速度
对于IO阻塞,我们有更多的选择,因为我们的目标是消除无用的等待,这可以通过线程、进程和异步循环来实现。
如果我们要发送成千上万的请求,就不能启动数百个进程。线程的开销会小一些,但还有更好的选择 - 异步循环(asyncio)。
异步循环可以以不特定的顺序执行任务。换句话说,当任务A被阻塞时,任务B可以接管程序。这对于网页抓取来说非常合适,因为几乎没有额外的计算开销。我们可以在一个程序中处理成千上万的请求。
不幸的是,要让异步工作,我们需要使用支持异步的Python库。例如,通过使用httpx和asyncio
,我们可以显著加快抓取速度:
# comparing synchronous `requests`:
import requests
from time import time
_start = time()
for i in range(50):
request.get("http://httpbin.org/delay/1")
print(f"finished in: {time() - _start:.2f} seconds")
# finished in: 52.21 seconds
# versus asynchronous `httpx`
import httpx
import asyncio
from time import time
_start = time()
async def main():
async with httpx.AsyncClient() as client:
tasks = [client.get("http://httpbin.org/delay/1") for i in range(50)]
for response_future in asyncio.as_completed(tasks):
response = await response_future
print(f"finished in: {time() - _start:.2f} seconds")
asyncio.run(main())
# finished in: 3.55 seconds
结合两者
通过异步代码,我们可以避免IO阻塞,而通过进程,我们可以扩展CPU密集型的解析任务 - 这是优化网页抓取的完美组合:
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time
import httpx
async def scrape(urls):
"""this is our async scraper that scrapes"""
results = []
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
scrape_tasks = [client.get(url) for url in urls]
for response_f in asyncio.as_completed(scrape_tasks):
response = await response_f
# emulate data parsing/calculation
sleep(0.5)
...
results.append("done")
return results
def scrape_wrapper(args):
i, urls = args
print(f"subprocess {i} started")
result = asyncio.run(scrape(urls))
print(f"subprocess {i} ended")
return result
def multi_process(urls):
_start = time()
batches = []
batch_size = multiprocessing.cpu_count() - 1 # let's keep 1 core for ourselves
print(f"scraping {len(urls)} urls through {batch_size} processes")
for i in range(0, len(urls), batch_size):
batches.append(urls[i : i + batch_size])
with ProcessPoolExecutor() as executor:
for result in executor.map(scrape_wrapper, enumerate(batches)):
print(result)
print("done")
print(f"multi-process finished in {time() - _start:.2f}")
def single_process(urls):
_start = time()
results = asyncio.run(scrape(urls))
print(f"single-process finished in {time() - _start:.2f}")
if __name__ == "__main__":
urls = ["http://httpbin.org/delay/1" for i in range(100)]
multi_process(urls)
# multi-process finished in 7.22
single_process(urls)
# single-process finished in 51.28
这些基础概念听起来复杂,但一旦你把问题简化到根本,解决方案其实很简单,并且已经在Python中存在了!
想了解更多相关内容,可以查看我的博客 网页抓取速度:进程、线程和异步
使用Eventlet库进行并发抓取
根据评论的建议,你应该并发地抓取你的数据。这可以通过使用 treading
、multiprocessing
或者 eventlet
来实现。
安装eventlet
$ pip install eventlet
尝试使用 eventlet
的网络爬虫示例
查看: http://eventlet.net/doc/examples.html#web-crawler
理解使用 eventlet
的并发
使用 threading
时,系统会在你的线程之间切换。这在你需要访问一些共享数据时会带来很大问题,因为你永远不知道哪个线程正在访问你的数据。于是你就得开始使用同步块、锁、信号量等来同步对共享数据的访问。
而使用 eventlet
就简单多了——你始终只运行一个线程,只有在进行输入输出操作或其他 eventlet
调用时才会切换线程。其余的代码可以不被打断地运行,也不必担心其他线程会干扰到你的数据。
你只需注意以下几点:
所有的输入输出操作必须是非阻塞的(这通常很简单,
eventlet
为你需要的大部分输入输出操作提供了非阻塞版本)。你的其余代码不能消耗太多CPU资源,否则会阻止“绿色”线程之间的切换,导致“绿色”多线程的优势消失。
使用 eventlet
的一个很大优势是,它允许你以简单直接的方式编写代码,而不需要过多地使用锁、信号量等。
将 eventlet
应用到你的代码中
如果我理解得没错,待抓取的URL列表是提前知道的,并且在你的分析中处理的顺序并不重要。这将允许你几乎直接复制 eventlet
的示例。我看到索引 i
有一定的意义,所以你可以考虑将URL和索引混合成一个元组,并将它们作为独立的任务进行处理。
当然还有其他方法,但就我个人而言,我发现 eventlet
相比其他技术使用起来真的很简单,同时能获得很好的效果(尤其是在抓取数据时)。你只需掌握主要概念,并稍微小心遵循 eventlet
的要求(保持非阻塞)。
使用requests和eventlet抓取URL - erequests
有多种包可以与 requests
一起进行异步处理,其中一个使用 eventlet
,名为 erequests
,可以查看 https://github.com/saghul/erequests
简单示例抓取一组URL
import erequests
# have list of urls to fetch
urls = [
'http://www.heroku.com',
'http://python-tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
# erequests.async.get(url) creates asynchronous request
async_reqs = [erequests.async.get(url) for url in urls]
# each async request is ready to go, but not yet performed
# erequests.map will call each async request to the action
# what returns processed request `req`
for req in erequests.map(async_reqs):
if req.ok:
content = req.content
# process it here
print "processing data from:", req.url
处理这个特定问题的挑战
我们能够抓取并处理所有需要的URL。但在这个问题中,处理是与源数据中的特定记录相关的,因此我们需要将处理的请求与我们需要的记录索引匹配,以获取进一步的详细信息进行最终处理。
正如我们稍后将看到的,异步处理并不遵循请求的顺序,有些请求处理得更快,有些则更慢,而 map
会返回任何完成的请求。
一种选择是将给定URL的索引附加到请求中,并在处理返回的数据时使用它。
复杂示例:抓取和处理URL并保留URL索引
注意:以下示例相对复杂,如果你能接受上面提供的解决方案,可以跳过这一部分。但请确保你不会遇到下面检测到并解决的问题(URL被修改,请求跟随重定向)。
import erequests
from itertools import count, izip
from functools import partial
urls = [
'http://www.heroku.com',
'http://python-tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
def print_url_index(index, req, *args, **kwargs):
content_length = req.headers.get("content-length", None)
todo = "PROCESS" if req.status_code == 200 else "WAIT, NOT YET READY"
print "{todo}: index: {index}: status: {req.status_code}: length: {content_length}, {req.url}".format(**locals())
async_reqs = (erequests.async.get(url, hooks={"response": partial(print_url_index, i)}) for i, url in izip(count(), urls))
for req in erequests.map(async_reqs):
pass
为请求附加钩子
requests
(以及 erequests
)允许定义一个名为 response
的事件钩子。每当请求收到响应时,这个钩子函数就会被调用,可以执行某些操作,甚至修改响应。
以下行定义了一个响应的钩子:
erequests.async.get(url, hooks={"response": partial(print_url_index, i)})
将URL索引传递给钩子函数
任何钩子的签名应为 func(req, *args, *kwargs)
但我们还需要将正在处理的URL的索引传递给钩子函数。
为此,我们使用 functools.partial
,它允许通过将某些参数固定为特定值来创建简化的函数。这正是我们需要的,如果你查看 print_url_index
的签名,我们只需固定 index
的值,其余部分将符合钩子函数的要求。
在我们的调用中,我们使用 partial
和简化函数 print_url_index
的名称,并为每个URL提供唯一的索引。
索引可以通过 enumerate
在循环中提供,对于参数较多的情况,我们可以更高效地使用 count
,它每次生成一个从0开始递增的数字。
让我们运行它:
$ python ereq.py
WAIT, NOT YET READY: index: 3: status: 301: length: 66, http://python-requests.org/
WAIT, NOT YET READY: index: 4: status: 301: length: 58, http://kennethreitz.com/
WAIT, NOT YET READY: index: 0: status: 301: length: None, http://www.heroku.com/
PROCESS: index: 2: status: 200: length: 7700, http://httpbin.org/
WAIT, NOT YET READY: index: 1: status: 301: length: 64, http://python-tablib.org/
WAIT, NOT YET READY: index: 4: status: 301: length: None, http://kennethreitz.org
WAIT, NOT YET READY: index: 3: status: 302: length: 0, http://docs.python-requests.org
WAIT, NOT YET READY: index: 1: status: 302: length: 0, http://docs.python-tablib.org
PROCESS: index: 3: status: 200: length: None, http://docs.python-requests.org/en/latest/
PROCESS: index: 1: status: 200: length: None, http://docs.python-tablib.org/en/latest/
PROCESS: index: 0: status: 200: length: 12064, https://www.heroku.com/
PROCESS: index: 4: status: 200: length: 10478, http://www.kennethreitz.org/
这表明:
- 请求并不是按照生成的顺序处理的
- 有些请求会跟随重定向,因此钩子函数会被多次调用
- 仔细检查URL值,我们可以看到,原始列表
urls
中没有任何URL被响应报告,即使对于索引2,我们也得到了额外的/
附加。这就是为什么简单地在原始URL列表中查找响应URL不会帮助我们的原因。