如何使用joblib并行写入文件?JoinableQueue问题
我正在尝试将对超过10万个文件的计算结果写入一个文件。处理一个文件大约需要1秒钟,并且每次只写入一行到输出文件。这个问题本身是“非常容易并行处理”的,我只是遇到了如何正确写入文件(比如CSV格式)的问题。这是我很久以前(大概在Python 3.4的时候)用过的方法:
import os
from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed
def save_to_file(q):
with open('test.csv', 'w') as out:
while True:
val = q.get()
if val is None: break
out.write(val + '\n')
q.task_done()
def process(x):
q.put(str(os.getpid()) + '-' + str(x**2))
if __name__ == '__main__':
q = JoinableQueue()
p = Process(target=save_to_file, args=(q,))
p.start()
Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
q.put(None)
p.join()
但是今天(在Python 3.6及以上版本中)运行时出现了以下错误:
joblib.externals.loky.process_executor._RemoteTraceback:
"""
(...)
RuntimeError: JoinableQueue objects should only be shared between processes through inheritance
"""
如何使用joblib正确地写入一个文件呢?
相关问题:
- 暂无相关问题
1 个回答
1
其实,有一种方法可以完成这个任务,就是使用 multiprocessing.Manager
,像这样:
import os
from multiprocessing import Process, Manager
from joblib import Parallel, delayed
def save_to_file(q):
with open('test.csv', 'w') as out:
while True:
val = q.get()
if val is None: break
out.write(val + '\n')
def process(x):
q.put(str(os.getpid()) + '-' + str(x**2))
if __name__ == '__main__':
m = Manager()
q = m.Queue()
p = Process(target=save_to_file, args=(q,))
p.start()
Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
q.put(None)
p.join()
我们让 Manager
来管理上下文,其他部分保持不变(除了用普通的 Queue
替代 JoinableQueue
)。
如果有人知道更好或更简洁的方法,我会很乐意把它当作答案。