阿诺-勒古移动平均线和numpy

2024-06-16 11:35:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我想用NumPy(或Pandas)编写计算Arnaud-Legoux移动平均值的代码的向量化版本。你能帮我拿这个吗?谢谢。在

非矢量化版本如下所示(见下文)。在

def NPALMA(pnp_array, **kwargs) :
    '''
    ALMA - Arnaud Legoux Moving Average,
    http://www.financial-hacker.com/trend-delusion-or-reality/
    https://github.com/darwinsys/Trading_Strategies/blob/master/ML/Features.py
    '''
    length = kwargs['length']
    # just some number (6.0 is useful)
    sigma = kwargs['sigma']
    # sensisitivity (close to 1) or smoothness (close to 0)
    offset = kwargs['offset']

    asize = length - 1
    m = offset * asize
    s = length  / sigma
    dss = 2 * s * s

    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp( -(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]

    return alma

Tags: 版本nparraylengthsigmakwargsoffsetsum
1条回答
网友
1楼 · 发布于 2024-06-16 11:35:06

开始进近

我们可以沿着第一个轴创建滑动窗口,然后使用范围为wtd的张量积进行和约化。在

实现应该是这样的-

# Get all wtd values in an array
wtds = np.exp(-(np.arange(length) - m)**2/dss)

# Get the sliding windows for input array along first axis
pnp_array3D = strided_axis0(pnp_array,len(wtds))

# Initialize o/p array
out = np.zeros(pnp_array.shape)

# Get sum-reductions for the windows which don't need wrapping over
out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]

# Last element of the output needed wrapping. So, do it separately.
out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])

# Finally perform the divisions
out /= wtds.sum()

获取滑动窗口的函数:strided_axis0来自^{}。在

使用1D卷积增强

那些有wtds值的乘法和它们的和约化基本上是沿着第一轴卷积的。因此,我们可以沿着axis=0使用^{}。考虑到内存效率,这会快得多,因为我们不会创建巨大的滑动窗口。在

实施将是-

^{pr2}$

因此,out[length-1:],即非零行将与avgs[:-length+1]相同。在

如果我们使用的是来自wtds的非常小的内核数,那么精度可能会有所不同。所以,如果使用这个convolution方法,请记住这一点。在

运行时测试

方法-

def original_app(pnp_array, length, m, dss):
    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp( -(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]
    return alma

def vectorized_app1(pnp_array, length, m, dss):
    wtds = np.exp(-(np.arange(length) - m)**2/dss)
    pnp_array3D = strided_axis0(pnp_array,len(wtds))
    out = np.zeros(pnp_array.shape)
    out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]
    out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])
    out /= wtds.sum()
    return out

def vectorized_app2(pnp_array, length, m, dss):
    wtds = np.exp(-(np.arange(length) - m)**2/dss)
    return conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')

时间安排-

In [470]: np.random.seed(0)
     ...: m,n = 1000,100
     ...: pnp_array = np.random.rand(m,n)
     ...: 
     ...: length = 6
     ...: sigma = 0.3
     ...: offset = 0.5
     ...: 
     ...: asize = length - 1
     ...: m = np.floor(offset * asize)
     ...: s = length  / sigma
     ...: dss = 2 * s * s
     ...: 

In [471]: %timeit original_app(pnp_array, length, m, dss)
     ...: %timeit vectorized_app1(pnp_array, length, m, dss)
     ...: %timeit vectorized_app2(pnp_array, length, m, dss)
     ...: 
10 loops, best of 3: 36.1 ms per loop
1000 loops, best of 3: 1.84 ms per loop
1000 loops, best of 3: 684 µs per loop

In [472]: np.random.seed(0)
     ...: m,n = 10000,1000 # rest same as previous one

In [473]: %timeit original_app(pnp_array, length, m, dss)
     ...: %timeit vectorized_app1(pnp_array, length, m, dss)
     ...: %timeit vectorized_app2(pnp_array, length, m, dss)
     ...: 
1 loop, best of 3: 503 ms per loop
1 loop, best of 3: 222 ms per loop
10 loops, best of 3: 106 ms per loop

相关问题 更多 >