我正在尝试实现一个在线递归并行算法,它是高度可并行化的。我的问题是,我的python实现并没有按照我想要的方式工作。我有两个2D矩阵,我想在时间步长t处每次观察到新的观察值时递归地更新每个列。 我的并行代码是这样的
def apply_async(t):
worker = mp.Pool(processes = 4)
for i in range(4):
X[:,i,np.newaxis], b[:,i,np.newaxis] = worker.apply_async(OULtraining, args=(train[t,i], X[:,i,np.newaxis], b[:,i,np.newaxis])).get()
worker.close()
worker.join()
for t in range(p,T):
count = 0
for l in range(p):
for k in range(4):
gn[count]=train[t-l-1,k]
count+=1
G = G*v + gn @ gn.T
Gt = (1/(t-p+1))*G
if __name__ == '__main__':
apply_async(t)
这两个矩阵是X和b。我想直接替换master的内存,因为每个进程只递归更新矩阵的一个特定列。在
为什么这个实现比顺序执行慢?在
有没有什么方法可以恢复每一个时间步骤,而不是杀死它们然后重新创建它们?这可能是它变慢的原因吗?在
原因是,你的程序实际上是连续的。这是一个从并行角度来看与您的代码片段相同的示例代码片段:
运行这个程序,你会发现数字1-9一秒钟出现一次。为什么会这样?原因是你的
.get()
。这意味着每次调用apply\u async都将在get()
中的实践块中执行,直到结果可用为止。它将提交一个任务,等待第二个模拟处理延迟,然后返回结果,然后将另一个任务提交到池中。这意味着根本不存在并行执行。在尝试使用以下内容替换池管理部件:
^{pr2}$您现在可以看到并行工作,因为您的四个任务现在是同时处理的。循环不会在get中阻塞,因为get被移出循环,只有当结果准备好时才会收到结果。在
注意:如果你给你的worker的参数或者它们的返回值是大的数据结构,你会损失一些性能。在实践中,Python将这些作为队列来实现,通过队列传输大量数据相对于在分支子进程时获取数据结构的内存副本要慢得多。在
相关问题 更多 >
编程相关推荐