从时间序列d中获取间隔

2024-05-29 07:48:13 发布

您现在位置:Python中文网/ 问答频道 /正文

我有个很奇怪的问题。我目前正在处理时间序列数据,数据集中有几个峰值。这些数据是用中子密度测井仪收集的,它描述了传感器连续记录一段时间的事件。当这台机器沿着钻孔向下移动时,数据中的峰值对应于一些有趣的间隔。所以,峰值很重要。然而,重要的不仅仅是峰值。它是整个时间间隔(或者至少如我所描述的,请参阅我的附图)。现在我的问题是,有没有一种信号处理方法(最好是Python)我可以使用或研究,它可以让我把这个信号分成不同的区间,每个区间对应于局部最小值/最大值?你知道吗

我最初的方法是使用Kleinberg 2002描述的突发检测算法,但是我没有成功,所以我想听听其他人对此的看法。你知道吗

这是原始数据:

graph of proportion of target events vs. time

这就是我想做的:

annotated graph


Tags: 数据方法机器间隔记录时间事件请参阅
2条回答

由于原始数据不可用,我生成了一些测试数据。方法如下。如果数据是一组有跳跃的平台,检查移动窗口上的标准偏差,将给出跳跃处的峰值。用阈值隔离峰。在应用移动平均值后,通过查看数据得到跳跃的估计值。这为装配提供了一个良好的起点。作为fit函数,我再次使用我最喜欢的tanh。你知道吗

结果如下:

import matplotlib.pyplot as plt
import numpy as np
from operator import itemgetter
from itertools import groupby
from scipy.optimize import leastsq

def moving_average( data, windowsize, **kwargs ):
    mode=kwargs.pop('mode', 'valid')
    weight = np.ones( windowsize )/ float( windowsize )
    return np.convolve( data, weight, mode=mode)

def moving_sigma( data, windowsize ):
    l = len( data )
    sout = list()
    for i in range( l - windowsize + 1 ):
        sout += [ np.std( data[ i : i + windowsize ] ) ]
    return np.array( sout )

def m_step( x, s, x0List, ampList, off ):
    assert len( x0List ) == len( ampList )
    out = [ a * 0.5 * ( 1 + np.tanh( s * (x - x0) ) ) for a, x0 in zip( ampList, x0List )]
    return sum( out ) + off

def residuals( params, xdata, ydata ):
    assert not len(params) % 2
    off = params[0]
    s = params[1]
    rest = params[2:]
    n = len( rest )
    x0 = rest[:n/2]
    am = rest[n/2:]
    diff = [ y - m_step( x,s, x0, am, off ) for x, y in zip( xdata, ydata ) ]
    return diff 

### generate data
np.random.seed(776)
a=0
data = list()
for i in range(15):
    a = 50 * ( 1 - 2 * np.random.random() )
    for _ in range ( np.random.randint( 5, 35 ) ):
        data += [a]

xx =  np.arange( len( data ), dtype=np.float )

noisydata = list()
for x in data:
    noisydata += [ x + np.random.normal( scale=5 ) ]

###define problem specific parameters
myWindow = 10
thresh = 8
cutoffscale = 1.1

### data treatment
avdata = moving_average( noisydata, myWindow )
avx = moving_average( xx, myWindow )
sdata = moving_sigma( noisydata, myWindow )

### getting start values for the fit
seq = [x for x in np.where( sdata > thresh )[0] ]
# from https://stackoverflow.com/a/3149493/803359
jumps = [ map( itemgetter(1), g ) for k, g in groupby( enumerate( seq ), lambda ( i, x ) : i - x ) ]
xjumps = [ [ avx[ k ] for k in xList ] for xList in jumps ]
means = [ np.mean( x ) for x in xjumps ]
### the delta is too small as one only looks above thresh so lets increase it by guessing
deltas = [ ( avdata[ x[-1] ] - avdata[ x[0] ] ) * cutoffscale for x in jumps ]

guess = [ avdata[0], 2] + means + deltas

### fitting
sol, err = leastsq( residuals, guess, args=( xx, noisydata ), maxfev=10000 )
fittedoff = sol[0]
fitteds = sol[1]
fittedx0 = sol[ 2 : 2 + len( means ) ] 
fittedam = sol[ 2 + len( means ) : ] 

### plotting
fig = plt.figure()
ax = fig.add_subplot( 1, 1, 1 )
# ~ ax.plot( data )
ax.plot( xx, noisydata, label='noisy data' )
ax.plot( avx, avdata, label='moving average' )
ax.plot( avx, sdata, label='window sigma' )

for j in means:
    ax.axvline( j, c='#606060', linestyle=':' )

yy = [ m_step( x, 2, means, deltas, avdata[0] ) for x in xx]
bestyy = [ m_step( x, fitteds, fittedx0, fittedam, fittedoff ) for x in xx ]
ax.plot( xx, yy, label='guess' )
ax.plot( xx, bestyy, label='fit' )

plt.legend( loc=0 )
plt.show()

它给出了以下图表:

enter image description here

一些较小的跳跃没有被检测到,但应该很清楚,这是意料之中的。而且,整个过程也不是很快。由于阈值的原因,初始猜测越大越糟糕x出现的跳跃越多。最后,不检测到一系列小跳跃,因此数据中产生的斜率被拟合为一个具有明显大误差的大平台。我们可以引入一个额外的检查,并添加相应的跳跃位置。你知道吗

还请注意,窗口大小的选择决定了两个跳转作为单个跳转检测到的距离。你知道吗

我从散点图中提取数据进行分析,这是我第一次切入问题,并使用示例代码进行构建。这样做的目的是将三阶多项式拟合到所有数据作为基线,然后在标记为“矩形化部分”的代码部分中,在基线曲线上方或下方的截止距离处查找数据点。我希望这至少能为解决这个问题提供一条途径。想到的一个增强是在“矩形化”代码中实现最小宽度。你知道吗

plot

import numpy, matplotlib
import matplotlib.pyplot as plt


##########################################################
# data load section
f = open('/home/zunzun/neutron_refl_bore/rawdata.dat')
rawdata = f.read()
f.close()

xData = []
yData = []
for line in rawdata.split('\n'):
    if line:
        spl = line.split()
        xData.append(float(spl[0]))
        yData.append(float(spl[1]))
xData = numpy.array(xData)
yData = numpy.array(yData)


##########################################################
#rectangularize section
cutoffLimit = 1.6
polynomialOrder = 3 # for baseline curve
modelParameters = numpy.polyfit(xData, yData, polynomialOrder) 
modelPredictions = numpy.polyval(modelParameters, xData)
modelDifference = yData - modelPredictions
maxDiff = max(modelDifference)
minDiff = min(modelDifference)

aboveLimit = (modelDifference > cutoffLimit) * maxDiff
belowLimit = (modelDifference < -cutoffLimit) * minDiff
rectanglarized = modelPredictions +  + belowLimit + aboveLimit

##########################################################
# graphics output section
def ModelAndScatterPlot(graphWidth, graphHeight):
    f = plt.figure(figsize=(graphWidth/100.0, graphHeight/100.0), dpi=100)
    axes = f.add_subplot(111)

    # first the raw data as a scatter plot
    axes.plot(xData, yData,  'D')

    # create data for the fitted equation plot
    xModel = numpy.linspace(min(xData), max(xData))
    yModel = numpy.polyval(modelParameters, xModel)

    # now the model as a line plot
    axes.plot(xModel, yModel)

    # now show rectangularized
    axes.plot(xData, rectanglarized)

    axes.set_title('Data Outside Polynomial Baseline +/- Cutoff') # add a title
    axes.set_xlabel('X Data') # X axis data label
    axes.set_ylabel('Y Data') # Y axis data label

    plt.show()
    plt.close('all') # clean up after using pyplot

graphWidth = 800
graphHeight = 600
ModelAndScatterPlot(graphWidth, graphHeight)

相关问题 更多 >

    热门问题