从时间序列CSV d构建时间轴

2024-05-16 16:25:08 发布

您现在位置:Python中文网/ 问答频道 /正文

这不是我遇到的具体错误,更多的是我应该使用什么样的数据结构和算法来实现我的目标。让我知道,如果这将更适合像软件工程不同的董事会。你知道吗

我有物联网设备,它将数据转储到CSV中,这些CSV变得太大,人类无法解析。CSV看起来像这样:

Time,Sensor1,State1,State2,State3,Sensor2,Sensor3,Sensor4,State4...
2019-05-04T10:04:45.601000Z,0.19,0,0,1,25.67,298.8,12.3,5...
2019-05-04T10:04:58.133000Z,0.23,0,1,2,24.53,300.1,14.4,6...
...

在一个目录中,可能有一打或两个CSV我想作为一个小组来分析。每个CSV有几十列和上万行。每个CSV的第一列是Time,在这个CSV中,timesteps是统一的(尽管它们并不精确)。除时间外,CSV组中的列标题是唯一的(除了所有CSV都具有的时间外,没有两个CSV共享一个列名)。有时时间步长是10秒,有时是2秒甚至更小的间隔。在不同的csv中,时间列很可能覆盖相同的时间跨度,但时间间隔可能并不完全一致。例如,在上面的CSV中,我们有一个数据点位于2019-05-04T10:04:45.601000Z。同一目录(组)中的另一个CSV的最近时间戳可能位于2019-05-04T10:04:49.601000Z(四秒之后)。你知道吗

我想通过扫描csv的状态转换来构建一个“事件”的时间线。并不是所有的变量变化都是显著的,所以我希望能够声明我所关心的状态转换。我想使用以下类:

class StateTransition:
    def __init__(self, variable, from_state, to_state, name, relevant_measurements=()):
        self.variable = variable
        self.from_state = from_state
        self.to_state = to_state
        self.name = name
        self.relevant_measurements = relevant_measurements

我将维护一个要查找的StateTransforms实例列表,如下所示:

states = (
    StateTransition(variable='State2',
                    from_state=0, to_state=1,
                    name='Cable plugged in',
                    relevant_measurements=('State1', 'Sensor4', )),
    StateTransition(variable='State2',
                    from_state=1, to_state=0,
                    name='Cable unplugged',
                    relevant_measurements('State1', 'Sensor3', )),
)

相关测量是指其他变量,这些变量可能来自或不来自同一个CSV。相关测量的重点是了解在状态转换时那些其他测量(或状态)的值。由于相关的度量值可能来自另一个CSV,因此我必须在最接近状态转换的时间戳的时间处确定该度量值的值。我想象这样构造一个物体:

from datetime import datetime
class TimelineEvent:
    def __init__(self, state_transition: StateTransition, timestamp: datetime):
        self.st = state_transition
        self.timestamp = timestamp
        self.measurements = {}

在哪里自我测量是一个dict,对于“电缆插入”事件,它看起来是这样的:

{'State1': 0,
 'Sensor4': 14.4,}

最终目标是根据我定义的statetransforms为CSV组创建一个TimelineEvents列表。一旦我有了一个时间线事件列表,我就可以随心所欲地显示它们,这一部分我很容易就能搞清楚。我挣扎的地方是如何建立一个时间表。我应该一次浏览一个CSV,然后根据时间戳对时间线进行排序吗?我应该同时检查所有CSV吗?我应该根据CSV的时间戳来检查它们吗?是否有一个有用的数据结构,我可以加载所有的csv到其中,使搜索时间对齐的数据变得容易?我从未使用过pandas,但我将它包含在标签中,因为我认为它提供的工具在这里可能有用,但答案肯定不需要它。你知道吗

内存使用在这里不是很重要。至于运行时,它不需要很快,但如果它在太阳内爆之前完成就好了。最坏的情况是,假设定义了十万行CSV、100列和50个状态转换。你知道吗

我考虑将状态转换存储到一个字典中,该字典如下所示:

states_dict[variable][from_state][to_state] = {'name': 'something', relevant_measurements=('some_sensor',),}

^类似于上面的内容将提供状态转换的有效查找。如果有帮助的话,你可以假设我已经做过了。你知道吗

澄清一下:我不需要实现来接受答案。如果你能给我指出工具和/或描述一个算法,以我能理解的方式来实现这一点,我会接受这样的答案。你知道吗


Tags: csvto数据namefromself状态时间
1条回答
网友
1楼 · 发布于 2024-05-16 16:25:08

虽然这个问题并不十分具体,但人们通常会对来自物联网设备的时间序列数据进行各种处理。你知道吗

重采样、将重采样与实际相结合、应用逻辑(如状态机)和在数据帧之间查找值是常见的。你知道吗

下面是一个例子,一种繁忙的框,显示了上面关于时间序列数据的一些内容,由datetimeindex操作。你知道吗

import pandas as pd
import random as r
from pandas.compat import StringIO
print(pd.__version__)

daterange = pd.date_range('2019-01-01 11:00', '2019-01-01 13:00', periods = r.randint(10,30))
df = pd.DataFrame(index=daterange, data={'sensor_data': [r.randint(0,5) for i in range(len(daterange))]})

# the actual datapoints
actual_datapoints = df.copy()
actual_datapoints['actual'] = True

# resample e.g. visualization purposes
df = df.resample('5T').last().ffill()
# but let's not confuse these datapoints and the result of resampling with actual datapoints
df['actual'] = False
# for these false datapoints, delete the ones for which there is an actual
mask = df.index.isin(actual_datapoints.index)
df = df.drop(df[mask].index)

# combine actual datapoints with the resampled timeseries
df = pd.concat([actual_datapoints, df])
df.sort_index(inplace=True)

# a lookup dataframe, lookups by datetimeindex
data = """datetime,lookup_val
2019-01-01 11:00,100
2019-01-01 11:30,200
2019-01-01 12:00,300
"""
lookup_df = pd.read_csv(StringIO(data), index_col='datetime', parse_dates=True, infer_datetime_format=True)

# a really bad state machine. :-)
state=None
def statefunc(x):
    global state
    if x != state:
        state = x
        return 'edge'
    return state

df['state'] = df['sensor_data'].apply(statefunc)

# actual, resampled, and looked up values 
df = df.join(lookup_df)
print(df)

下面提到的是基于近似时间戳的查找。见:https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html

相关问题 更多 >