如何加速Python中的网页爬虫

5 投票
2 回答
6861 浏览
提问于 2025-04-18 05:20

我正在做一个学校的项目,想要获取关于电影的数据。我已经写了一个脚本,可以从IMDbPY和Open Movie DB API(omdbapi.com)获取我需要的数据。现在遇到的挑战是,我需要获取22,305部电影的数据,而每次请求大约需要0.7秒。也就是说,我现在的脚本大约需要8个小时才能完成。我在寻找任何可以同时发送多个请求的方法,或者其他能显著加快获取数据速度的建议。

import urllib2
import json
import pandas as pd
import time
import imdb

start_time = time.time() #record time at beginning of script

#used to make imdb.com think we are getting this data from a browser
user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
headers = { 'User-Agent' : user_agent }

#Open Movie Database Query url for IMDb IDs
url = 'http://www.omdbapi.com/?tomatoes=true&i='

#read the ids from the imdb_id csv file
imdb_ids = pd.read_csv('ids.csv')

cols = [u'Plot', u'Rated', u'tomatoImage', u'Title', u'DVD', u'tomatoMeter',
 u'Writer', u'tomatoUserRating', u'Production', u'Actors', u'tomatoFresh',
 u'Type', u'imdbVotes', u'Website', u'tomatoConsensus', u'Poster', u'tomatoRotten',
 u'Director', u'Released', u'tomatoUserReviews', u'Awards', u'Genre', u'tomatoUserMeter',
 u'imdbRating', u'Language', u'Country', u'imdbpy_budget', u'BoxOffice', u'Runtime',
 u'tomatoReviews', u'imdbID', u'Metascore', u'Response', u'tomatoRating', u'Year',
 u'imdbpy_gross']

#create movies dataframe
movies = pd.DataFrame(columns=cols)

i=0
for i in range(len(imdb_ids)-1):

    start = time.time()
    req = urllib2.Request(url + str(imdb_ids.ix[i,0]), None, headers) #request page
    response = urllib2.urlopen(req) #actually call the html request
    the_page = response.read() #read the json from the omdbapi query
    movie_json = json.loads(the_page) #convert the json to a dict

    #get the gross revenue and budget from IMDbPy
    data = imdb.IMDb()
    movie_id = imdb_ids.ix[i,['imdb_id']]
    movie_id = movie_id.to_string()
    movie_id = int(movie_id[-7:])
    data = data.get_movie_business(movie_id)
    data = data['data']
    data = data['business']

    #get the budget $ amount out of the budget IMDbPy string
    try:
        budget = data['budget']
        budget = budget[0]
        budget = budget.replace('$', '')
        budget = budget.replace(',', '')
        budget = budget.split(' ')
        budget = str(budget[0]) 
    except:
        None

    #get the gross $ amount out of the gross IMDbPy string
    try:
        budget = data['budget']
        budget = budget[0]
        budget = budget.replace('$', '')
        budget = budget.replace(',', '')
        budget = budget.split(' ')
        budget = str(budget[0])

        #get the gross $ amount out of the gross IMDbPy string
        gross = data['gross']
        gross = gross[0]
        gross = gross.replace('$', '')
        gross = gross.replace(',', '')
        gross = gross.split(' ')
        gross = str(gross[0])
    except:
        None

    #add gross to the movies dict 
    try:
        movie_json[u'imdbpy_gross'] = gross
    except:
        movie_json[u'imdbpy_gross'] = 0

    #add gross to the movies dict    
    try:
        movie_json[u'imdbpy_budget'] = budget
    except:
        movie_json[u'imdbpy_budget'] = 0

    #create new dataframe that can be merged to movies DF    
    tempDF = pd.DataFrame.from_dict(movie_json, orient='index')
    tempDF = tempDF.T

    #add the new movie to the movies dataframe
    movies = movies.append(tempDF, ignore_index=True)
    end = time.time()
    time_took = round(end-start, 2)
    percentage = round(((i+1) / float(len(imdb_ids))) * 100,1)
    print i+1,"of",len(imdb_ids),"(" + str(percentage)+'%)','completed',time_took,'sec'
    #increment counter
    i+=1  

#save the dataframe to a csv file            
movies.to_csv('movie_data.csv', index=False)
end_time = time.time()
print round((end_time-start_time)/60,1), "min"

2 个回答

2

在进行网页抓取时,我们通常会遇到两种瓶颈:

  1. IO阻塞 - 每当我们发送请求时,需要等待服务器的响应,这可能会导致整个程序停滞。
  2. CPU阻塞 - 在解析抓取到的内容时,我们的代码可能会受到CPU处理能力的限制。

CPU速度

CPU阻塞比较好解决 - 我们可以启动更多的进程。一般来说,1个CPU核心可以高效地处理1个进程。所以如果我们的抓取程序在一台有12个CPU核心的机器上运行,我们可以启动12个进程,从而实现12倍的速度提升:

from concurrent.futures import ProcessPoolExecutor

def parse(html):
    ...  # CPU intensive parsing
    
htmls = [...]
with ProcessPoolExecutor() as executor:
    for result in executor.map(parse, htmls):
        print(result)

Python的ProcessPoolExecutor可以启动与CPU核心数量相等的最佳线程,并通过这些线程分配任务。

IO速度

对于IO阻塞,我们有更多的选择,因为我们的目标是消除无用的等待,这可以通过线程、进程和异步循环来实现。

如果我们要发送成千上万的请求,就不能启动数百个进程。线程的开销会小一些,但还有更好的选择 - 异步循环(asyncio)

异步循环可以以不特定的顺序执行任务。换句话说,当任务A被阻塞时,任务B可以接管程序。这对于网页抓取来说非常合适,因为几乎没有额外的计算开销。我们可以在一个程序中处理成千上万的请求。

不幸的是,要让异步工作,我们需要使用支持异步的Python库。例如,通过使用httpxasyncio,我们可以显著加快抓取速度:

# comparing synchronous `requests`:
import requests
from time import time

_start = time()
for i in range(50):
    request.get("http://httpbin.org/delay/1")
print(f"finished in: {time() - _start:.2f} seconds")
# finished in: 52.21 seconds

# versus asynchronous `httpx`
import httpx
import asyncio
from time import time

_start = time()

async def main():
    async with httpx.AsyncClient() as client:
        tasks = [client.get("http://httpbin.org/delay/1") for i in range(50)]
        for response_future in asyncio.as_completed(tasks):
            response = await response_future
    print(f"finished in: {time() - _start:.2f} seconds")

asyncio.run(main())
# finished in: 3.55 seconds

结合两者

通过异步代码,我们可以避免IO阻塞,而通过进程,我们可以扩展CPU密集型的解析任务 - 这是优化网页抓取的完美组合:

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

import httpx


async def scrape(urls):
    """this is our async scraper that scrapes"""
    results = []
    async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
        scrape_tasks = [client.get(url) for url in urls]
        for response_f in asyncio.as_completed(scrape_tasks):
            response = await response_f
            # emulate data parsing/calculation
            sleep(0.5)
            ...
            results.append("done")
    return results


def scrape_wrapper(args):
    i, urls = args
    print(f"subprocess {i} started")
    result = asyncio.run(scrape(urls))
    print(f"subprocess {i} ended")
    return result


def multi_process(urls):
    _start = time()

    batches = []
    batch_size = multiprocessing.cpu_count() - 1  # let's keep 1 core for ourselves
    print(f"scraping {len(urls)} urls through {batch_size} processes")
    for i in range(0, len(urls), batch_size):
        batches.append(urls[i : i + batch_size])
    with ProcessPoolExecutor() as executor:
        for result in executor.map(scrape_wrapper, enumerate(batches)):
            print(result)
        print("done")

    print(f"multi-process finished in {time() - _start:.2f}")

def single_process(urls):
    _start = time()
    results = asyncio.run(scrape(urls))
    print(f"single-process finished in {time() - _start:.2f}")



if __name__ == "__main__":
    urls = ["http://httpbin.org/delay/1" for i in range(100)]
    multi_process(urls)
    # multi-process finished in 7.22
    single_process(urls)
    # single-process finished in 51.28

这些基础概念听起来复杂,但一旦你把问题简化到根本,解决方案其实很简单,并且已经在Python中存在了!

想了解更多相关内容,可以查看我的博客 网页抓取速度:进程、线程和异步

9

使用Eventlet库进行并发抓取

根据评论的建议,你应该并发地抓取你的数据。这可以通过使用 treadingmultiprocessing 或者 eventlet 来实现。

安装eventlet

$ pip install eventlet

尝试使用 eventlet 的网络爬虫示例

查看: http://eventlet.net/doc/examples.html#web-crawler

理解使用 eventlet 的并发

使用 threading 时,系统会在你的线程之间切换。这在你需要访问一些共享数据时会带来很大问题,因为你永远不知道哪个线程正在访问你的数据。于是你就得开始使用同步块、锁、信号量等来同步对共享数据的访问。

而使用 eventlet 就简单多了——你始终只运行一个线程,只有在进行输入输出操作或其他 eventlet 调用时才会切换线程。其余的代码可以不被打断地运行,也不必担心其他线程会干扰到你的数据。

你只需注意以下几点:

  • 所有的输入输出操作必须是非阻塞的(这通常很简单,eventlet 为你需要的大部分输入输出操作提供了非阻塞版本)。

  • 你的其余代码不能消耗太多CPU资源,否则会阻止“绿色”线程之间的切换,导致“绿色”多线程的优势消失。

使用 eventlet 的一个很大优势是,它允许你以简单直接的方式编写代码,而不需要过多地使用锁、信号量等。

eventlet 应用到你的代码中

如果我理解得没错,待抓取的URL列表是提前知道的,并且在你的分析中处理的顺序并不重要。这将允许你几乎直接复制 eventlet 的示例。我看到索引 i 有一定的意义,所以你可以考虑将URL和索引混合成一个元组,并将它们作为独立的任务进行处理。

当然还有其他方法,但就我个人而言,我发现 eventlet 相比其他技术使用起来真的很简单,同时能获得很好的效果(尤其是在抓取数据时)。你只需掌握主要概念,并稍微小心遵循 eventlet 的要求(保持非阻塞)。

使用requests和eventlet抓取URL - erequests

有多种包可以与 requests 一起进行异步处理,其中一个使用 eventlet,名为 erequests,可以查看 https://github.com/saghul/erequests

简单示例抓取一组URL

import erequests

# have list of urls to fetch
urls = [
    'http://www.heroku.com',
    'http://python-tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]
# erequests.async.get(url) creates asynchronous request
async_reqs = [erequests.async.get(url) for url in urls]
# each async request is ready to go, but not yet performed

# erequests.map will call each async request to the action
# what returns processed request `req`
for req in erequests.map(async_reqs):
    if req.ok:
        content = req.content
        # process it here
        print "processing data from:", req.url

处理这个特定问题的挑战

我们能够抓取并处理所有需要的URL。但在这个问题中,处理是与源数据中的特定记录相关的,因此我们需要将处理的请求与我们需要的记录索引匹配,以获取进一步的详细信息进行最终处理。

正如我们稍后将看到的,异步处理并不遵循请求的顺序,有些请求处理得更快,有些则更慢,而 map 会返回任何完成的请求。

一种选择是将给定URL的索引附加到请求中,并在处理返回的数据时使用它。

复杂示例:抓取和处理URL并保留URL索引

注意:以下示例相对复杂,如果你能接受上面提供的解决方案,可以跳过这一部分。但请确保你不会遇到下面检测到并解决的问题(URL被修改,请求跟随重定向)。

import erequests
from itertools import count, izip
from functools import partial

urls = [
    'http://www.heroku.com',
    'http://python-tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]

def print_url_index(index, req, *args, **kwargs):
    content_length = req.headers.get("content-length", None)
    todo = "PROCESS" if req.status_code == 200 else "WAIT, NOT YET READY"
    print "{todo}: index: {index}: status: {req.status_code}: length: {content_length}, {req.url}".format(**locals())

async_reqs = (erequests.async.get(url, hooks={"response": partial(print_url_index, i)}) for i, url in izip(count(), urls))

for req in erequests.map(async_reqs):
    pass

为请求附加钩子

requests(以及 erequests)允许定义一个名为 response 的事件钩子。每当请求收到响应时,这个钩子函数就会被调用,可以执行某些操作,甚至修改响应。

以下行定义了一个响应的钩子:

erequests.async.get(url, hooks={"response": partial(print_url_index, i)})

将URL索引传递给钩子函数

任何钩子的签名应为 func(req, *args, *kwargs)

但我们还需要将正在处理的URL的索引传递给钩子函数。

为此,我们使用 functools.partial,它允许通过将某些参数固定为特定值来创建简化的函数。这正是我们需要的,如果你查看 print_url_index 的签名,我们只需固定 index 的值,其余部分将符合钩子函数的要求。

在我们的调用中,我们使用 partial 和简化函数 print_url_index 的名称,并为每个URL提供唯一的索引。

索引可以通过 enumerate 在循环中提供,对于参数较多的情况,我们可以更高效地使用 count,它每次生成一个从0开始递增的数字。

让我们运行它:

$ python ereq.py
WAIT, NOT YET READY: index: 3: status: 301: length: 66, http://python-requests.org/
WAIT, NOT YET READY: index: 4: status: 301: length: 58, http://kennethreitz.com/
WAIT, NOT YET READY: index: 0: status: 301: length: None, http://www.heroku.com/
PROCESS: index: 2: status: 200: length: 7700, http://httpbin.org/
WAIT, NOT YET READY: index: 1: status: 301: length: 64, http://python-tablib.org/
WAIT, NOT YET READY: index: 4: status: 301: length: None, http://kennethreitz.org
WAIT, NOT YET READY: index: 3: status: 302: length: 0, http://docs.python-requests.org
WAIT, NOT YET READY: index: 1: status: 302: length: 0, http://docs.python-tablib.org
PROCESS: index: 3: status: 200: length: None, http://docs.python-requests.org/en/latest/
PROCESS: index: 1: status: 200: length: None, http://docs.python-tablib.org/en/latest/
PROCESS: index: 0: status: 200: length: 12064, https://www.heroku.com/
PROCESS: index: 4: status: 200: length: 10478, http://www.kennethreitz.org/

这表明:

  • 请求并不是按照生成的顺序处理的
  • 有些请求会跟随重定向,因此钩子函数会被多次调用
  • 仔细检查URL值,我们可以看到,原始列表 urls 中没有任何URL被响应报告,即使对于索引2,我们也得到了额外的 / 附加。这就是为什么简单地在原始URL列表中查找响应URL不会帮助我们的原因。

撰写回答