加速Python中的蒙特卡洛代码
考虑一些点Y,这些点是从0到T之间按顺序排列的。我们把这些点看作是在一个周长为T的圆上。现在再考虑一些点X,同样是从0到T之间的点,也是在这个周长为T的圆上。
我们说X和Y之间的距离是X中每个点与Y中最近的点之间的绝对距离之和。记这个距离为Delta(X, Y)。
我正在寻找一种快速的方法来近似计算在所有可能的X旋转情况下,两个圆之间距离的分布。目前我使用的是蒙特卡洛模拟。首先,这是我用来生成一些假数据的代码。
import random
import numpy as np
from bisect import bisect_left
def simul(rate, T):
time = np.random.exponential(rate)
times = [0]
newtime = times[-1]+time
while (newtime < T):
times.append(newtime)
newtime = newtime+np.random.exponential(rate)
return times[1:]
接下来是计算两个圆之间距离的代码。
def takeClosest(myList, myNumber, T):
"""
Assumes myList is sorted. Returns closest value to myNumber in a circle of circumference T.
If two numbers are equally close, return the smallest number.
"""
pos = bisect_left(myList, myNumber)
if (pos == 0 and myList[pos] != myNumber):
before = myList[pos - 1] - T
after = myList[0]
elif (pos == len(myList)):
before = myList[pos-1]
after = myList[0] + T
else:
before = myList[pos - 1]
after = myList[pos]
if after - myNumber < myNumber - before:
return after
else:
return before
def circle_dist(timesY, timesX):
dist = 0
for t in timesX:
closest_number = takeClosest(timesY, t, T)
dist += np.abs(closest_number - t)
return dist
然后是主要的代码,用来生成数据并尝试1000种不同的随机旋转。
T = 50000
timesX = simul(1, T)
timesY = simul(10, T)
dists=[]
iters = 100
for i in xrange(iters):
offset = np.random.randint(0,T)
timesX = [(t+offset) % T for t in timesX]
dists.append(circle_dist(timesY, timesX))
现在我们可以打印出任何我们想要的距离统计数据。我特别关注的是方差。
print "Variance is ", np.var(dists)
不幸的是,我需要做很多次这个计算,现在大约需要16秒。我觉得这个速度有点慢。欢迎任何加速的建议。
编辑 1. 将迭代次数减少到100(之前的值与我的时间不符)。现在在我的电脑上大约需要16秒。
编辑 2. 修复了takeClosest中的一个错误。
2 个回答
编辑: 我刚刚注意到,性能优化有点早,因为表达式 closest_number - t
并不是“圆”上任何距离定义的有效实现——这只是开放线段上的一种距离。
示例测试案例(伪代码):
T = 10
X = [1, 2]
Y = [9]
dist(X, Y) = dist(1, 9) + dist(2, 9)
dist_on_line = 8 + 7 = 15
dist_on_circle = 2 + 3 = 5
注意,圆的定义 [0,10)
意味着 dist(0, 10)
是没有定义的,但在极限情况下,它接近于0:lim(dist(0, t), t->10) = 0
在圆上正确实现距离的方式是:
dist_of_t = min(t - closest_number_before_t,
closes_number_after_t - t,
T - t + closes_number_before_t,
T - closest_number_after_t + t)
原始回答:
你可以旋转并遍历 timesY
,而不是 timesX
,因为那个数组要小得多——对 timeX
进行 bisect_left
的操作几乎可以忽略不计(O(logn)
),相比之下遍历所有元素的时间复杂度是(O(n)
)。
但在我看来,真正的性能瓶颈是因为 Python 的动态类型(在你尝试将 timesX
中的 ~50000 个项目与其他值进行比较时,每个项目都必须检查类型兼容性)——因此,将 timesX
和 timesY
转换为 numpy 数组应该会有所帮助,如果这还不够加速的话(cython, numba, ...)就是你需要考虑的方案。
这个 circle_dist
函数可以用一行代码来替代。所以你可以把它直接放到外面的 for i
循环里:
sum(abs(takeClosest(timesY, t) - t) for t in timesX)
此外,如果可能的话,最好一次性分配像 dists
这样的数组,避免反复添加元素,这样会添加很多次。
不过,很遗憾,这两种改进只节省了几个百分点的计算时间。
编辑 1:把 np.abs(...)
换成 abs(...)
,在我的机器上(使用一个减少的数据集)计算时间减少了50%!
编辑 2:根据 Aprillion 的评论更新了一行代码。