使用协程与线程时吞吐量差异

6 投票
3 回答
7851 浏览
提问于 2025-04-17 12:53

几天前,我在Stack Overflow上问了一个问题,想要帮助我设计一个处理多个HTTP请求的结构。

这是我的情况。我想要一个多生产者、多消费者的系统。我的生产者会爬取和抓取一些网站,把找到的链接放进一个队列里。因为我会爬取多个网站,所以我想要有多个生产者/爬虫。

消费者/工作者会从这个队列中获取链接,向这些链接发送TCP/UDP请求,并把结果保存到我的Django数据库里。我也想要多个工作者,因为每个队列中的项目都是完全独立的。

有人建议我使用协程库,比如Gevent或Eventlet。因为我从来没有使用过协程,我了解到虽然这种编程方式和线程的方式有点相似,但实际上只有一个线程在执行。当发生阻塞调用,比如输入输出操作时,程序会在内存中切换堆栈,另一个绿色线程会接管,直到它遇到某种阻塞的输入输出调用。希望我理解得没错?这是我在Stack Overflow上发的代码:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []


def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid


def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()

这个方法效果很好,因为sleep调用是阻塞调用,当发生sleep事件时,另一个绿色线程会接管。这比顺序执行要快得多。正如你所看到的,我的程序中没有任何代码是故意让一个线程让出执行权给另一个线程。我不明白这怎么适用于我上面的情况,因为我希望所有线程能够同时执行。

一切都运行得很好,但我觉得使用Gevent/Eventlet获得的吞吐量比原来的顺序运行程序要高,但远低于使用真正线程时能达到的效果。

如果我用线程机制重新实现我的程序,我的每个生产者和消费者可以同时工作,而不需要像协程那样频繁切换堆栈。

我应该用线程重新实现吗?我的设计错了吗?我没有看到使用协程的真正好处。

也许我的概念有点模糊,但这是我理解的内容。如果能对我的设计和概念提供一些帮助或澄清,那就太好了。

谢谢!

3 个回答

0

在这种情况下,你的问题不是程序的运行速度(比如选择使用gevent还是线程),而是网络输入输出的速度。这才是决定程序运行快慢的瓶颈。

使用Gevent是一个很好的方法,可以确保瓶颈确实是网络速度,而不是你程序的结构设计。

你应该采用这样的流程:

import gevent
from gevent.queue import Queue, JoinableQueue
from gevent.monkey import patch_all


patch_all()  # Patch urllib2, etc


def worker(work_queue, output_queue):
    for work_unit in work_queue:
        finished = do_work(work_unit)
        output_queue.put(finished)
        work_queue.task_done()


def producer(input_queue, work_queue):
    for url in input_queue:
        url_list = crawl(url)
        for work in url_list:
            work_queue.put(work)
        input_queue.task_done()


def do_work(work):
    gevent.sleep(0)  # Actually proces link here
    return work


def crawl(url):
    gevent.sleep(0)
    return list(url)  # Actually process url here

input = JoinableQueue()
work = JoinableQueue()
output = Queue()

workers = [gevent.spawn(worker, work, output) for i in range(0, 10)]
producers = [gevent.spawn(producer, input, work) for i in range(0, 10)]


list_of_urls = ['foo', 'bar']

for url in list_of_urls:
    input.put(url)

# Wait for input to finish processing
input.join()
print 'finished producing'
# Wait for workers to finish processing work
work.join()
print 'finished working'

# We now have output!
print 'output:'
for message in output:
    print message
# Or if you'd like, you could use the output as it comes!

你不需要等输入和工作队列完成,我在这里只是演示了一下。

1

gevent在你有很多(绿色)线程的时候非常好用。我测试过成千上万的线程,它运行得非常顺利。你需要确保你用到的所有库,无论是抓取数据还是保存到数据库,都能够支持绿色线程。根据我的了解,如果这些库使用的是Python的socket,gevent的注入应该是有效的。不过,用C语言写的扩展(比如mysqldb)会阻塞,这时候你需要使用绿色线程的替代品。

如果你使用gevent,基本上可以不需要队列了,可以为每个任务创建一个新的(绿色)线程,线程的代码可以简单到像这样:db.save(web.get(address))。gevent会处理当数据库或网络中的某个库阻塞时的情况。只要你的任务能放进内存里,它就能正常工作。

6

如你所见,我的程序里没有任何代码是故意让一个线程暂停,去让另一个线程执行的。我不明白这和我想要的场景有什么关系,因为我希望所有的线程能够同时运行。

这里有一个操作系统的线程,但有好几个绿色线程(greenlets)。在你的情况下,gevent.sleep() 让工作线程可以同时执行。如果你使用经过 gevent 修改的 urllib2,像 urllib2.urlopen(url).read() 这样的阻塞输入输出调用也能做到这一点(通过调用 gevent.monkey.patch_*())。

想要了解如何在单线程环境中实现代码的并发执行,可以参考 《关于协程和并发的有趣课程》

如果你想比较 gevent、线程(threading)和多进程(multiprocessing)之间的吞吐量差异,可以写一段兼容所有方法的代码:

#!/usr/bin/env python
concurrency_impl = 'gevent' # single process, single thread
##concurrency_impl = 'threading' # single process, multiple threads
##concurrency_impl = 'multiprocessing' # multiple processes

if concurrency_impl == 'gevent':
    import gevent.monkey; gevent.monkey.patch_all()

import logging
import time
import random
from itertools import count, islice

info = logging.info

if concurrency_impl in ['gevent', 'threading']:
    from Queue import Queue as JoinableQueue
    from threading import Thread
if concurrency_impl == 'multiprocessing':
    from multiprocessing import Process as Thread, JoinableQueue

其余的脚本在所有并发实现中都是一样的:

def do_work(wid, value):
    time.sleep(random.randint(0,2))
    info("%d Task %s done" % (wid, value))

def worker(wid, q):
    while True:
        item = q.get()
        try:
            info("%d Got item %s" % (wid, item))
            do_work(wid, item)
        finally:
            q.task_done()
            info("%d Done item %s" % (wid, item))

def producer(pid, q):
    for item in iter(lambda: random.randint(1, 11), 10):
        time.sleep(.1) # simulate a green blocking call that yields control
        info("%d Added item %s" % (pid, item))
        q.put(item)
    info("%d Signal Received" % (pid,))

不要在模块级别执行代码,把它放在 main() 函数里:

def main():
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(process)d %(message)s")

    q = JoinableQueue()
    it = count(1)
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)]
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)]
    for t in producers+workers:
        t.daemon = True
        t.start()

    for t in producers: t.join() # put items in the queue
    q.join() # wait while it is empty
    # exit main thread (daemon workers die at this point)

if __name__=="__main__":    
   main()

撰写回答