How can I perform a least-squares fit over many datasets quickly?
I'm trying to do Gaussian fits on many sets of data points. For example, I have a 256 x 262144 data array, where each set of 256 points needs to be fit to a Gaussian, and I need to do 262144 such fits.
Sometimes the peak of the Gaussian falls outside the data range, so curve fitting is the best way to get an accurate mean. Even when the peak is inside the range, curve fitting gives a better standard deviation, because other data may lie outside the range.
I have this working for a single dataset, using code from http://www.scipy.org/Cookbook/FittingData.
I tried simply repeating that algorithm, but it looks like it will take roughly 43 minutes to solve the whole problem. Is there an already-written, faster way to do this, either in parallel or more efficiently?
from scipy import optimize
from numpy import *
import numpy
# Fitting code taken from: http://www.scipy.org/Cookbook/FittingData
class Parameter:
    def __init__(self, value):
        self.value = value

    def set(self, value):
        self.value = value

    def __call__(self):
        return self.value

def fit(function, parameters, y, x=None):
    def f(params):
        # Push the trial values into the Parameter objects, then
        # return the residuals for leastsq to minimize.
        for i, p in enumerate(parameters):
            p.set(params[i])
        return y - function(x)

    if x is None:
        x = arange(y.shape[0])
    p = [param() for param in parameters]
    return optimize.leastsq(f, p)
def nd_fit(function, parameters, y, x=None, axis=0):
    """
    Fits an n-dimensional array by treating each slice along `axis`
    as a separate dataset.
    """
    y = y.swapaxes(0, axis)
    shape = y.shape
    axis_of_interest_len = shape[0]
    prod = numpy.array(shape[1:]).prod()
    y = y.reshape(axis_of_interest_len, prod)

    params = numpy.zeros([len(parameters), prod])
    for i in range(prod):
        print "at %d of %d" % (i, prod)
        fit(function, parameters, y[:, i], x)
        for p in range(len(parameters)):
            params[p, i] = parameters[p]()

    # shape is a tuple, so build a new shape with the parameter count first
    params = params.reshape((len(parameters),) + shape[1:])
    return params
Note that the data isn't necessarily 256 x 262144; I made some adjustments in nd_fit to make this work.
The code I use to drive this is:
from curve_fitting import *
import numpy
frames = numpy.load("data.npy")
y = frames[:,0,0,20,40]
x = range(0, 512, 2)
mu = Parameter(x[argmax(y)])
height = Parameter(max(y))
sigma = Parameter(50)
def f(x): return height() * exp(-((x - mu()) / sigma()) ** 2)
ls_data = nd_fit(f, [mu, sigma, height], frames, x, 0)
Note: The solution posted by @JoeKington below is great and solves really fast. However, it only seems to work when the significant area of the Gaussian lies inside the appropriate range. I still need to test whether the mean is still accurate, since that is the main thing I use this for.
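A minimal, self-contained sanity check I could run (my own sketch; it just mirrors the 20%-threshold log-polynomial inversion from the answer below) is to generate a Gaussian whose peak sits outside the sampled window and compare the recovered mean to the known one:
import numpy as np

# Hypothetical check: the true peak (mu = 520) lies just outside the
# 0..510 sampling window used above.
true_mu, true_sigma, true_height = 520.0, 60.0, 100.0
x = np.arange(0, 512, 2, dtype=float)
y = true_height * np.exp(-(x - true_mu)**2 / (2 * true_sigma**2))
y += np.random.random(x.size) - 0.5              # a little noise

mask = y > 0.2 * y.max()                          # same 20% cut as the answer
A, B, C = np.polyfit(x[mask], np.log(y[mask]), 2)
sigma = np.sqrt(-1 / (2.0 * A))
mu = B * sigma**2
# mu should land close to true_mu (520) even though the peak is off the edge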

1 Answer
The easiest thing to do is to linearize the problem. You're currently using a non-linear, iterative method, which is slower than a linear least-squares solution.
Basically, you have:
y = height * exp(-(x - mu)^2 / (2 * sigma^2))
To make this a linear equation, take the natural log of both sides:
ln(y) = ln(height) - (x - mu)^2 / (2 * sigma^2)
This then reduces to a polynomial:
ln(y) = -x^2 / (2 * sigma^2) + x * mu / sigma^2 - mu^2 / (2 * sigma^2) + ln(height)
We can recast this in a simpler form:
ln(y) = A * x^2 + B * x + C
where:
A = -1 / (2 * sigma^2)
B = mu / sigma^2
C = ln(height) - mu^2 / (2 * sigma^2)
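For concreteness, here's a minimal round-trip check of that mapping (my own sketch; the inversion is the same algebra the invert function below implements): pick known Gaussian parameters, build the polynomial coefficients from them, and invert back.
import numpy as np

# Known Gaussian parameters
sigma, mu, height = 30.0, 250.0, 80.0

# Polynomial coefficients of ln(y) = A*x^2 + B*x + C
A = -1 / (2 * sigma**2)
B = mu / sigma**2
C = np.log(height) - mu**2 / (2 * sigma**2)

# Invert back to (sigma, mu, height)
sigma_rec = np.sqrt(-1 / (2.0 * A))
mu_rec = B * sigma_rec**2
height_rec = np.exp(C + 0.5 * mu_rec**2 / sigma_rec**2)
# sigma_rec, mu_rec, height_rec -> 30.0, 250.0, 80.0 (up to floating point)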
There's one caveat, though. This becomes unstable when there's noise in the data, especially in the "tails" of the distribution.
Therefore, we need to use only the data near the "peak" of the distribution. It's easy enough to include only data above some threshold in the fitting. In this example, I'm only including data greater than 20% of the maximum observed value for the given Gaussian curve.
Once we do this, though, it's quite fast. Solving 262144 different Gaussian curves takes only about 1 minute (be sure to remove the plotting portion of the code if you run it on something that large...). It's also easy to parallelize if you want.
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
import itertools

def main():
    x, data = generate_data(256, 6)
    model = [invert(x, y) for y in data.T]
    sigma, mu, height = [np.array(item) for item in zip(*model)]

    prediction = gaussian(x, sigma, mu, height)

    plot(x, data, linestyle='none', marker='o')
    plot(x, prediction, linestyle='-')
    plt.show()

def invert(x, y):
    # Use only data within the "peak" (20% of the max value...)
    key_points = y > (0.2 * y.max())
    x = x[key_points]
    y = y[key_points]

    # Fit a 2nd order polynomial to the log of the observed values
    A, B, C = np.polyfit(x, np.log(y), 2)

    # Solve for the desired parameters...
    sigma = np.sqrt(-1 / (2.0 * A))
    mu = B * sigma**2
    height = np.exp(C + 0.5 * mu**2 / sigma**2)
    return sigma, mu, height

def generate_data(numpoints, numcurves):
    np.random.seed(3)
    x = np.linspace(0, 500, numpoints)

    height = 100 * np.random.random(numcurves)
    mu = 200 * np.random.random(numcurves) + 200
    sigma = 100 * np.random.random(numcurves) + 0.1
    data = gaussian(x, sigma, mu, height)

    noise = 5 * (np.random.random(data.shape) - 0.5)
    return x, data + noise

def gaussian(x, sigma, mu, height):
    data = -np.subtract.outer(x, mu)**2 / (2 * sigma**2)
    return height * np.exp(data)

def plot(x, ydata, ax=None, **kwargs):
    if ax is None:
        ax = plt.gca()
    colorcycle = itertools.cycle(mpl.rcParams['axes.color_cycle'])
    for y, color in zip(ydata.T, colorcycle):
        ax.plot(x, y, color=color, **kwargs)

main()
To implement a parallel version, all we need to change is the main function. (We also need a dummy function because multiprocessing.Pool.imap can't supply additional arguments to its function...) It would look something like this:
def parallel_main():
    import multiprocessing
    p = multiprocessing.Pool()
    x, data = generate_data(256, 262144)
    args = itertools.izip(itertools.repeat(x), data.T)
    model = p.imap(parallel_func, args, chunksize=500)
    sigma, mu, height = [np.array(item) for item in zip(*model)]
    prediction = gaussian(x, sigma, mu, height)

def parallel_func(args):
    return invert(*args)
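One practical caveat (my addition, not part of the answer above): the entry point should be protected with an if __name__ == '__main__' guard, so that multiprocessing can import the module in its worker processes without re-executing the top-level code:
if __name__ == '__main__':
    # Workers import this module; the guard keeps them from
    # re-running parallel_main() themselves.
    parallel_main()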
Edit: If the simple polynomial fit isn't working well, try weighting the problem by the y-values, as mentioned in the link/paper that @tslisten shared (and as Stefan van der Walt implemented, though my implementation is a bit different).
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
import itertools

def main():
    def run(x, data, func, threshold=0):
        model = [func(x, y, threshold=threshold) for y in data.T]
        sigma, mu, height = [np.array(item) for item in zip(*model)]
        prediction = gaussian(x, sigma, mu, height)

        plt.figure()
        plot(x, data, linestyle='none', marker='o', markersize=4)
        plot(x, prediction, linestyle='-', lw=2)

    x, data = generate_data(256, 6, noise=100)
    threshold = 50

    run(x, data, weighted_invert, threshold=threshold)
    plt.title('Weighted by Y-Value')

    run(x, data, invert, threshold=threshold)
    plt.title('Un-weighted Linear Inverse')

    plt.show()

def invert(x, y, threshold=0):
    mask = y > threshold
    x, y = x[mask], y[mask]

    # Fit a 2nd order polynomial to the log of the observed values
    A, B, C = np.polyfit(x, np.log(y), 2)

    # Solve for the desired parameters...
    sigma, mu, height = poly_to_gauss(A, B, C)
    return sigma, mu, height

def poly_to_gauss(A, B, C):
    sigma = np.sqrt(-1 / (2.0 * A))
    mu = B * sigma**2
    height = np.exp(C + 0.5 * mu**2 / sigma**2)
    return sigma, mu, height

def weighted_invert(x, y, weights=None, threshold=0):
    mask = y > threshold
    x, y = x[mask], y[mask]
    if weights is None:
        weights = y
    else:
        weights = weights[mask]

    d = np.log(y)
    G = np.ones((x.size, 3), dtype=np.float)
    G[:, 0] = x**2
    G[:, 1] = x

    model, _, _, _ = np.linalg.lstsq((G.T * weights**2).T, d * weights**2)
    return poly_to_gauss(*model)

def generate_data(numpoints, numcurves, noise=None):
    np.random.seed(3)
    x = np.linspace(0, 500, numpoints)

    height = 7000 * np.random.random(numcurves)
    mu = 1100 * np.random.random(numcurves)
    sigma = 100 * np.random.random(numcurves) + 0.1
    data = gaussian(x, sigma, mu, height)

    if noise is None:
        noise = 0.1 * height.max()
    noise = noise * (np.random.random(data.shape) - 0.5)
    return x, data + noise

def gaussian(x, sigma, mu, height):
    data = -np.subtract.outer(x, mu)**2 / (2 * sigma**2)
    return height * np.exp(data)

def plot(x, ydata, ax=None, **kwargs):
    if ax is None:
        ax = plt.gca()
    colorcycle = itertools.cycle(mpl.rcParams['axes.color_cycle'])
    for y, color in zip(ydata.T, colorcycle):
        #kwargs['color'] = kwargs.get('color', color)
        ax.plot(x, y, color=color, **kwargs)

main()
If that still gives you trouble, try iteratively re-weighting the least-squares problem (the final "best" method recommended in the link @tslisten mentioned). Keep in mind that this will be considerably slower, though.
def iterative_weighted_invert(x, y, threshold=None, numiter=5):
    last_y = y
    for _ in range(numiter):
        model = weighted_invert(x, y, weights=last_y, threshold=threshold)
        last_y = gaussian(x, *model)
    return model
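As a usage sketch (mine, not from the original answer), reusing generate_data and weighted_invert defined above to compare the means recovered by the iterative fit against the single-pass weighted fit:
# Hypothetical comparison on the same kind of noisy test curves as above
x, data = generate_data(256, 6, noise=100)

iterative_model = [iterative_weighted_invert(x, y, threshold=50, numiter=5)
                   for y in data.T]
single_model = [weighted_invert(x, y, threshold=50) for y in data.T]

# Each entry is (sigma, mu, height); pull out the means for comparison
iterative_mu = np.array([m for (s, m, h) in iterative_model])
single_mu = np.array([m for (s, m, h) in single_model])
# iterative_mu and single_mu can now be compared element by element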