用于修改X和y的sklearn管道自定义转换器
我想为sklearn的Pipeline
创建自己的转换器。
我正在创建一个类,这个类需要实现fit和transform这两个方法。这个转换器的目的是从矩阵中删除那些包含超过指定数量NaN(缺失值)的行。
我现在遇到的问题是我该如何同时修改传给转换器的X和y矩阵?
我觉得这应该在fit方法中完成,因为它可以访问到X和y。由于Python是通过赋值来传递参数的,一旦我把X重新赋值为一个行数更少的新矩阵,原来的X的引用就会丢失(y也是一样)。有没有办法保持这个引用呢?
我正在使用pandas的DataFrame来方便地删除那些NaN过多的行,但这可能不是我用例的最佳做法。现在的代码看起来是这样的:
class Dropna():
# thresh is max number of NaNs allowed in a row
def __init__(self, thresh=0):
self.thresh = thresh
def fit(self, X, y):
total = X.shape[1]
# +1 to account for 'y' being added to the dframe
new_thresh = total + 1 - self.thresh
df = pd.DataFrame(X)
df['y'] = y
df.dropna(thresh=new_thresh, inplace=True)
X = df.drop('y', axis=1).values
y = df['y'].values
return self
def transform(self, X):
return X
8 个回答
你可以通过使用sklearn.preprocessing.FunctionTransformer这个方法来轻松解决这个问题(http://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer.html)。
你只需要把你对X的修改放在一个函数里。
def drop_nans(X, y=None):
total = X.shape[1]
new_thresh = total - thresh
df = pd.DataFrame(X)
df.dropna(thresh=new_thresh, inplace=True)
return df.values
然后你可以通过调用
transformer = FunctionTransformer(drop_nans, validate=False)
来获得你的转换器,这样你就可以在流程中使用它。阈值可以在drop_nans函数外部设置。
补充一下@João Matias的回答:
这里有一个使用imblearn的例子,展示了如何设置一个处理步骤,来删除那些有缺失值的行:
from imblearn import FunctionSampler
def drop_rows_with_any_nan(X, y):
return X[~np.isnan(X).any(axis=1), :], y[~np.isnan(X).any(axis=1)]
drop_rows_with_any_nan_sampler = FunctionSampler(func=drop_rows_with_any_nan, validate=False)
model_clf2 = pipeline.Pipeline(
[
('preprocess', column_transformer),
('drop_na', drop_rows_with_any_nan_sampler),
('smote', SMOTE()),
('xgb', xgboost.XGBClassifier()),
]
)
请注意,你需要使用imblearn的管道功能。
这个叫做 imblearn
的包是建立在 sklearn
之上的,它里面有一个叫 FunctionSampler 的工具,可以在处理数据的过程中同时调整特征数组 X
和目标数组 y
。
需要注意的是,如果你想在一个流程步骤中使用它,就得用 imblearn
里的 Pipeline
类,这个类是从 sklearn
的那个类继承来的。此外,默认情况下,在 Pipeline
resample 方法如果不是在 fit
之后立刻调用(像 fit_resample
那样),就不会有任何作用。所以,最好提前看看相关的文档。
你需要对sklearn的Pipeline
内部代码进行一些修改。
我们定义了一个转换器,它在训练时会去掉那些特征值或目标值为NaN的样本(使用fit_transform
)。而在推理时,它会去掉那些特征值为NaN的样本(使用transform
)。需要注意的是,我们的转换器在fit_transform
中会返回X和y,所以我们需要在sklearn的Pipeline
中处理这个行为。
class Dropna():
def fit(self, X, y):
return self
def fit_transform(self, X, y):
mask = (np.isnan(X).any(-1) | np.isnan(y))
if hasattr(X, 'loc'):
X = X.loc[~mask]
else:
X = X[~mask]
if hasattr(y, 'loc'):
y = y.loc[~mask]
else:
y = y[~mask]
return X, y ###### make fit_transform return X and y
def transform(self, X):
mask = np.isnan(X).any(-1)
if hasattr(X, 'loc'):
X = X.loc[~mask]
else:
X = X[~mask]
return X
我们只需要在fit
和_fit
方法的两个特定地方修改原来的sklearn Pipeline
,其他部分保持不变。
from sklearn import pipeline
from sklearn.base import clone
from sklearn.utils import _print_elapsed_time
from sklearn.utils.validation import check_memory
class Pipeline(pipeline.Pipeline):
def _fit(self, X, y=None, **fit_params_steps):
self.steps = list(self.steps)
self._validate_steps()
memory = check_memory(self.memory)
fit_transform_one_cached = memory.cache(pipeline._fit_transform_one)
for (step_idx, name, transformer) in self._iter(
with_final=False, filter_passthrough=False
):
if transformer is None or transformer == "passthrough":
with _print_elapsed_time("Pipeline", self._log_message(step_idx)):
continue
try:
# joblib >= 0.12
mem = memory.location
except AttributeError:
mem = memory.cachedir
finally:
cloned_transformer = clone(transformer) if mem else transformer
X, fitted_transformer = fit_transform_one_cached(
cloned_transformer,
X,
y,
None,
message_clsname="Pipeline",
message=self._log_message(step_idx),
**fit_params_steps[name],
)
if isinstance(X, tuple): ###### unpack X if is tuple: X = (X,y)
X, y = X
self.steps[step_idx] = (name, fitted_transformer)
return X, y
def fit(self, X, y=None, **fit_params):
fit_params_steps = self._check_fit_params(**fit_params)
Xt = self._fit(X, y, **fit_params_steps)
if isinstance(Xt, tuple): ###### unpack X if is tuple: X = (X,y)
Xt, y = Xt
with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
if self._final_estimator != "passthrough":
fit_params_last_step = fit_params_steps[self.steps[-1][0]]
self._final_estimator.fit(Xt, y, **fit_params_last_step)
return self
这样做是为了能够将Dropna().fit_transform(X, y)
生成的值拆分成新的X
和y
。
下面是完整的管道工作示例:
from sklearn.linear_model import Ridge
X = np.random.uniform(0,1, (100,3))
y = np.random.uniform(0,1, (100,))
X[np.random.uniform(0,1, (100)) < 0.1] = np.nan
y[np.random.uniform(0,1, (100)) < 0.1] = np.nan
pipe = Pipeline([('dropna', Dropna()), ('model', Ridge())])
pipe.fit(X, y)
pipe.predict(X).shape
再试一次,增加一个中间的预处理步骤:
from sklearn.preprocessing import StandardScaler
pipe = Pipeline([('dropna', Dropna()), ('scaler', StandardScaler()), ('model', Ridge())])
pipe.fit(X, y)
pipe.predict(X).shape
根据需要,其他简单的修改也可以实现更复杂的行为。如果你对Pipeline().fit_transform
或Pipeline().fit_predict
感兴趣,也需要进行相同的更改。
修改样本的轴,比如说删除样本,这个操作目前还不符合scikit-learn的转换器API的要求。所以如果你需要这样做,最好在使用scikit-learn之前先处理好,作为预处理步骤。
现在的转换器API是用来把给定样本的特征转换成新的东西。这个过程中可能会隐含其他样本的信息,但样本本身是不会被删除的。
还有一种选择是尝试填补缺失的值。不过同样的,如果你需要删除样本,最好在使用scikit-learn之前先处理好,作为预处理。