在Python中过滤长时间序列的最有效方法

9 投票
1 回答
10871 浏览
提问于 2025-04-17 14:45

我有一个很大的时间序列数据,比如说有10亿个数据点,这些数据是记录神经活动的电压值。在进行进一步分析之前,我想对这些数据进行带通滤波,范围在300赫兹到7000赫兹之间。下面是我设计的巴特沃斯滤波器的代码。我该如何让这个滤波器运行得更快呢?现在运行的时间太长了。

更新:样本数据

这里是一个链接,里面有每行数据的样子,和data中的数据类似。

关于数据格式,每一行代表一个不同的记录来源,每一列代表一个时间点。数据的采样频率是20,000赫兹。

 def butter_bandpass(lowcut,highcut,fs,order=8):
     nyq = 0.5*fs
     low = lowcut/nyq
     high = highcut/nyq

     b,a = butter(order, [low, high], btype='band')
     return b,a

 def butter_bandpass_filter(data,lowcut,highcut,fs,order=8):
     b,a = butter_bandpass(lowcut,highcut,fs,order=order)
     return lfilter(b,a,data) 

 print 'Band-pass filtering data'
 data = map(lambda channel: butter_bandpass_filter(channel,300,7000,20000),data)

更新

像大多数NumPy和SciPy的函数一样,lfilter可以处理多维输入,所以使用map会造成不必要的开销。也就是说,可以将

 data = map(lambda channel:butter_bandpass_filter(channel,300,7000,20000),data)

重写为

 data = butter_bandpass_filter(data,300,7000,20000)

默认情况下,lfilter会在最后一个非单一轴上操作。对于一个二维矩阵来说,这意味着这个函数会应用到每一行,这正是我需要的。不过,遗憾的是,它还是太慢了。

1 个回答

12

首先,你的数据样本是一个专有格式,对吧?即使使用Python的biosig工具箱,这种格式也无法读取。也许我错了,但我没有成功读取它。

所以,我将基于人工生成的数据来回答这个问题,这些数据是从一个Rössler振荡器生成的。这个振荡器是一个混沌的三维振荡器,常用于非线性时间序列分析领域。

import numpy as np
from scipy.signal import butter, lfilter

##############################################################
# For generating sample-data
##############################################################
from scipy.integrate import odeint

def roessler_ode(y,t,omega=1,a=0.165,b=0.2,c=10):
    dy = np.zeros((3))
    dy[0] = -1.0*(omega*y[1] + y[2]) #+ e1*(y[3]-y[0])
    dy[1] = omega * y[0] + a * y[1]
    dy[2] = b + y[2] * (y[0] - c)
    return dy

class Roessler(object):
    """A single coupled Roessler oscillators"""
    def __init__(self, y=None, omega=1.0, a=0.165,b=0.2,c=10):
        self.omega = omega
        self.a = a
        self.b = b
        self.c = c
        if y==None:
            self.y = np.random.random((3))+0.5
        else:
            self.y = y

    def ode(self,y,t):
        dy = roessler_ode(y[:],t,self.omega,self.a,self.b,self.c)
        return dy

    def integrate(self,ts):
        rv = odeint(self.ode,self.y,ts)
        self.y = rv[-1,:]
        return rv
###############################################################

接下来是你的函数定义:

def butter_bandpass(lowcut,highcut,fs,order=8):
    nyq = 0.5*fs
    low = lowcut/nyq
    high = highcut/nyq

    b,a = butter(order, [low, high], btype='band')
    return b,a

def butter_bandpass_filter(data,lowcut,highcut,fs,order=8):
    b,a = butter_bandpass(lowcut,highcut,fs,order=order)
    return lfilter(b,a,data) 

我没有改变它们。我用我的振荡器生成了一些数据,但我只取了其中的第三个分量。我还添加了一些高斯噪声,以便有东西可以过滤掉。

# generate sample data
data = Roessler().integrate(np.arange(0,1000,0.1))[:,2]
data += np.random.normal(size=data.shape)

现在我们来谈谈速度的问题。我使用timeit模块来检查执行时间。这些语句执行过滤操作100次,并测量总时间。我对2阶和8阶进行了这个测量(是的,我知道你想要更尖锐的频谱边缘,但等等)。

# time execution
from timeit import timeit
time_order8 = timeit("butter_bandpass_filter(data,300,2000,20000,8)", "from __main__ import butter_bandpass_filter, butter_bandpass, data", number=100)
time_order2 = timeit("butter_bandpass_filter(data,300,2000,20000,2)", "from __main__ import butter_bandpass_filter, butter_bandpass, data", number=100)
print "For order 8: %.2f seconds" % time_order8
print "For order 2: %.2f seconds" % time_order2

这两个打印语句的输出是:

For order 8: 11.70 seconds
For order 2: 0.54 seconds

这差了20倍!使用8阶的巴特沃斯滤波器绝对不是个好主意。我想不出任何情况下这样做是有意义的。为了证明使用这种滤波器时出现的其他问题,我们来看看这些滤波器的效果。我们对数据进行带通滤波,一次用8阶,一次用2阶:

data_bp8 = butter_bandpass_filter(data,300,2000,20000,8)
data_bp2 = butter_bandpass_filter(data,300,2000,20000,2)

现在我们来做一些图表。首先,简单的线条(我没有在意x轴)。

# plot signals
import matplotlib.pyplot as plt
plt.figure(1)
plt.plot(data, label="raw")
plt.plot(data_bp8, label="order 8")
plt.plot(data_bp2, label="order 2")
plt.legend()

这给我们带来了:

Signals

哦,绿色线条发生了什么?奇怪,不是吗?原因是8阶的巴特沃斯滤波器变得相当不稳定。你听说过共振灾难吗?这就是它的样子。

这些信号的功率谱密度可以绘制成:

# plot power spectral densities
plt.figure(2)
plt.psd(data, Fs=200000, label="raw")
plt.psd(data_bp8, Fs=20000, label="order 8")
plt.psd(data_bp2, Fs=20000, label="order 2")
plt.legend()

plt.show()

PSD

在这里,你可以看到绿色线条的边缘更尖锐,但代价是什么呢?在大约300 Hz处出现了一个人工峰值。信号完全扭曲了。

那么你该怎么办呢?

  • 绝对不要使用8阶的巴特沃斯滤波器。
  • 如果低阶滤波器足够,就用低阶的。
  • 如果不够,可以使用Parks-McClellan或Remez-Exchange算法创建一些FIR滤波器。例如,可以使用scipy.signal.remez。

另一个提示:如果你关心信号的相位,绝对应该在时间上前向和后向进行滤波。lfilter会改变相位。一个这样的算法实现,通常称为filtfilt,可以在我的GitHub仓库找到。

还有一个编程提示:

如果你遇到传递参数的情况(butter_bandpass_filter的四个参数只是传递给butter_bandpass),你可以使用*args**kwargs

def butter_bandpass(lowcut,highcut,fs,order=8):
    nyq = 0.5*fs
    low = lowcut/nyq
    high = highcut/nyq

    b,a = butter(order, [low, high], btype='band')
    return b,a

def butter_bandpass_filter(data, *args, **kwargs):
    b,a = butter_bandpass(*args, **kwargs)
    return lfilter(b,a,data) 

这可以减少代码的冗余,使你的代码更不容易出错。

最后,这里是完整的脚本,方便你复制粘贴来试试。

import numpy as np
from scipy.signal import butter, lfilter

##############################################################
# For generating sample-data
##############################################################
from scipy.integrate import odeint

def roessler_ode(y,t,omega=1,a=0.165,b=0.2,c=10):
    dy = np.zeros((3))
    dy[0] = -1.0*(omega*y[1] + y[2]) #+ e1*(y[3]-y[0])
    dy[1] = omega * y[0] + a * y[1]
    dy[2] = b + y[2] * (y[0] - c)
    return dy

class Roessler(object):
    """A single coupled Roessler oscillators"""
    def __init__(self, y=None, omega=1.0, a=0.165,b=0.2,c=10):
        self.omega = omega
        self.a = a
        self.b = b
        self.c = c
        if y==None:
            self.y = np.random.random((3))+0.5
        else:
            self.y = y

    def ode(self,y,t):
        dy = roessler_ode(y[:],t,self.omega,self.a,self.b,self.c)
        return dy

    def integrate(self,ts):
        rv = odeint(self.ode,self.y,ts)
        self.y = rv[-1,:]
        return rv
###############################################################


def butter_bandpass(lowcut,highcut,fs,order=8):
    nyq = 0.5*fs
    low = lowcut/nyq
    high = highcut/nyq

    b,a = butter(order, [low, high], btype='band')
    return b,a

def butter_bandpass_filter(data,lowcut,highcut,fs,order=8):
    b,a = butter_bandpass(lowcut,highcut,fs,order=order)
    return lfilter(b,a,data) 

# generate sample data
data = Roessler().integrate(np.arange(0,1000,0.1))[:,2]
data += np.random.normal(size=data.shape)

# time execution
from timeit import timeit
time_order8 = timeit("butter_bandpass_filter(data,300,2000,20000,8)", "from __main__ import butter_bandpass_filter, butter_bandpass, data", number=100)
time_order2 = timeit("butter_bandpass_filter(data,300,2000,20000,2)", "from __main__ import butter_bandpass_filter, butter_bandpass, data", number=100)
print "For order 8: %.2f seconds" % time_order8
print "For order 2: %.2f seconds" % time_order2

data_bp8 = butter_bandpass_filter(data,300,2000,20000,8)
data_bp2 = butter_bandpass_filter(data,300,2000,20000,2)

# plot signals
import matplotlib.pyplot as plt
plt.figure(1)
plt.plot(data, label="raw")
plt.plot(data_bp8, label="order 8")
plt.plot(data_bp2, label="order 2")
plt.legend()

# plot power spectral densities
plt.figure(2)
plt.psd(data, Fs=200000, label="raw")
plt.psd(data_bp8, Fs=20000, label="order 8")
plt.psd(data_bp2, Fs=20000, label="order 2")
plt.legend()

plt.show()

撰写回答