自定义Python pandas中的rolling_apply函数

3 投票
2 回答
5384 浏览
提问于 2025-04-17 19:33

设置

我有一个数据表(DataFrame),里面有三列:

  • “类别”这一列包含了真(True)和假(False),我用 df.groupby('Category') 把这些值分组。
  • “时间”这一列记录了时间戳(以秒为单位),也就是记录值的时间点。
  • “值”这一列则是实际的数值。

在每个时间点上,都会记录两个值:一个是类别为“真”,另一个是类别为“假”。

滑动计算问题

在每个类别组内,我想计算一个数字,并把它存储在结果列(Result)中。这个结果是在时间 t-60t 之间,值在1到3之间的百分比。

实现这个的最简单方法可能是先通过 rolling_count 计算这个时间段内的总值数量,然后再用 rolling_apply 只计算在这个区间内,值在1到3之间的数量。

这是我目前的代码:

groups = df.groupby(['Category'])
for key, grp in groups:
    grp = grp.reindex(grp['Time']) # reindex by time so we can count with rolling windows
    grp['total'] = pd.rolling_count(grp['Value'], window=60) # count number of values in the last 60 seconds
    grp['in_interval'] = ? ## Need to count number of values where 1<v<3 in the last 60 seconds

    grp['Result'] = grp['in_interval'] / grp['total'] # percentage of values between 1 and 3 in the last 60 seconds

那么,正确的 rolling_apply() 调用应该怎么写,才能找到 grp['in_interval'] 呢?

2 个回答

2

如果我理解你的问题没错的话,你其实可以不使用 rolling count,如果你只是为了计算百分比的话。rolling_apply 需要一个函数作为参数,这个函数会进行聚合操作,也就是说,它会接收一个数组作为输入,然后返回一个数字作为输出。

有了这个概念,我们先来定义一个函数:

def between_1_3_perc(x):
    # pandas Series is basically a numpy array, we can do boolean indexing
    return float(len(x[(x > 1) & (x < 3)])) / float(len(x))

然后在循环中把这个函数的名字作为 rolling_apply 的参数使用:

grp['Result'] = pd.rolling_apply(grp['Value'], 60, between_1_3_perc)
7

让我们通过一个例子来理解:

import pandas as pd
import numpy as np
np.random.seed(1)

def setup(regular=True):
    N = 10
    x = np.arange(N)
    a = np.arange(N)
    b = np.arange(N)

    if regular:
        timestamps = np.linspace(0, 120, N)
    else:
        timestamps = np.random.uniform(0, 120, N)

    df = pd.DataFrame({
        'Category': [True]*N + [False]*N,
        'Time': np.hstack((timestamps, timestamps)),
        'Value': np.hstack((a,b))
        })
    return df

df = setup(regular=False)
df.sort(['Category', 'Time'], inplace=True)

所以这个数据框 df 看起来是这样的:

In [4]: df
Out[4]: 
   Category       Time  Value    Result
12    False   0.013725      2  1.000000
15    False  11.080631      5  0.500000
14    False  17.610707      4  0.333333
16    False  22.351225      6  0.250000
13    False  36.279909      3  0.400000
17    False  41.467287      7  0.333333
18    False  47.612097      8  0.285714
10    False  50.042641      0  0.250000
19    False  64.658008      9  0.125000
11    False  86.438939      1  0.333333
2      True   0.013725      2  1.000000
5      True  11.080631      5  0.500000
4      True  17.610707      4  0.333333
6      True  22.351225      6  0.250000
3      True  36.279909      3  0.400000
7      True  41.467287      7  0.333333
8      True  47.612097      8  0.285714
0      True  50.042641      0  0.250000
9      True  64.658008      9  0.125000
1      True  86.438939      1  0.333333

现在,跟着 @herrfz 的思路,我们来定义一下

def between(a, b):
    def between_percentage(series):
        return float(len(series[(a <= series) & (series < b)])) / float(len(series))
    return between_percentage

between(1,3) 是一个函数,它接受一个序列作为输入,并返回这个序列中有多少元素在半开区间 [1,3) 内。例如:

In [9]: series = pd.Series([1,2,3,4,5])

In [10]: between(1,3)(series)
Out[10]: 0.4

现在我们要对数据框 df 按照 Category 分组:

df.groupby(['Category'])

对于每个分组,我们想要应用一个函数:

df['Result'] = df.groupby(['Category']).apply(toeach_category)

这个函数 toeach_category 会接受一个(子)数据框作为输入,并返回一个数据框作为输出。最终的结果会被赋值给 df 中一个新的列,叫做 Result

那么 toeach_category 具体需要做什么呢?如果我们这样写 toeach_category

def toeach_category(subf):
    print(subf)

那么我们会看到每个 subf 是一个像这样的数据框(当 Category 为 False 时):

   Category       Time  Value    Result
12    False   0.013725      2  1.000000
15    False  11.080631      5  0.500000
14    False  17.610707      4  0.333333
16    False  22.351225      6  0.250000
13    False  36.279909      3  0.400000
17    False  41.467287      7  0.333333
18    False  47.612097      8  0.285714
10    False  50.042641      0  0.250000
19    False  64.658008      9  0.125000
11    False  86.438939      1  0.333333

我们想要处理 Times 列,并且对每个时间值应用一个函数。这可以通过 applymap 来实现:

def toeach_category(subf):
    result = subf[['Time']].applymap(percentage)

函数 percentage 会接受一个时间值作为输入,并返回一个值作为输出。这个值是行中在 1 和 3 之间的比例。applymap 是非常严格的:percentage 不能接受其他参数。

给定一个时间 t,我们可以使用 ix 方法选择 subf 中时间在半开区间 (t-60, t] 内的 Value

subf.ix[(t-60 < subf['Time']) & (subf['Time'] <= t), 'Value']

然后,我们可以通过应用 between(1,3) 来找到这些 Values 中在 1 和 3 之间的比例:

between(1,3)(subf.ix[(t-60 < subf['Time']) & (subf['Time'] <= t), 'Value'])

现在记住,我们想要一个函数 percentage,它接受 t 作为输入,并返回上面的表达式作为输出:

def percentage(t):
    return between(1,3)(subf.ix[(t-60 < subf['Time']) & (subf['Time'] <= t), 'Value'])

但是要注意,percentage 依赖于 subf,而我们不能把 subf 作为参数传递给 percentage(再次强调,因为 applymap 是非常严格的)。

那么我们该如何解决这个问题呢?解决方案是把 percentage 定义在 toeach_category 内部。Python 的作用域规则是:首先在局部作用域查找像 subf 这样的名称,然后是封闭作用域,接着是全局作用域,最后是内置作用域。当调用 percentage(t) 时,Python 首先在局部作用域查找 subf 的值。由于 subf 不是 percentage 的局部变量,Python 会在 toeach_category 的封闭作用域中查找它。它在那里找到了 subf。太好了,这正是我们需要的。

所以现在我们有了我们的函数 toeach_category

def toeach_category(subf):
    def percentage(t):
        return between(1, 3)(
            subf.ix[(t - 60 < subf['Time']) & (subf['Time'] <= t), 'Value'])
    result = subf[['Time']].applymap(percentage)
    return result

把所有内容放在一起,

import pandas as pd
import numpy as np
np.random.seed(1)


def setup(regular=True):
    N = 10
    x = np.arange(N)
    a = np.arange(N)
    b = np.arange(N)

    if regular:
        timestamps = np.linspace(0, 120, N)
    else:
        timestamps = np.random.uniform(0, 120, N)

    df = pd.DataFrame({
        'Category': [True] * N + [False] * N,
        'Time': np.hstack((timestamps, timestamps)),
        'Value': np.hstack((a, b))
    })
    return df


def between(a, b):
    def between_percentage(series):
        return float(len(series[(a <= series) & (series < b)])) / float(len(series))
    return between_percentage


def toeach_category(subf):
    def percentage(t):
        return between(1, 3)(
            subf.ix[(t - 60 < subf['Time']) & (subf['Time'] <= t), 'Value'])
    result = subf[['Time']].applymap(percentage)
    return result


df = setup(regular=False)
df.sort(['Category', 'Time'], inplace=True)
df['Result'] = df.groupby(['Category']).apply(toeach_category)
print(df)

得到

   Category       Time  Value    Result
12    False   0.013725      2  1.000000
15    False  11.080631      5  0.500000
14    False  17.610707      4  0.333333
16    False  22.351225      6  0.250000
13    False  36.279909      3  0.200000
17    False  41.467287      7  0.166667
18    False  47.612097      8  0.142857
10    False  50.042641      0  0.125000
19    False  64.658008      9  0.000000
11    False  86.438939      1  0.166667
2      True   0.013725      2  1.000000
5      True  11.080631      5  0.500000
4      True  17.610707      4  0.333333
6      True  22.351225      6  0.250000
3      True  36.279909      3  0.200000
7      True  41.467287      7  0.166667
8      True  47.612097      8  0.142857
0      True  50.042641      0  0.125000
9      True  64.658008      9  0.000000
1      True  86.438939      1  0.166667

撰写回答