从pandas.rolling_apply返回两个值
我正在使用 pandas.rolling_apply
来将数据拟合到一个分布上,并从中获取一个值,但我还需要它报告一个滚动的拟合优度(具体来说,是 p 值)。目前我这样做:
def func(sample):
fit = genextreme.fit(sample)
return genextreme.isf(0.9, *fit)
def p_value(sample):
fit = genextreme.fit(sample)
return kstest(sample, 'genextreme', fit)[1]
values = pd.rolling_apply(data, 30, func)
p_values = pd.rolling_apply(data, 30, p_value)
results = pd.DataFrame({'values': values, 'p_value': p_values})
问题是我有很多数据,而拟合函数的计算开销很大,所以我不想对每个样本调用两次这个函数。我更希望能这样做:
def func(sample):
fit = genextreme.fit(sample)
value = genextreme.isf(0.9, *fit)
p_value = kstest(sample, 'genextreme', fit)[1]
return {'value': value, 'p_value': p_value}
results = pd.rolling_apply(data, 30, func)
这里的结果是一个 DataFrame
,里面有两列。如果我尝试运行这个,我会遇到一个异常:
TypeError: a float is required
。请问这样做可能吗?如果可以的话,应该怎么做呢?
4 个回答
1
我也遇到过同样的问题。我是通过生成一个全局的数据框,然后从滚动函数中输入数据来解决的。在下面这个示例脚本中,我生成了一些随机的输入数据。接着,我用一个滚动应用函数来计算最小值、最大值和平均值。
import pandas as pd
import numpy as np
global outputDF
global index
def myFunction(array):
global index
global outputDF
# Some random operation
outputDF['min'][index] = np.nanmin(array)
outputDF['max'][index] = np.nanmax(array)
outputDF['mean'][index] = np.nanmean(array)
index += 1
# Returning a useless variable
return 0
if __name__ == "__main__":
global outputDF
global index
# A random window size
windowSize = 10
# Preparing some random input data
inputDF = pd.DataFrame({ 'randomValue': [np.nan] * 500 })
for i in range(len(inputDF)):
inputDF['randomValue'].values[i] = np.random.rand()
# Pre-Allocate memory
outputDF = pd.DataFrame({ 'min': [np.nan] * len(inputDF),
'max': [np.nan] * len(inputDF),
'mean': [np.nan] * len(inputDF)
})
# Precise the staring index (due to the window size)
d = (windowSize - 1) / 2
index = np.int(np.floor( d ) )
# Do the rolling apply here
inputDF['randomValue'].rolling(window=windowSize,center=True).apply(myFunction,args=())
assert index + np.int(np.ceil(d)) == len(inputDF), 'Length mismatch'
outputDF.set_index = inputDF.index
# Optional : Clean the nulls
outputDF.dropna(inplace=True)
print(outputDF)
2
我使用并喜欢@yi-yu的回答,所以我把它做得更通用了一些:
from collections import deque
from functools import partial
def make_class(func, dim_output):
class your_multi_output_function_class:
def __init__(self, func, dim_output):
assert dim_output >= 2
self.func = func
self.deques = {i: deque() for i in range(1, dim_output)}
def f0(self, *args, **kwargs):
k = self.func(*args, **kwargs)
for queue in sorted(self.deques):
self.deques[queue].append(k[queue])
return k[0]
def accessor(self, index, *args, **kwargs):
return self.deques[index].popleft()
klass = your_multi_output_function_class(func, dim_output)
for i in range(1, dim_output):
f = partial(accessor, klass, i)
setattr(klass, 'f' + str(i), f)
return klass
假设你有一个函数f
,它可以处理一个pandas的序列(可以是窗口形式,但不一定),并返回n
个值,你可以这样使用它:
rolling_func = make_class(f, n)
# dict to map the function's outputs to new columns. Eg:
agger = {'output_' + str(i): getattr(rolling_func, 'f' + str(i)) for i in range(n)}
windowed_series.agg(agger)
5
我之前也遇到过类似的问题。这是我解决这个问题的方法:
from collections import deque
class your_multi_output_function_class:
def __init__(self):
self.deque_2 = deque()
self.deque_3 = deque()
def f1(self, window):
self.k = somefunction(y)
self.deque_2.append(self.k[1])
self.deque_3.append(self.k[2])
return self.k[0]
def f2(self, window):
return self.deque_2.popleft()
def f3(self, window):
return self.deque_3.popleft()
func = your_multi_output_function_class()
output = your_pandas_object.rolling(window=10).agg(
{'a':func.f1,'b':func.f2,'c':func.f3}
)
5
我之前也遇到过类似的问题,后来通过在应用时使用一个单独的辅助类中的成员函数解决了。这个成员函数确实能返回一个值,但我把其他的计算结果存储为类的成员,这样之后就可以使用它们了。
简单的例子:
class CountCalls:
def __init__(self):
self.counter = 0
def your_function(self, window):
retval = f(window)
self.counter = self.counter + 1
TestCounter = CountCalls()
pandas.Series.rolling(your_seriesOrDataframeColumn, window = your_window_size).apply(TestCounter.your_function)
print TestCounter.counter
假设你的函数 f 会返回一对值 v1 和 v2。那么你可以返回 v1,并把它赋值给你的数据框中的 column_v1。第二个值 v2,你可以在辅助类中用一个 Series 叫 series_val2 来累积。之后,你只需将这个 series 作为新列添加到你的数据框中。