在Python中使用多进程的策略选择
我对多进程完全是个新手。最近我在看关于多进程模块的文档,了解了像池(Pool)、线程(Threads)、队列(Queues)这些概念,但我现在完全搞不清楚了。
我想用多进程来改造我的简单HTTP下载器,让它能同时工作多个“工人”。目前我的做法是,下载一个页面,然后解析这个页面,找出有趣的链接。一直这样做,直到所有有趣的链接都下载完。现在,我想用多进程来实现这个功能,但我现在完全不知道该怎么组织这个工作流程。我有两个想法。首先,我想用两个队列。一个队列用来存放需要下载的链接,另一个用来存放需要解析的链接。一个工人负责下载页面,然后把下载的页面放到需要解析的队列里。另一个进程负责解析页面,把找到的有趣链接放到另一个队列里。我预期这个方法会遇到一些问题;首先,为什么要一次下载一个页面,再一次解析一个页面呢?而且,一个进程怎么知道队列里还有东西可以添加,等它把队列里的所有项目都处理完了之后。
我想到的另一个方法是,创建一个可以接收URL作为参数的函数。这个函数下载文档并开始解析链接。每当它遇到一个有趣的链接时,就立即创建一个新的线程,运行和它自己一样的函数。这个方法的问题是,我怎么跟踪所有到处生成的进程,怎么知道还有没有进程在运行。同时,我还想知道怎么限制最大进程数。
所以我现在完全迷茫了。有没有人能给我建议一个好的策略,或者给我一些示例代码,帮我实现这个想法。
2 个回答
你所描述的其实就是图的遍历。大多数比深度优先更复杂的图遍历算法,会跟踪两组节点,在你的例子中,这些节点是网址。
第一组叫做“闭合集”,它代表所有已经访问过并处理过的节点。如果在处理某个页面时,发现一个链接已经在闭合集中,那就可以忽略它,因为它已经处理过了。
第二组叫做“开放集”,它包含所有已经找到但还没有处理的节点。
基本的工作机制是,首先把根节点放入开放集中(闭合集一开始是空的,因为还没有处理任何节点),然后开始工作。每个工作者从开放集中取出一个节点,把它复制到闭合集中,处理这个节点,并把发现的新节点添加回开放集中(只要这些新节点不在开放集或闭合集中)。一旦开放集为空(并且没有工作者在处理节点),就说明图已经完全遍历完了。
在multiprocessing
中实际实现这个过程,可能意味着你会有一个主任务来跟踪开放集和闭合集。如果工作池中的某个工作者表示它准备好工作,主工作者就会负责把节点从开放集中移动到闭合集中,并启动这个工作者。然后,工作者可以把它们找到的所有节点传回给主工作者,而不必担心这些节点是否已经在闭合集中,主工作者会忽略那些已经闭合的节点。
这里有一种方法,使用了多进程处理。(非常感谢@Voo,提出了很多代码改进的建议。)
import multiprocessing as mp
import logging
import Queue
import time
logger=mp.log_to_stderr(logging.DEBUG) # or,
# logger=mp.log_to_stderr(logging.WARN) # uncomment this to silence debug and info messages
def worker(url_queue,seen):
while True:
url=url_queue.get()
if url not in seen:
logger.info('downloading {u}'.format(u=url))
seen[url]=True
# Replace this with code to dowload url
# urllib2.open(...)
time.sleep(0.5)
content=url
logger.debug('parsing {c}'.format(c=content))
# replace this with code that finds interesting links and
# puts them in url_queue
for i in range(3):
if content<5:
u=2*content+i-1
logger.debug('adding {u} to url_queue'.format(u=u))
time.sleep(0.5)
url_queue.put(u)
else:
logger.debug('skipping {u}; seen before'.format(u=url))
url_queue.task_done()
if __name__=='__main__':
num_workers=4
url_queue=mp.JoinableQueue()
manager=mp.Manager()
seen=manager.dict()
# prime the url queue with at least one url
url_queue.put(1)
downloaders=[mp.Process(target=worker,args=(url_queue,seen))
for i in range(num_workers)]
for p in downloaders:
p.daemon=True
p.start()
url_queue.join()
- 创建了一个包含4个工作进程的池。
- 有一个叫做
url_queue
的JoinableQueue
队列。 - 每个工作进程从
url_queue
中获取一个网址,找到新的网址并把它们添加到url_queue
中。 - 只有在添加了新的网址后,它才会调用
url_queue.task_done()
。 - 主进程会调用
url_queue.join()
。这会让主进程停下来,直到url_queue
中的每个任务都调用了task_done
。 - 由于工作进程的
daemon
属性被设置为 True,当主进程结束时,它们也会结束。
这个例子中使用的所有组件也可以在 Doug Hellman 的优秀 Python 周模块教程中找到,专门讲解多进程处理。