随机游走 - 并行处理
我现在正在用蒙特卡洛方法来解决一个扩散方程。这个方程的解可以用一个数学期望来表示,记作phi(W),其中phi是一个函数(根据扩散方程的不同而变化),而W是一个在边界处停止的对称随机游走。为了在某个点x上评估这个函数,我需要从x开始每次的随机游走。
我想在很多点上评估这个函数。所以我做了以下几件事:
- 我从每个点同时开始一次随机游走
- 每个点的步骤都是一样的
- 一旦每次游走到达边界,我就停止
- 我重复这些操作N次,以便通过N次模拟的频率来近似这个数学期望。
我的代码(Python)看起来是这样的:
for k in range(N): #For each "walk"
step = 0
while not(every walk has reach the boundary):
map(update_walk,points) #update the walk starting from each x in points
incr(step)
问题是:这个过程非常耗时,因为N可能很大,点的数量也很多。我在寻找任何可以帮助我优化这个代码的解决方案。
我考虑过使用并行处理(因为每次游走都是独立的),想用IPython来实现,但没有成功,因为它是在一个函数内部(返回了一个错误,像是
"无法启动函数'f',因为它没有被找到为'file.f'",但'f'是在file.big_f中定义的)
2 个回答
0
在一个单独的进程中启动计算(这样做是为了避免全局解释器锁,简称GIL)通常会用到多进程模块。要启动X个并行进程,可以使用以下代码:
from multiprocessing import Process
from Queue import Queue
queue = Queue(maxsize=MAX_NODES) # this is the number of processes you will spawn
procs = [Process(target=f, args=(node, queue) for node in nodes] # node will be the node for each process , the queue is common to all of the processes
[p.start() for p in procs] # start them
results = []
for i in range(len(nodes):
results.append(queue.get())
你只需要修改一下func(目标函数),让它接受相关数量的参数和一个队列,然后在计算结束时调用queue.put(result)来放入结果。
希望这样解释能让你明白。
1
这是我可能不使用并行计算的少数情况之一。我觉得直接用 numpy
会更快。
>>> import numpy as np
>>> def walk(x, n=100, box=.5, delta=.2):
... np.random.seed()
... w = np.cumsum(x + np.random.uniform(-delta,delta,n))
... w = np.where(abs(w) > box)[0]
... return w[0] if len(w) else n
...
>>> pwalk = np.vectorize(walk)
>>> pwalk(np.zeros(10))
array([10, 25, 4, 5, 100, 6, 28, 6, 25, 23])
我想你应该知道怎么从这里得到期望值。
你也可以把一个元组传给 np.random.uniform
的最后一个参数,这样就不需要使用 np.vectorize
了。
当然,如果你想使用并行计算的话,你可以选择一个合适的 map
函数,然后从 map
调用 walk
,而不是像我上面那样使用 vectorize
。
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> p.map(walk, [0]*10)
[8, 7, 39, 7, 36, 7, 22, 27, 18, 31]
使用 pathos
,这样可以很方便地从解释器调用 map 函数。https://github.com/uqfoundation/pathos