从pandas.rolling_apply返回两个值

20 投票
4 回答
7074 浏览
提问于 2025-04-17 21:04

我正在使用 pandas.rolling_apply 来将数据拟合到一个分布上,并从中获取一个值,但我还需要它报告一个滚动的拟合优度(具体来说,是 p 值)。目前我这样做:

def func(sample):
    fit = genextreme.fit(sample)
    return genextreme.isf(0.9, *fit)

def p_value(sample):
    fit = genextreme.fit(sample)
    return kstest(sample, 'genextreme', fit)[1]

values = pd.rolling_apply(data, 30, func)
p_values = pd.rolling_apply(data, 30, p_value)
results = pd.DataFrame({'values': values, 'p_value': p_values})

问题是我有很多数据,而拟合函数的计算开销很大,所以我不想对每个样本调用两次这个函数。我更希望能这样做:

def func(sample):
    fit = genextreme.fit(sample)
    value = genextreme.isf(0.9, *fit)
    p_value = kstest(sample, 'genextreme', fit)[1]
    return {'value': value, 'p_value': p_value}

results = pd.rolling_apply(data, 30, func)

这里的结果是一个 DataFrame,里面有两列。如果我尝试运行这个,我会遇到一个异常: TypeError: a float is required。请问这样做可能吗?如果可以的话,应该怎么做呢?

4 个回答

1

我也遇到过同样的问题。我是通过生成一个全局的数据框,然后从滚动函数中输入数据来解决的。在下面这个示例脚本中,我生成了一些随机的输入数据。接着,我用一个滚动应用函数来计算最小值、最大值和平均值。

import pandas as pd
import numpy as np

global outputDF
global index

def myFunction(array):

    global index
    global outputDF

    # Some random operation
    outputDF['min'][index] = np.nanmin(array)
    outputDF['max'][index] = np.nanmax(array)
    outputDF['mean'][index] = np.nanmean(array)

    index += 1
    # Returning a useless variable
    return 0

if __name__ == "__main__":

    global outputDF
    global index

    # A random window size
    windowSize = 10

    # Preparing some random input data
    inputDF = pd.DataFrame({ 'randomValue': [np.nan] * 500 })
    for i in range(len(inputDF)):
        inputDF['randomValue'].values[i] = np.random.rand()


    # Pre-Allocate memory
    outputDF = pd.DataFrame({ 'min': [np.nan] * len(inputDF),
                              'max': [np.nan] * len(inputDF),
                              'mean': [np.nan] * len(inputDF)
                              })   

    # Precise the staring index (due to the window size)
    d = (windowSize - 1) / 2
    index = np.int(np.floor( d ) )

    # Do the rolling apply here
    inputDF['randomValue'].rolling(window=windowSize,center=True).apply(myFunction,args=())

    assert index + np.int(np.ceil(d)) == len(inputDF), 'Length mismatch'

    outputDF.set_index = inputDF.index

    # Optional : Clean the nulls
    outputDF.dropna(inplace=True)

    print(outputDF)
2

我使用并喜欢@yi-yu的回答,所以我把它做得更通用了一些:

from collections import deque
from functools import partial

def make_class(func, dim_output):

    class your_multi_output_function_class:
        def __init__(self, func, dim_output):
            assert dim_output >= 2
            self.func = func
            self.deques = {i: deque() for i in range(1, dim_output)}

        def f0(self, *args, **kwargs):
            k = self.func(*args, **kwargs)
            for queue in sorted(self.deques):
                self.deques[queue].append(k[queue])
            return k[0]

    def accessor(self, index, *args, **kwargs):
        return self.deques[index].popleft()

    klass = your_multi_output_function_class(func, dim_output)

    for i in range(1, dim_output):
        f = partial(accessor, klass, i)
        setattr(klass, 'f' + str(i), f)

    return klass

假设你有一个函数f,它可以处理一个pandas的序列(可以是窗口形式,但不一定),并返回n个值,你可以这样使用它:

rolling_func = make_class(f, n)
# dict to map the function's outputs to new columns. Eg:
agger = {'output_' + str(i): getattr(rolling_func, 'f' + str(i)) for i in range(n)} 
windowed_series.agg(agger)
5

我之前也遇到过类似的问题。这是我解决这个问题的方法:

from collections import deque
class your_multi_output_function_class:
    def __init__(self):
        self.deque_2 = deque()
        self.deque_3 = deque()

    def f1(self, window):
        self.k = somefunction(y)
        self.deque_2.append(self.k[1])
        self.deque_3.append(self.k[2])
        return self.k[0]    

    def f2(self, window):
        return self.deque_2.popleft()   
    def f3(self, window):
        return self.deque_3.popleft() 

func = your_multi_output_function_class()

output = your_pandas_object.rolling(window=10).agg(
    {'a':func.f1,'b':func.f2,'c':func.f3}
    )
5

我之前也遇到过类似的问题,后来通过在应用时使用一个单独的辅助类中的成员函数解决了。这个成员函数确实能返回一个值,但我把其他的计算结果存储为类的成员,这样之后就可以使用它们了。

简单的例子:

class CountCalls:
    def __init__(self):
        self.counter = 0

    def your_function(self, window):
        retval = f(window)
        self.counter = self.counter + 1


TestCounter = CountCalls()

pandas.Series.rolling(your_seriesOrDataframeColumn, window = your_window_size).apply(TestCounter.your_function)

print TestCounter.counter

假设你的函数 f 会返回一对值 v1 和 v2。那么你可以返回 v1,并把它赋值给你的数据框中的 column_v1。第二个值 v2,你可以在辅助类中用一个 Series 叫 series_val2 来累积。之后,你只需将这个 series 作为新列添加到你的数据框中。

撰写回答