如何使用joblib并行写入文件?JoinableQueue问题

4 投票
1 回答
2388 浏览
提问于 2025-06-08 05:48

我正在尝试将对超过10万个文件的计算结果写入一个文件。处理一个文件大约需要1秒钟,并且每次只写入一行到输出文件。这个问题本身是“非常容易并行处理”的,我只是遇到了如何正确写入文件(比如CSV格式)的问题。这是我很久以前(大概在Python 3.4的时候)用过的方法:

import os
from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed

def save_to_file(q):
    with open('test.csv', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            out.write(val + '\n')
        q.task_done()

def process(x):
    q.put(str(os.getpid()) + '-' + str(x**2))

if __name__ == '__main__':
    q = JoinableQueue()
    p = Process(target=save_to_file, args=(q,))
    p.start()
    Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
    q.put(None) 
    p.join() 

但是今天(在Python 3.6及以上版本中)运行时出现了以下错误:

joblib.externals.loky.process_executor._RemoteTraceback: 
"""
(...)
RuntimeError: JoinableQueue objects should only be shared between processes through inheritance
"""

如何使用joblib正确地写入一个文件呢?

相关问题:

  • 暂无相关问题
暂无标签

1 个回答

1

其实,有一种方法可以完成这个任务,就是使用 multiprocessing.Manager,像这样:

import os
from multiprocessing import Process, Manager
from joblib import Parallel, delayed

def save_to_file(q):
    with open('test.csv', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            out.write(val + '\n')

def process(x):
    q.put(str(os.getpid()) + '-' + str(x**2))

if __name__ == '__main__':
    m = Manager()
    q = m.Queue()
    p = Process(target=save_to_file, args=(q,))
    p.start()
    Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
    q.put(None)
    p.join()

我们让 Manager 来管理上下文,其他部分保持不变(除了用普通的 Queue 替代 JoinableQueue)。

如果有人知道更好或更简洁的方法,我会很乐意把它当作答案。

撰写回答