python -> 多进程模块

6 投票
2 回答
4959 浏览
提问于 2025-04-16 03:24

我想要实现的目标是:

  1. 我有大约一百万个文件需要处理,并把处理后的内容追加到一个文件里。
  2. 因为单个进程处理起来太慢,所以这个方法不行。
  3. 在Python中不使用线程,因为这实际上还是在运行一个进程(由于全局解释器锁GIL的限制)。
  4. 所以我决定使用多进程模块,也就是启动4个子进程来充分利用所有的处理器核心。

到目前为止一切顺利,现在我需要一个所有子进程都能访问的共享对象。我使用了多进程模块中的队列(Queues)。另外,所有子进程需要把它们的输出写入同一个文件。我想这可能是使用锁(Locks)的地方。在这种设置下,当我运行程序时,没有出现错误(所以父进程看起来没问题),但程序就卡住了。当我按下ctrl-C时,我看到了一些错误信息(每个子进程都有一个)。而且输出文件里没有任何内容。以下是代码(注意在没有多进程的情况下,代码运行得很好) -

import os
import glob
from multiprocessing import Process, Queue, Pool

data_file  = open('out.txt', 'w+')

def worker(task_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_file.write(repr(data)+'\n')
    return

def main():
    task_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop

    # this is the block of code that needs correction.
    if multi_process:
        # One way to spawn 4 processes
        # pool = Pool(processes=4) #Start worker processes
        # res  = pool.apply_async(worker, [task_queue, data_file])

        # But I chose to do it like this for now.
        for i in range(4):
            proc = Process(target=worker, args=[task_queue])
            proc.start()
    else: # single process mode is working fine!
        worker(task_queue)
    data_file.close()
    return

我哪里出错了?我还尝试在启动每个进程时把打开的文件对象传递给它们,但没有效果。例如:Process(target=worker, args=[task_queue, data_file])。但这并没有改变任何事情。我觉得子进程出于某种原因无法写入文件。可能是file_object的实例在启动时没有被复制,或者是其他什么问题……有人知道怎么回事吗?

额外问题:有没有办法保持一个持久的mysql连接打开,并把它传递给子进程?也就是说,我在父进程中打开一个mysql连接,这个连接应该对所有子进程都可用。基本上这相当于Python中的共享内存。这里有什么想法吗?

2 个回答

3

关于多进程的文档提到了一些在不同进程之间共享状态的方法:

http://docs.python.org/dev/library/multiprocessing.html#sharing-state-between-processes

我相信每个进程都会有一个新的解释器,然后目标函数和参数会被加载到这个解释器里。在这种情况下,你脚本中的全局命名空间会和你的工作函数绑定在一起,所以数据文件应该是可以访问的。不过,我不太确定文件描述符在复制过程中会发生什么。你有没有尝试把文件对象作为参数之一传递过去?

另一种方法是传递一个队列(Queue),这个队列用来保存工作进程的结果。工作进程会把结果放入这个队列,而主代码则从队列中获取结果并写入文件。

4

虽然我和Eric的讨论很有收获,但后来我发现了一种更好的方法。在多进程模块里,有一个叫做“Pool”的方法,非常适合我的需求。

它会根据我系统的核心数量自动优化,也就是说,它只会启动和核心数量一样多的进程。当然,这个设置是可以自定义的。下面是代码,可能对以后的人有帮助-

from multiprocessing import Pool

def main():
    po = Pool()
    for file in glob.glob('*.csv'):
        filepath = os.path.join(DATA_DIR, file)
        po.apply_async(mine_page, (filepath,), callback=save_data)
    po.close()
    po.join()
    file_ptr.close()

def mine_page(filepath):
    #do whatever it is that you want to do in a separate process.
    return data

def save_data(data):
    #data is a object. Store it in a file, mysql or...
    return

我还在研究这个庞大的模块。现在不太确定save_data()是由父进程执行的,还是这个函数是由子进程调用的。如果是子进程在保存数据,可能在某些情况下会出现并发问题。如果有人对这个模块有更多的经验,欢迎分享更多的知识……

撰写回答