What is an efficient way to compute a transition matrix from time series data?

Posted 2024-04-25 04:55:32


I am trying to compute a transition matrix from time series data. I wrote a custom function, shown in the code below.

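import numpy as np

def compute_transition_matrix(data, n, step = 1):
    # P[i][j] counts transitions from state i to state j, `step` samples apart
    P = np.zeros((n, n))
    m = len(data)
    for i in range(m):
        initial, final = i, i + step
        if final < m:
            P[data[initial]][data[final]] += 1
    # Normalize each row so it sums to 1
    # (a state with no outgoing transitions has sum 0 and yields NaN here)
    sums = np.sum(P, axis = 1)
    for i in range(n):
        for j in range(n):
            P[i][j] = P[i][j] / sums[i]
    return P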

print(compute_transition_matrix([3, 0, 1, 3, 2, 6, 5, 4, 7, 5, 4], 8, 1))

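In the function above, data is the input time series, n is the total number of states in the Markov chain, and step is the transition step, i.e. how many samples separate the initial and final state of each counted transition.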

As an example, I take

data = [3, 0, 1, 3, 2, 6, 5, 4, 7, 5, 4]
n = 8 (this means the Markov chain has 8 states, 0 through 7 inclusive)
step = 1
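
To make the role of step concrete, here is a minimal sketch (not part of the original post) that lists the (initial, final) pairs the function counts for this example. The names `pairs` and `pair_counts` are introduced here only for illustration.

```python
from collections import Counter

data = [3, 0, 1, 3, 2, 6, 5, 4, 7, 5, 4]
step = 1

# Pair every observation with the one `step` positions later;
# zip stops at the shorter sequence, so trailing samples are dropped.
pairs = list(zip(data, data[step:]))

# Count how often each (initial, final) transition occurs.
pair_counts = Counter(pairs)

print(pairs)
print(pair_counts[(5, 4)])
```

Each row of the transition matrix is then the counts for one initial state divided by that state's total number of outgoing transitions.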

However, I am wondering whether there is a way to do this with built-in functions from NumPy/pandas/scikit.


1 Answer
User
#1 · Posted 2024-04-25 04:55:32

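I am not sure whether there is a built-in function for this, but I can think of doing it in NumPy like this, using fancy indexing and stride tricks, with np.unique doing the counting: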

import numpy as np

def compute_transition_matrix2(data, n, step = 1):

    t = np.array(data)
    # Number of (initial, final) pairs that are `step` samples apart
    total_inds = t.size - (step + 1) + 1
    # View t as rows [t[i], t[i + step]] without copying
    t_strided = np.lib.stride_tricks.as_strided(
                                    t,
                                    shape = (total_inds, 2),
                                    strides = (t.strides[0], step * t.strides[0]))

    # Unique transitions and how often each occurs
    inds, counts = np.unique(t_strided, axis = 0, return_counts = True)

    P = np.zeros((n, n))
    P[inds[:, 0], inds[:, 1]] = counts

    sums = P.sum(axis = 1)
    # Avoid divide by zero error by normalizing only non-zero rows
    P[sums != 0] = P[sums != 0] / sums[sums != 0][:, None]

    # P = P / P.sum(axis = 1)[:, None]
    return P

print(compute_transition_matrix2([3, 0, 1, 3, 2, 6, 5, 4, 7, 5, 4], 8, 1))
[[0.  1.  0.  0.  0.  0.  0.  0. ]
 [0.  0.  0.  1.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0.  0.  1.  0. ]
 [0.5 0.  0.5 0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0.  0.  0.  1. ]
 [0.  0.  0.  0.  1.  0.  0.  0. ]
 [0.  0.  0.  0.  0.  1.  0.  0. ]
 [0.  0.  0.  0.  0.  1.  0.  0. ]]

Result from your code:

def compute_transition_matrix(data, n, step = 1):
    P = np.zeros((n, n))
    m = len(data)
    for i in range(m):
        initial, final = i, i + step
        if final < m:
            P[data[initial]][data[final]] += 1
    sums = np.sum(P, axis = 1)
    for i in range(n):
        if sums[i] != 0: # Added this check
            for j in range(n):
                P[i][j] = P[i][j] / sums[i]
    return P

print(compute_transition_matrix([3, 0, 1, 3, 2, 6, 5, 4, 7, 5, 4], 8, 1))
[[0.  1.  0.  0.  0.  0.  0.  0. ]
 [0.  0.  0.  1.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0.  0.  1.  0. ]
 [0.5 0.  0.5 0.  0.  0.  0.  0. ]
 [0.  0.  0.  0.  0.  0.  0.  1. ]
 [0.  0.  0.  0.  1.  0.  0.  0. ]
 [0.  0.  0.  0.  0.  1.  0.  0. ]
 [0.  0.  0.  0.  0.  1.  0.  0. ]]
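
For comparison, the same counting can probably be done without as_strided. The sketch below is a swapped-in alternative, not the method used in this answer: it slices the array to get the initial and final states and accumulates counts with np.add.at. The function name `transition_matrix_add_at` is hypothetical, and the sketch assumes step >= 1 and that every value in data is an integer in 0..n-1.

```python
import numpy as np

def transition_matrix_add_at(data, n, step=1):
    """Illustrative variant: count transitions with np.add.at (assumes step >= 1)."""
    t = np.asarray(data)
    P = np.zeros((n, n))
    # Unbuffered in-place add, so repeated (initial, final) pairs are all counted.
    np.add.at(P, (t[:-step], t[step:]), 1)
    sums = P.sum(axis=1, keepdims=True)
    # Divide only rows with at least one outgoing transition; other rows stay zero.
    np.divide(P, sums, out=P, where=sums != 0)
    return P

P = transition_matrix_add_at([3, 0, 1, 3, 2, 6, 5, 4, 7, 5, 4], 8, 1)
print(P)
```

Plain slicing avoids the risk of reading out-of-bounds memory that comes with a wrong shape or stride in as_strided. Its speed relative to the two functions above was not measured here.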

Intermediate values in my code (for reference):

t_strided =

array([[3, 0],
       [0, 1],
       [1, 3],
       [3, 2],
       [2, 6],
       [6, 5],
       [5, 4],
       [4, 7],
       [7, 5],
       [5, 4]])

inds, counts =

(array([[0, 1],
        [1, 3],
        [2, 6],
        [3, 0],
        [3, 2],
        [4, 7],
        [5, 4],
        [6, 5],
        [7, 5]]),
 array([1, 1, 1, 1, 1, 1, 2, 1, 1]))
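
The intermediate values above can also be reproduced with sliding_window_view, which performs bounds checking that as_strided does not. This is a sketch under the assumption that NumPy 1.20 or later is installed (the version that introduced sliding_window_view); the name `pairs` is introduced here for illustration.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

t = np.array([3, 0, 1, 3, 2, 6, 5, 4, 7, 5, 4])
step = 1

# Each window spans step + 1 samples; keep only its first and last element.
pairs = sliding_window_view(t, step + 1)[:, [0, -1]]

# Unique (initial, final) rows and how many times each one occurs.
inds, counts = np.unique(pairs, axis=0, return_counts=True)

print(pairs)
print(inds)
print(counts)
```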

Timing comparison:

# Generate some random large data
n = 1000
t = np.random.choice(np.arange(n), size = n)
data = list(t)

%timeit compute_transition_matrix(data, n, 1)
# 433 ms ± 21.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit compute_transition_matrix2(data, n, 1)
# 5.5 ms ± 304 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
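
Since the question also mentions pandas, one possible route there is pd.crosstab with normalize="index". This is a hedged sketch rather than part of the answer above: the function name `transition_matrix_crosstab` is hypothetical, it assumes step >= 1, and it was not included in the timings.

```python
import pandas as pd

def transition_matrix_crosstab(data, n, step=1):
    """Illustrative pandas variant (assumes step >= 1 and states in 0..n-1)."""
    src = pd.Series(data[:-step], name="from")
    dst = pd.Series(data[step:], name="to")
    # Row-normalized contingency table of observed transitions.
    table = pd.crosstab(src, dst, normalize="index")
    # Add rows and columns for states that were never observed, filled with 0.
    return table.reindex(index=range(n), columns=range(n), fill_value=0.0)

P = transition_matrix_crosstab([3, 0, 1, 3, 2, 6, 5, 4, 7, 5, 4], 8, 1)
print(P)
```

The result is a DataFrame labeled by state; calling `.to_numpy()` on it gives a plain array comparable to the outputs above.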
