如何编写一个测试时间连续性的函数?

0 投票
3 回答
91 浏览
提问于 2025-04-14 18:03

我有一个很大的三维数组,形状是 (100000, 24, 24)。其中有一列是整数,表示一天中的小时。这个数组的每一层都必须包含一天的24个小时,并且顺序要正确,但并不是所有的层都是从0开始的。这意味着这个数组可能不是从00:00开始的,这没问题。

有时候,小时这一列会跳过某些小时或者顺序不对。如果出现这种情况,我需要完全删除这一层。

比如,这种情况是可以的:[20, 21, 22, 23, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

但这种情况就不行:[0, 2, 3, 4, ...]

我已经想出了办法,但需要把每一列转换成pandas的时间序列,这样会占用很多内存。而且我的方法也没有使用向量化。我想知道怎么用numpy的向量化来高效地完成这个任务。

下面是一个最小可复现的例子,用5个小时代替24个小时:


import numpy as np
# Define the shape of the array, pretend we only have 1 column, 3 bands, and 5 rows.
shape = (3, 5, 1)
# Create the random 3D numpy array
random_array = np.random.randint(0, 10, size=shape)
# Assign these 2 bands as examples of ones that should stay 
random_array[0, :, :] = [[0], [1], [2], [3], [4]]
random_array[1, :, :] = [[3], [4], [0], [1], [2]]
# Assign this one as one that should be removed
random_array[2, :, :] = [[1], [2], [4], [4], [0]]

给定这个数组,写一个函数来保留前两层,删除最后一层。

3 个回答

2

假设:你数组中的所有“小时”都是0到23之间的整数。

简而言之:

test[(np.diff(test) % 24 == 1).all(axis=1)]

(可以根据你的需要使用不同的轴)

详细情况请考虑以下内容:

a = np.arange(24) # OK
b = (np.arange(24) + 5) % 24 # OK
c = (np.arange(24) + 5) % 24
c[10] = 0 # BAD
d = (np.arange(24) + 5) % 24
d[10:] = np.arange(14) # BAD
test = np.vstack((a,b,c,d))

diff = np.diff(test)

d1 = diff == 1
d_23 = diff == -23

d1表示你增加了1小时,d_23表示你减少了23小时。

要让这个序列有效,你需要满足以下两个条件:

  • 在每一步中,要么d1为真,要么d3为真
  • d_23最多只能为真一次

这可以用代码表示为:

(d1 | d_23).all(axis=1) & (d_23.sum(axis=1) <= 1)

输出:

array([ True,  True, False, False])

然后你可以把这个当作掩码来使用:

test[(d1 | d_23).all(axis=1) & (d_23.sum(axis=1) <= 1)]

输出:

array([[ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15,
    16, 17, 18, 19, 20, 21, 22, 23],
   [ 5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
    21, 22, 23,  0,  1,  2,  3,  4]])

我还有一个强烈的直觉,第二个测试((d_23.sum(axis=1) <= 1)其实不是必要的,因为如果d_23为真超过一次,你也会违反第一个条件。不过我没有精力去仔细研究这个,留给你去验证... :)

是的,考虑到小时在[0,23]之间,这个条件其实是多余的,可以简化为:

test[(d1 | d_23).all(axis=1)]

甚至可以更简单地写成一行:

test[(np.diff(test) % 24 == 1).all(axis=1)]
2

根据你的例子

l5=np.arange(5)
random_array[((random_array[:,:,0]-l5)%5 == random_array[:,:1,0]).all(axis=1)]

或者,针对24小时

l24=np.arange(24)
random_array[((random_array[:,:,0]-l24)%24 == random_array[:,:1,0]).all(axis=1)]

根据这个事实,如果一条线 [x₀, x₁, ×₂, ..., x₂₃] 是正确的,那么 (xₖ-k)≡x₀ (mod 24),对所有的k都成立

3

如果我理解这个问题没错的话,后面的每个值应该是前一个值加1,然后再对24取余。这个操作其实很简单,可以通过切片来提取需要检查的那一列,然后用一个函数来处理,像下面这样:

import numpy as np

def good_column(v):
    return np.all(arr_a[1:] == (arr_a[:-1] + 1) % 24)

good_column(np.array([0, 2, 3, 4]))    # False
good_column(np.array([1, 2, 3, 4]))    # True

撰写回答