Python: 类似于`map`的线程处理方法

31 投票
5 回答
22258 浏览
提问于 2025-04-16 01:48

我原以为标准库里有类似的东西,但看来我错了。

我有一堆网址想要同时打开,也就是想用 urlopen 一下子处理多个网址。我希望能有一个像内置的 map 函数那样的东西,不过是通过多个线程同时来完成这项工作。

有没有什么好的模块可以做到这一点呢?

5 个回答

5

这是我实现的多线程映射:

from threading import Thread
from queue import Queue

def thread_map(f, iterable, pool=None):
    """
    Just like [f(x) for x in iterable] but each f(x) in a separate thread.
    :param f: f
    :param iterable: iterable
    :param pool: thread pool, infinite by default
    :return: list if results
    """
    res = {}
    if pool is None:
        def target(arg, num):
            try:
                res[num] = f(arg)
            except:
                res[num] = sys.exc_info()

        threads = [Thread(target=target, args=[arg, i]) for i, arg in enumerate(iterable)]
    else:
        class WorkerThread(Thread):
            def run(self):
                while True:
                    try:
                        num, arg = queue.get(block=False)
                        try:
                            res[num] = f(arg)
                        except:
                            res[num] = sys.exc_info()
                    except Empty:
                        break

        queue = Queue()
        for i, arg in enumerate(iterable):
            queue.put((i, arg))

        threads = [WorkerThread() for _ in range(pool)]

    [t.start() for t in threads]
    [t.join() for t in threads]
    return [res[i] for i in range(len(res))]
58

multiprocessing.Pool 里,有一个叫 map 的方法。这个方法可以同时处理多个进程。

如果你不喜欢用多个进程的话,可以试试 multiprocessing.dummy,它是用线程来处理的。

import urllib
import multiprocessing.dummy

p = multiprocessing.dummy.Pool(5)
def f(post):
    return urllib.urlopen('http://stackoverflow.com/questions/%u' % post)

print p.map(f, range(3329361, 3329361 + 5))
20

有人建议我使用 futures 这个包来解决这个问题。我试了一下,感觉还挺好用的。

http://pypi.python.org/pypi/futures

下面是一个例子:

"Download many URLs in parallel."

import functools
import urllib.request
import futures

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

with futures.ThreadPoolExecutor(50) as executor:
   future_list = executor.run_to_futures(
           [functools.partial(load_url, url, 30) for url in URLS])

撰写回答