在Python中使用多进程的策略选择

5 投票
2 回答
2243 浏览
提问于 2025-04-17 03:00

我对多进程完全是个新手。最近我在看关于多进程模块的文档,了解了像池(Pool)、线程(Threads)、队列(Queues)这些概念,但我现在完全搞不清楚了。

我想用多进程来改造我的简单HTTP下载器,让它能同时工作多个“工人”。目前我的做法是,下载一个页面,然后解析这个页面,找出有趣的链接。一直这样做,直到所有有趣的链接都下载完。现在,我想用多进程来实现这个功能,但我现在完全不知道该怎么组织这个工作流程。我有两个想法。首先,我想用两个队列。一个队列用来存放需要下载的链接,另一个用来存放需要解析的链接。一个工人负责下载页面,然后把下载的页面放到需要解析的队列里。另一个进程负责解析页面,把找到的有趣链接放到另一个队列里。我预期这个方法会遇到一些问题;首先,为什么要一次下载一个页面,再一次解析一个页面呢?而且,一个进程怎么知道队列里还有东西可以添加,等它把队列里的所有项目都处理完了之后。

我想到的另一个方法是,创建一个可以接收URL作为参数的函数。这个函数下载文档并开始解析链接。每当它遇到一个有趣的链接时,就立即创建一个新的线程,运行和它自己一样的函数。这个方法的问题是,我怎么跟踪所有到处生成的进程,怎么知道还有没有进程在运行。同时,我还想知道怎么限制最大进程数。

所以我现在完全迷茫了。有没有人能给我建议一个好的策略,或者给我一些示例代码,帮我实现这个想法。

2 个回答

2

你所描述的其实就是图的遍历。大多数比深度优先更复杂的图遍历算法,会跟踪两组节点,在你的例子中,这些节点是网址。

第一组叫做“闭合集”,它代表所有已经访问过并处理过的节点。如果在处理某个页面时,发现一个链接已经在闭合集中,那就可以忽略它,因为它已经处理过了。

第二组叫做“开放集”,它包含所有已经找到但还没有处理的节点。

基本的工作机制是,首先把根节点放入开放集中(闭合集一开始是空的,因为还没有处理任何节点),然后开始工作。每个工作者从开放集中取出一个节点,把它复制到闭合集中,处理这个节点,并把发现的新节点添加回开放集中(只要这些新节点不在开放集或闭合集中)。一旦开放集为空(并且没有工作者在处理节点),就说明图已经完全遍历完了。

multiprocessing中实际实现这个过程,可能意味着你会有一个主任务来跟踪开放集和闭合集。如果工作池中的某个工作者表示它准备好工作,主工作者就会负责把节点从开放集中移动到闭合集中,并启动这个工作者。然后,工作者可以把它们找到的所有节点传回给主工作者,而不必担心这些节点是否已经在闭合集中,主工作者会忽略那些已经闭合的节点。

4

这里有一种方法,使用了多进程处理。(非常感谢@Voo,提出了很多代码改进的建议。)

import multiprocessing as mp
import logging
import Queue
import time

logger=mp.log_to_stderr(logging.DEBUG)  # or, 
# logger=mp.log_to_stderr(logging.WARN) # uncomment this to silence debug and info messages

def worker(url_queue,seen):
    while True:
        url=url_queue.get()
        if url not in seen:
            logger.info('downloading {u}'.format(u=url))
            seen[url]=True
            # Replace this with code to dowload url
            # urllib2.open(...)
            time.sleep(0.5)
            content=url
            logger.debug('parsing {c}'.format(c=content))
            # replace this with code that finds interesting links and
            # puts them in url_queue
            for i in range(3):
                if content<5:
                    u=2*content+i-1
                    logger.debug('adding {u} to url_queue'.format(u=u))
                    time.sleep(0.5)
                    url_queue.put(u)
        else:
            logger.debug('skipping {u}; seen before'.format(u=url))
        url_queue.task_done()

if __name__=='__main__':
    num_workers=4
    url_queue=mp.JoinableQueue()
    manager=mp.Manager()
    seen=manager.dict()

    # prime the url queue with at least one url
    url_queue.put(1)
    downloaders=[mp.Process(target=worker,args=(url_queue,seen))
                 for i in range(num_workers)]
    for p in downloaders:
        p.daemon=True
        p.start()
    url_queue.join()
  • 创建了一个包含4个工作进程的池。
  • 有一个叫做 url_queueJoinableQueue 队列。
  • 每个工作进程从 url_queue 中获取一个网址,找到新的网址并把它们添加到 url_queue 中。
  • 只有在添加了新的网址后,它才会调用 url_queue.task_done()
  • 主进程会调用 url_queue.join()。这会让主进程停下来,直到 url_queue 中的每个任务都调用了 task_done
  • 由于工作进程的 daemon 属性被设置为 True,当主进程结束时,它们也会结束。

这个例子中使用的所有组件也可以在 Doug Hellman 的优秀 Python 周模块教程中找到,专门讲解多进程处理

撰写回答