我使用多重处理对一组数据进行大量计算,以减少计算时间。它的工作非常出色,除了一个小警告,当我让我的监听器进程写我的输出时,它以错误的顺序出现,这是非常糟糕的。我需要它以同样的顺序出来。不知道如何做到这一点。下面是示例代码。你知道吗
import numpy, os, multiprocessing
from multiprocessing.sharedctypes import Value, Array, RawArray, RawValue
from multiprocessing import Process, Lock
def domorestuff(value):
value += value # sample, some other calculation
q.put(value)
return
def dostuff(somevalue):
somevalue += 1 # do some calculation instead of just +=1 here
domorestuff(somevalue)
return
def listener(q):
f = open(os.path.join(outdir, fileout.value), 'w')
while 1:
#print("Listener...", flush=True)
m = q.get()
if(m == 'kill'):
break
#print("Listen write...", flush=True)
f.write(str(m) + '\n')
f.flush()
f.close()
def main():
manager = multiprocessing.Manager()
q = manager.Queue()
pool = multiprocessing.Pool(9)
watcher = pool.apply_async(listener, (q,))
pool.map(dostuff, range(8))
q.put('kill')
pool.close()
我希望它能在文件中给出一组线性值,即:
2, 4, 6, 8, 10, 12, 14, 18
但是他们每次都是随机出现的。在不知道如何同步的情况下,当我不使用监听器并且不进行文件编写时,它似乎按线程的数量顺序连接进程。但很难确定,因为我无法将多个线程的输出安全地写入单个文件。你知道吗
为了更清楚一点,处理发生在一个输入文件上,每个线程读取它需要的部分,然后将基于处理的输出写入侦听器。但是,它不是像上面提到的那样,将块按顺序排列,而是按随机顺序排列。你知道吗
@M.Rau实际上是不对的,您可以运行池中的作业并将它们重新连接在一起,保持顺序,幸运的是
multiprocessing
模块使用pool.apply_async
或pool.imap
内置了此功能。你知道吗我稍微清理了一下您的代码(请注意,队列已完全消失),这就是我想到的:
有关更多信息,请参阅an example from the official docs。他们有不同的技巧。顺便说一句,问题中的python代码甚至没有编译,侦听器函数也无关紧要。希望这有帮助!你知道吗
您正在异步运行进程。您不能期望这些独立的进程以任何预期的顺序处理/完成它们的任务。你知道吗
相关问题 更多 >
编程相关推荐