python -> 多进程模块
我想要实现的目标是:
- 我有大约一百万个文件需要处理,并把处理后的内容追加到一个文件里。
- 因为单个进程处理起来太慢,所以这个方法不行。
- 在Python中不使用线程,因为这实际上还是在运行一个进程(由于全局解释器锁GIL的限制)。
- 所以我决定使用多进程模块,也就是启动4个子进程来充分利用所有的处理器核心。
到目前为止一切顺利,现在我需要一个所有子进程都能访问的共享对象。我使用了多进程模块中的队列(Queues)。另外,所有子进程需要把它们的输出写入同一个文件。我想这可能是使用锁(Locks)的地方。在这种设置下,当我运行程序时,没有出现错误(所以父进程看起来没问题),但程序就卡住了。当我按下ctrl-C时,我看到了一些错误信息(每个子进程都有一个)。而且输出文件里没有任何内容。以下是代码(注意在没有多进程的情况下,代码运行得很好) -
import os
import glob
from multiprocessing import Process, Queue, Pool
data_file = open('out.txt', 'w+')
def worker(task_queue):
for file in iter(task_queue.get, 'STOP'):
data = mine_imdb_page(os.path.join(DATA_DIR, file))
if data:
data_file.write(repr(data)+'\n')
return
def main():
task_queue = Queue()
for file in glob.glob('*.csv'):
task_queue.put(file)
task_queue.put('STOP') # so that worker processes know when to stop
# this is the block of code that needs correction.
if multi_process:
# One way to spawn 4 processes
# pool = Pool(processes=4) #Start worker processes
# res = pool.apply_async(worker, [task_queue, data_file])
# But I chose to do it like this for now.
for i in range(4):
proc = Process(target=worker, args=[task_queue])
proc.start()
else: # single process mode is working fine!
worker(task_queue)
data_file.close()
return
我哪里出错了?我还尝试在启动每个进程时把打开的文件对象传递给它们,但没有效果。例如:Process(target=worker, args=[task_queue, data_file])
。但这并没有改变任何事情。我觉得子进程出于某种原因无法写入文件。可能是file_object
的实例在启动时没有被复制,或者是其他什么问题……有人知道怎么回事吗?
额外问题:有没有办法保持一个持久的mysql连接打开,并把它传递给子进程?也就是说,我在父进程中打开一个mysql连接,这个连接应该对所有子进程都可用。基本上这相当于Python中的共享内存。这里有什么想法吗?
2 个回答
关于多进程的文档提到了一些在不同进程之间共享状态的方法:
http://docs.python.org/dev/library/multiprocessing.html#sharing-state-between-processes
我相信每个进程都会有一个新的解释器,然后目标函数和参数会被加载到这个解释器里。在这种情况下,你脚本中的全局命名空间会和你的工作函数绑定在一起,所以数据文件应该是可以访问的。不过,我不太确定文件描述符在复制过程中会发生什么。你有没有尝试把文件对象作为参数之一传递过去?
另一种方法是传递一个队列(Queue),这个队列用来保存工作进程的结果。工作进程会把结果放入这个队列,而主代码则从队列中获取结果并写入文件。
虽然我和Eric的讨论很有收获,但后来我发现了一种更好的方法。在多进程模块里,有一个叫做“Pool”的方法,非常适合我的需求。
它会根据我系统的核心数量自动优化,也就是说,它只会启动和核心数量一样多的进程。当然,这个设置是可以自定义的。下面是代码,可能对以后的人有帮助-
from multiprocessing import Pool
def main():
po = Pool()
for file in glob.glob('*.csv'):
filepath = os.path.join(DATA_DIR, file)
po.apply_async(mine_page, (filepath,), callback=save_data)
po.close()
po.join()
file_ptr.close()
def mine_page(filepath):
#do whatever it is that you want to do in a separate process.
return data
def save_data(data):
#data is a object. Store it in a file, mysql or...
return
我还在研究这个庞大的模块。现在不太确定save_data()是由父进程执行的,还是这个函数是由子进程调用的。如果是子进程在保存数据,可能在某些情况下会出现并发问题。如果有人对这个模块有更多的经验,欢迎分享更多的知识……