用于修改X和y的sklearn管道自定义转换器

31 投票
8 回答
21203 浏览
提问于 2025-04-18 18:47

我想为sklearn的Pipeline创建自己的转换器。

我正在创建一个类,这个类需要实现fit和transform这两个方法。这个转换器的目的是从矩阵中删除那些包含超过指定数量NaN(缺失值)的行。

我现在遇到的问题是我该如何同时修改传给转换器的X和y矩阵?

我觉得这应该在fit方法中完成,因为它可以访问到X和y。由于Python是通过赋值来传递参数的,一旦我把X重新赋值为一个行数更少的新矩阵,原来的X的引用就会丢失(y也是一样)。有没有办法保持这个引用呢?

我正在使用pandas的DataFrame来方便地删除那些NaN过多的行,但这可能不是我用例的最佳做法。现在的代码看起来是这样的:

class Dropna():

    # thresh is max number of NaNs allowed in a row
    def __init__(self, thresh=0):
        self.thresh = thresh

    def fit(self, X, y):
        total = X.shape[1]
        # +1 to account for 'y' being added to the dframe                                                                                                                            
        new_thresh = total + 1 - self.thresh
        df = pd.DataFrame(X)
        df['y'] = y
        df.dropna(thresh=new_thresh, inplace=True)
        X = df.drop('y', axis=1).values
        y = df['y'].values
        return self

    def transform(self, X):
        return X

8 个回答

1

你可以通过使用sklearn.preprocessing.FunctionTransformer这个方法来轻松解决这个问题(http://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.FunctionTransformer.html)。

你只需要把你对X的修改放在一个函数里。

def drop_nans(X, y=None):
    total = X.shape[1]                                           
    new_thresh = total - thresh
    df = pd.DataFrame(X)
    df.dropna(thresh=new_thresh, inplace=True)
    return df.values

然后你可以通过调用

transformer = FunctionTransformer(drop_nans, validate=False)

来获得你的转换器,这样你就可以在流程中使用它。阈值可以在drop_nans函数外部设置。

2

补充一下@João Matias的回答:

这里有一个使用imblearn的例子,展示了如何设置一个处理步骤,来删除那些有缺失值的行:

from imblearn import FunctionSampler
def drop_rows_with_any_nan(X, y):
    return X[~np.isnan(X).any(axis=1), :], y[~np.isnan(X).any(axis=1)]
drop_rows_with_any_nan_sampler = FunctionSampler(func=drop_rows_with_any_nan, validate=False)
model_clf2 = pipeline.Pipeline(
    [
        ('preprocess', column_transformer),
        ('drop_na', drop_rows_with_any_nan_sampler),
        ('smote', SMOTE()),
        ('xgb', xgboost.XGBClassifier()),
    ]
)

请注意,你需要使用imblearn的管道功能。

7

这个叫做 imblearn 的包是建立在 sklearn 之上的,它里面有一个叫 FunctionSampler 的工具,可以在处理数据的过程中同时调整特征数组 X 和目标数组 y

需要注意的是,如果你想在一个流程步骤中使用它,就得用 imblearn 里的 Pipeline 类,这个类是从 sklearn 的那个类继承来的。此外,默认情况下,在 Pipelineresample 方法如果不是在 fit 之后立刻调用(像 fit_resample 那样),就不会有任何作用。所以,最好提前看看相关的文档。

9

你需要对sklearn的Pipeline内部代码进行一些修改。

我们定义了一个转换器,它在训练时会去掉那些特征值或目标值为NaN的样本(使用fit_transform)。而在推理时,它会去掉那些特征值为NaN的样本(使用transform)。需要注意的是,我们的转换器在fit_transform中会返回X和y,所以我们需要在sklearn的Pipeline中处理这个行为。

class Dropna():

    def fit(self, X, y):
        return self

    def fit_transform(self, X, y):
        
        mask = (np.isnan(X).any(-1) | np.isnan(y))
        if hasattr(X, 'loc'):
            X = X.loc[~mask]
        else:
            X = X[~mask]
        if hasattr(y, 'loc'):
            y = y.loc[~mask]
        else:
            y = y[~mask]
        
        return X, y   ###### make fit_transform return X and y
    
    def transform(self, X):
        
        mask = np.isnan(X).any(-1)
        if hasattr(X, 'loc'):
            X = X.loc[~mask]
        else:
            X = X[~mask]
        
        return X

我们只需要在fit_fit方法的两个特定地方修改原来的sklearn Pipeline,其他部分保持不变。

from sklearn import pipeline
from sklearn.base import clone
from sklearn.utils import _print_elapsed_time
from sklearn.utils.validation import check_memory

class Pipeline(pipeline.Pipeline):

    def _fit(self, X, y=None, **fit_params_steps):
        self.steps = list(self.steps)
        self._validate_steps()
        memory = check_memory(self.memory)

        fit_transform_one_cached = memory.cache(pipeline._fit_transform_one)

        for (step_idx, name, transformer) in self._iter(
            with_final=False, filter_passthrough=False
        ):
                        
            if transformer is None or transformer == "passthrough":
                with _print_elapsed_time("Pipeline", self._log_message(step_idx)):
                    continue

            try:
                # joblib >= 0.12
                mem = memory.location
            except AttributeError:
                mem = memory.cachedir
            finally:
                cloned_transformer = clone(transformer) if mem else transformer

            X, fitted_transformer = fit_transform_one_cached(
                cloned_transformer,
                X,
                y,
                None,
                message_clsname="Pipeline",
                message=self._log_message(step_idx),
                **fit_params_steps[name],
            )
            
            if isinstance(X, tuple):    ###### unpack X if is tuple: X = (X,y)
                X, y = X
            
            self.steps[step_idx] = (name, fitted_transformer)
        
        return X, y
    
    def fit(self, X, y=None, **fit_params):
        fit_params_steps = self._check_fit_params(**fit_params)
        Xt = self._fit(X, y, **fit_params_steps)
        
        if isinstance(Xt, tuple):    ###### unpack X if is tuple: X = (X,y)
            Xt, y = Xt 
        
        with _print_elapsed_time("Pipeline", self._log_message(len(self.steps) - 1)):
            if self._final_estimator != "passthrough":
                fit_params_last_step = fit_params_steps[self.steps[-1][0]]
                self._final_estimator.fit(Xt, y, **fit_params_last_step)

        return self

这样做是为了能够将Dropna().fit_transform(X, y)生成的值拆分成新的Xy

下面是完整的管道工作示例:

from sklearn.linear_model import Ridge

X = np.random.uniform(0,1, (100,3))
y = np.random.uniform(0,1, (100,))
X[np.random.uniform(0,1, (100)) < 0.1] = np.nan
y[np.random.uniform(0,1, (100)) < 0.1] = np.nan

pipe = Pipeline([('dropna', Dropna()), ('model', Ridge())])
pipe.fit(X, y)

pipe.predict(X).shape

再试一次,增加一个中间的预处理步骤:

from sklearn.preprocessing import StandardScaler

pipe = Pipeline([('dropna', Dropna()), ('scaler', StandardScaler()), ('model', Ridge())])
pipe.fit(X, y)

pipe.predict(X).shape

根据需要,其他简单的修改也可以实现更复杂的行为。如果你对Pipeline().fit_transformPipeline().fit_predict感兴趣,也需要进行相同的更改。

17

修改样本的轴,比如说删除样本,这个操作目前还不符合scikit-learn的转换器API的要求。所以如果你需要这样做,最好在使用scikit-learn之前先处理好,作为预处理步骤。

现在的转换器API是用来把给定样本的特征转换成新的东西。这个过程中可能会隐含其他样本的信息,但样本本身是不会被删除的。

还有一种选择是尝试填补缺失的值。不过同样的,如果你需要删除样本,最好在使用scikit-learn之前先处理好,作为预处理。

撰写回答