
A Brief Look at the Principles and Mechanisms of Python's Pipeline


Pipelines

Background

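"Pipeline" literally means a conduit. Why use one? In data processing, many steps are repeated or very similar from one project to the next: feature selection, normalization, classification and so on. A pipeline removes that repetitive glue code, so we can concentrate on choosing and combining steps instead of rewriting the same code again and again.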

Let's take a closer look.

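Steps for using a Pipeline

In sklearn, a complete Pipeline typically consists of these steps:
(1) preprocess the data, e.g. handle missing values;
(2) standardize the data;
(3) reduce dimensionality;
(4) apply a feature selection algorithm;
(5) apply a classification, regression or clustering algorithm (the estimator).
Calling the pipeline's fit method runs the first n-1 steps (the transformers) on the features one after another, then passes the result to the final estimator, which is trained on it. The pipeline exposes all the methods of that final estimator, so the fitted pipeline behaves like a single model.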
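Below is a minimal pure-Python sketch of that fit/predict control flow. `MiniPipeline`, `CenterScaler` and `ThresholdClassifier` are hypothetical toy classes written only for this illustration. They are not scikit-learn code. The real Pipeline also handles validation, caching and parameter routing.

```python
class CenterScaler:
    """Toy transformer: subtracts the training mean from a 1-D list of numbers."""

    def fit(self, X, y=None):
        self.mean_ = sum(X) / len(X)
        return self

    def transform(self, X):
        return [x - self.mean_ for x in X]

    def fit_transform(self, X, y=None):
        return self.fit(X, y).transform(X)


class ThresholdClassifier:
    """Toy estimator: predicts 1 above the midpoint of the two class means."""

    def fit(self, X, y):
        pos = [x for x, t in zip(X, y) if t == 1]
        neg = [x for x, t in zip(X, y) if t == 0]
        self.threshold_ = (sum(pos) / len(pos) + sum(neg) / len(neg)) / 2
        return self

    def predict(self, X):
        return [1 if x > self.threshold_ else 0 for x in X]


class MiniPipeline:
    """Hypothetical sketch of the Pipeline control flow, not the real class."""

    def __init__(self, steps):
        self.steps = steps  # list of (name, estimator) tuples

    def fit(self, X, y=None):
        Xt = X
        # steps 1..n-1: fit each transformer, then feed its output forward
        for _, transformer in self.steps[:-1]:
            Xt = transformer.fit_transform(Xt, y)
        # step n: the final estimator is only fitted
        self.steps[-1][1].fit(Xt, y)
        return self

    def predict(self, X):
        Xt = X
        # reuse the already-fitted transformers: transform only, no refit
        for _, transformer in self.steps[:-1]:
            Xt = transformer.transform(Xt)
        return self.steps[-1][1].predict(Xt)


pipe = MiniPipeline([("center", CenterScaler()), ("clf", ThresholdClassifier())])
pipe.fit([1.0, 2.0, 3.0, 10.0, 11.0, 12.0], [0, 0, 0, 1, 1, 1])
print(pipe.predict([0.0, 15.0]))  # [0, 1]
```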

A first Pipeline demo

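Let's look at an example in which the pipeline only does [standardize the data → no dimensionality reduction → SVM model].
The dataset is the handwritten-digit classification dataset.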

# Import libraries
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt

# Load the data
digits = datasets.load_digits()
X = digits.data
y = digits.target

# Visualize the data
plt.gray()
plt.matshow(digits.images[0])  # show the first image
plt.show()

# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=33)

# Chain the steps
pipeline = Pipeline([
    ('scaler', StandardScaler()),  # standardize the data
    ('svm', SVC())  # SVM classifier
])

# Train the model
pipeline.fit(X_train, y_train)

# Predict
y_pred = pipeline.predict(X_test)

# Evaluate the results
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)
report = classification_report(y_test, y_pred)
print("Classification report:\n", report)

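The output is shown below (screenshot omitted).
The Pipeline trained the [standardize → no dimensionality reduction → SVM] model, and the trained model was then used to predict on the test set with reasonable accuracy.
We can also see that the Pipeline executes its steps one after another, in order (screenshot omitted).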
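The step-by-step execution can also be observed without a screenshot by plugging in estimators that log their calls. This sketch assumes a recent scikit-learn. `TracingTransformer` and `TracingClassifier` are hypothetical classes written for this demo. During fit, each intermediate step is fitted and then transforms the data (here through `TransformerMixin.fit_transform`). During predict, those steps only transform.

```python
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline


class TracingTransformer(TransformerMixin, BaseEstimator):
    """Hypothetical identity transformer that records each call in `log`."""

    def __init__(self, tag="t", log=None):
        self.tag = tag
        self.log = log

    def fit(self, X, y=None):
        self.log.append(f"{self.tag}.fit")
        self.fitted_ = True  # a trailing-underscore attribute marks it as fitted
        return self

    def transform(self, X):
        self.log.append(f"{self.tag}.transform")
        return X  # identity: pass the data through unchanged


class TracingClassifier(BaseEstimator):
    """Hypothetical final estimator that always predicts 0 and logs its calls."""

    def __init__(self, log=None):
        self.log = log

    def fit(self, X, y=None):
        self.log.append("clf.fit")
        self.fitted_ = True
        return self

    def predict(self, X):
        self.log.append("clf.predict")
        return [0] * len(X)


calls = []
pipe = Pipeline([
    ("a", TracingTransformer(tag="a", log=calls)),
    ("b", TracingTransformer(tag="b", log=calls)),
    ("clf", TracingClassifier(log=calls)),
])
X, y = [[0.0], [1.0], [2.0]], [0, 1, 0]

pipe.fit(X, y)
fit_log = list(calls)
print(fit_log)      # ['a.fit', 'a.transform', 'b.fit', 'b.transform', 'clf.fit']

calls.clear()
pipe.predict(X)
predict_log = list(calls)
print(predict_log)  # ['a.transform', 'b.transform', 'clf.predict']
```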

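Understanding the Pipeline source code

The source we will look at consists mainly of

  1. the Pipeline class
  2. the FeatureUnion class

Pipeline is about flow control (steps run one after another), while FeatureUnion is geared toward parallel computation (transformers run side by side).
Let's first understand what the Pipeline class does.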

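I. The Pipeline class

Main Pipeline methods

Pipeline methods mostly call the corresponding method of each step; if a step does not have that method, an error is raised.
Suppose the Pipeline has n steps, for example: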

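from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Normalizer
from sklearn.svm import SVC

pl = Pipeline([
    ('Normal', Normalizer()),
    ('PCA', PCA()),
    ('SVC', SVC())
])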

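pl.transform: calls the transform method of every step in turn, so every step, including the last one, must implement transform.
pl.fit: calls fit and transform on each of the first n-1 steps in turn, then calls fit on the n-th (last) step.
pl.predict: transforms the data with the first n-1 steps, then calls predict on the n-th step.
pl.score: transforms the data with the first n-1 steps, then calls score on the n-th step.
pl.set_params: sets parameters of the pipeline or of any of its steps, using keys of the form <step name>__<parameter name>, e.g. SVC__C=10.
pl.get_params: returns the parameters of the pipeline and, with deep=True (the default), those of every step under the same <step name>__<parameter name> keys.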
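The `<step>__<param>` naming behind get_params and set_params can be checked directly. Here is a short sketch, assuming scikit-learn is installed. It reuses the step names from the example above.

```python
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Normalizer
from sklearn.svm import SVC

pl = Pipeline([
    ("Normal", Normalizer()),
    ("PCA", PCA()),
    ("SVC", SVC()),
])

params = pl.get_params()  # deep=True by default: includes every step's parameters
print("PCA__n_components" in params, "SVC__C" in params)  # True True

# Any step's parameter is addressed as "<step name>__<parameter name>"
pl.set_params(PCA__n_components=2, SVC__C=10.0)
print(pl.named_steps["PCA"].n_components, pl.named_steps["SVC"].C)  # 2 10.0
```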

How Pipelines work: a walk through the source

Pipeline of transforms with a final estimator.

    Sequentially apply a list of transforms and a final estimator.
    Intermediate steps of the pipeline must be 'transforms', that is, they
    must implement fit and transform methods.
    The final estimator only needs to implement fit.
    The transformers in the pipeline can be cached using ``memory`` argument.

    The purpose of the pipeline is to assemble several steps that can be
    cross-validated together while setting different parameters.
    For this, it enables setting parameters of the various steps using their
    names and the parameter name separated by a '__', as in the example below.
    A step's estimator may be replaced entirely by setting the parameter
    with its name to another estimator, or a transformer removed by setting
    to None.
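The end of that docstring describes replacing a step, or removing it, through its name. A sketch of that, assuming scikit-learn ≥ 0.21, where the string `"passthrough"` is accepted for a skipped step in addition to `None`. LogisticRegression is just an arbitrary replacement estimator picked for the demo.

```python
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)  # 150 samples x 4 features, 3 classes
pl = Pipeline([("scaler", StandardScaler()), ("PCA", PCA(n_components=2)), ("clf", SVC())])

# Replace a whole step by assigning a new estimator to its name
pl.set_params(clf=LogisticRegression(max_iter=1000))
# Skip an intermediate step: "passthrough" acts as an identity transformation
pl.set_params(PCA="passthrough")

pl.fit(X, y)
print(type(pl.named_steps["clf"]).__name__)  # LogisticRegression
print(pl.named_steps["clf"].coef_.shape)     # (3, 4): PCA was skipped, so all 4 features arrive
```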


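A Pipeline is built from a list of tuples. Each element is (key, value), where key is the name you give the step and value is an estimator object; each element is called a step.

The pipeline applies the transformers in the list in order and ends with a final estimator. Each of the first n-1 objects is fitted (fit) and then applied (transform); the final estimator only needs to be fitted.

So the end product of a pipeline is an estimator, which is what we usually call a model. Typically we feed the training data into the pipeline, the data goes through the transformations above, and we get back a trained model.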

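First, the Pipeline constructor:

sklearn.pipeline.Pipeline(steps, memory=None, verbose=False)

steps: the steps, given as a list of (key, value) tuples, where key is the name you choose for the step and value is an estimator object. The names are up to you, but they must be unique and must not contain a double underscore "__", which is reserved for <step>__<param> keys.
memory: caches the fitted transformers of the pipeline. It is only needed when you want to cache the intermediate transformers; the default None disables caching.
verbose: if True, the time taken to fit each step is printed. Default False.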
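Here is a small sketch of the memory parameter. It assumes scikit-learn, which installs joblib, and uses a throwaway temporary directory as the cache location. It also shows a side effect that scikit-learn documents for this parameter: with caching enabled, transformers are cloned before fitting, so the object you passed in is not the fitted one.

```python
import tempfile

from sklearn.datasets import load_iris
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)
scaler = StandardScaler()

with tempfile.TemporaryDirectory() as cache_dir:
    pipe = Pipeline([("scaler", scaler), ("svc", SVC())], memory=cache_dir)
    pipe.fit(X, y)  # the fitted scaler is cached on disk under cache_dir
    pipe.fit(X, y)  # same data and parameters: the scaler should come from the cache

    # With caching on, the transformer is cloned before fitting,
    # so the object passed in stays unfitted.
    print(hasattr(scaler, "mean_"))                      # False
    print(hasattr(pipe.named_steps["scaler"], "mean_"))  # True
```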

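For example, a typical pipeline looks like this:

from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Normalizer
from sklearn.svm import SVC

pl_svm = Pipeline([
    # give each step a name, followed by the estimator itself
    ('Normal', Normalizer()),
    ('PCA', PCA()),
    ('SVC', SVC())
])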

Here, steps stores the list of tuples from our example.
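Once built, the stored steps can be accessed in several ways. This sketch assumes scikit-learn ≥ 0.21, the version that added integer indexing and slicing on Pipeline.

```python
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Normalizer
from sklearn.svm import SVC

pl_svm = Pipeline([("Normal", Normalizer()), ("PCA", PCA()), ("SVC", SVC())])

print(pl_svm.steps[0][0])                               # Normal: steps is the list of (name, estimator) tuples
print(pl_svm.named_steps["PCA"] is pl_svm.steps[1][1])  # True: look a step up by its name
print(type(pl_svm[-1]).__name__)                        # SVC: an integer index returns the estimator
print(len(pl_svm[:2].steps))                            # 2: a slice returns a new, shorter Pipeline
```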

Let's look at the __init__ method of the Pipeline class:

    def __init__(self, steps, *, memory=None, verbose=False):
        self.steps = steps
        self.memory = memory
        self.verbose = verbose
        self._validate_steps()

Next come two methods:
get_params: gets the parameters of the estimator
set_params: sets the parameters of the estimator

    def get_params(self, deep=True):
        """Get parameters for this estimator.
        Parameters
        ----------
        deep : bool, default=True
            If True, will return the parameters for this estimator and
            contained subobjects that are estimators.
        Returns
        -------
        params : mapping of string to any
            Parameter names mapped to their values.
        """
        return self._get_params('steps', deep=deep)


    def set_params(self, **kwargs):
        """Set the parameters of this estimator.
        Valid parameter keys can be listed with ``get_params()``.
        Returns
        -------
        self
        """
        self._set_params('steps', **kwargs)
        return self

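Next is the validation method. It does the following:

  1. splits the list into names and estimators, and checks that the names are valid;
  2. treats the first n-1 steps as transformers, which must support fit/transform, and the n-th (last) step as the estimator, which only needs fit;
  3. loops over the transformers and checks that each one has fit or fit_transform, as well as transform; if not, it raises a TypeError.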

    def _validate_steps(self):
        names, estimators = zip(*self.steps)

        # validate names
        self._validate_names(names)

        # validate estimators
        transformers = estimators[:-1]
        estimator = estimators[-1]

        for t in transformers:
            if t is None:
                continue
            if (not (hasattr(t, "fit") or hasattr(t, "fit_transform")) or not
                    hasattr(t, "transform")):
                raise TypeError("All intermediate steps should be "
                                "transformers and implement fit and transform."
                                " '%s' (type %s) doesn't" % (t, type(t)))

        # We allow last estimator to be None as an identity transformation
        if estimator is not None and not hasattr(estimator, "fit"):
            raise TypeError("Last step of Pipeline should implement fit. "
                            "'%s' (type %s) doesn't"
                            % (estimator, type(estimator)))
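The effect of these checks is visible from the outside. This sketch assumes a recent scikit-learn, where validation runs when fit is called. Older versions also validated in the constructor, and the try block covers both. `try_fit` is a hypothetical helper that reports which exception, if any, a given list of steps triggers. The two name checks come from `_validate_names`, which is not reproduced above.

```python
from sklearn.datasets import load_iris
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)


def try_fit(steps):
    """Hypothetical helper: build and fit a Pipeline, return the error type or "ok"."""
    try:
        Pipeline(steps).fit(X, y)
    except (TypeError, ValueError) as exc:
        return type(exc).__name__
    return "ok"


results = {
    # SVC has no transform(), so it cannot be an intermediate step
    "svc_not_last": try_fit([("svc", SVC()), ("scaler", StandardScaler())]),
    # step names must be unique
    "duplicate_names": try_fit([("s", StandardScaler()), ("s", SVC())]),
    # "__" is reserved for <step>__<param> keys
    "double_underscore": try_fit([("my__scaler", StandardScaler()), ("svc", SVC())]),
    "valid": try_fit([("scaler", StandardScaler()), ("svc", SVC())]),
}
print(results)
```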

This is the key part: the internal method that fits the first n-1 transformers.

    # Estimator interface

    def _fit(self, X, y=None, **fit_params):
        # shallow copy of steps - this should really be steps_
        self.steps = list(self.steps)  # a new list of the (name, step) tuples
        self._validate_steps()
        # Setup the memory
        memory = check_memory(self.memory) # memory 用来缓存数据

        fit_transform_one_cached = memory.cache(_fit_transform_one)
        # fit_params_steps maps each step name to the fit parameters routed to it from "step__param" keys
        fit_params_steps = dict((name, {}) for name, step in self.steps
                                if step is not None)
        for pname, pval in six.iteritems(fit_params):
            step, param = pname.split('__', 1)
            fit_params_steps[step][param] = pval
        Xt = X
        for step_idx, (name, transformer) in enumerate(self.steps[:-1]):
            if transformer is None:
                pass
            else:
                if hasattr(memory, 'location'):
                    # joblib >= 0.12
                    if memory.location is None:
                        # we do not clone when caching is disabled to
                        # preserve backward compatibility
                        cloned_transformer = transformer
                    else:
                        cloned_transformer = clone(transformer)
                elif hasattr(memory, 'cachedir'):
                    # joblib < 0.11
                    if memory.cachedir is None:
                        # we do not clone when caching is disabled to
                        # preserve backward compatibility
                        cloned_transformer = transformer
                    else:
                        cloned_transformer = clone(transformer)
                else:
                    cloned_transformer = clone(transformer)
                # Fit or load from cache the current transfomer
                Xt, fitted_transformer = fit_transform_one_cached(
                    cloned_transformer, Xt, y, None,
                    **fit_params_steps[name])
                # Replace the transformer of the step with the fitted
                # transformer. This is necessary when loading the transformer
                # from the cache.
                self.steps[step_idx] = (name, fitted_transformer)
        if self._final_estimator is None:
            return Xt, {}
        return Xt, fit_params_steps[self.steps[-1][0]]  # the transformed data Xt and the fit params meant for the final step
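The fit_params_steps bookkeeping in _fit comes down to splitting each key at its first `"__"`. Below is a pure-Python sketch of just that routing. `route_fit_params` is a hypothetical helper. `pca__extra__opt` is a made-up key, included only to show that the split happens at the first `"__"` and nowhere else.

```python
def route_fit_params(step_names, fit_params):
    """Hypothetical, simplified version of the fit_params_steps logic in _fit:
    split "<step>__<param>" keys into one kwargs dict per step."""
    routed = {name: {} for name in step_names}
    for key, value in fit_params.items():
        step, param = key.split("__", 1)  # split only at the first "__"
        routed[step][param] = value
    return routed


routed = route_fit_params(
    ["scaler", "pca", "svc"],
    {"svc__sample_weight": [1, 2, 3], "pca__extra__opt": True},
)
print(routed)
# {'scaler': {}, 'pca': {'extra__opt': True}, 'svc': {'sample_weight': [1, 2, 3]}}
```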
    def fit(self, X, y=None, **fit_params):
        """Fit the model

        Fit all the transforms one after the other and transform the
        data, then fit the transformed data using the final estimator.

        Parameters
        ----------
        X : iterable
            Training data. Must fulfill input requirements of first step of the
            pipeline.

        y : iterable, default=None
            Training targets. Must fulfill label requirements for all steps of
            the pipeline.

        **fit_params : dict of string -> object
            Parameters passed to the ``fit`` method of each step, where
            each parameter name is prefixed such that parameter ``p`` for step
            ``s`` has key ``s__p``.

        Returns
        -------
        self : Pipeline
            This estimator
        """
        # _fit fits and transforms the first n-1 steps and returns the transformed data Xt plus the fit params meant for the final step
        Xt, fit_params = self._fit(X, y, **fit_params)
        if self._final_estimator is not None:
            self._final_estimator.fit(Xt, y, **fit_params)
        return self
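From the user's side, this routing means that a keyword such as `svc__sample_weight` passed to Pipeline.fit reaches the final step's fit. This sketch assumes scikit-learn with default settings (in versions that have metadata routing, it stays disabled). The weights are arbitrary demo values, and the result is compared with chaining the steps by hand.

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)
weights = np.where(y == 2, 5.0, 1.0)  # arbitrary demo weights: up-weight class 2

pipe = Pipeline([("scaler", StandardScaler()), ("svc", SVC())])
# "svc__sample_weight" is routed to SVC.fit(..., sample_weight=weights)
pipe.fit(X, y, svc__sample_weight=weights)

# The same thing by hand: fit_transform the scaler, then fit the SVC
Xt = StandardScaler().fit_transform(X)
manual = SVC().fit(Xt, y, sample_weight=weights)

same = np.array_equal(pipe.predict(X), manual.predict(Xt))
print(same)  # True
```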

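fit_transform first fits and transforms the first n-1 steps through _fit, then handles the final step. If the final step has fit_transform, it is called on Xt. If the final step is None, Xt is returned unchanged. Otherwise the final step's fit is called on Xt, followed by its transform.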

    def fit_transform(self, X, y=None, **fit_params):
        """Fit the model and transform with the final estimator

        Fits all the transforms one after the other and transforms the
        data, then uses fit_transform on transformed data with the final
        estimator.

        Parameters
        ----------
        X : iterable
            Training data. Must fulfill input requirements of first step of the
            pipeline.

        y : iterable, default=None
            Training targets. Must fulfill label requirements for all steps of
            the pipeline.

        **fit_params : dict of string -> object
            Parameters passed to the ``fit`` method of each step, where
            each parameter name is prefixed such that parameter ``p`` for step
            ``s`` has key ``s__p``.

        Returns
        -------
        Xt : array-like, shape = [n_samples, n_transformed_features]
            Transformed samples
        """
        last_step = self._final_estimator
        Xt, fit_params = self._fit(X, y, **fit_params)
        if hasattr(last_step, 'fit_transform'):
            return last_step.fit_transform(Xt, y, **fit_params)
        elif last_step is None:
            return Xt
        else:
            return last_step.fit(Xt, y, **fit_params).transform(Xt)
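Here is a quick sanity check of that logic, assuming scikit-learn. PCA is the last step and implements fit_transform. Fitting and transforming in one call should therefore give the same matrix as transforming again with the fitted pipeline, up to floating-point noise. `svd_solver="full"` is chosen only to keep the decomposition deterministic.

```python
import numpy as np
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, _ = load_iris(return_X_y=True)
pipe = Pipeline([("scaler", StandardScaler()), ("pca", PCA(n_components=2, svd_solver="full"))])

Xt_one_shot = pipe.fit_transform(X)  # last step has fit_transform, so that branch is taken
Xt_two_step = pipe.transform(X)      # re-apply every fitted step in order
print(Xt_one_shot.shape)                       # (150, 2)
print(np.allclose(Xt_one_shot, Xt_two_step))   # True
```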

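The following methods perform other operations with the model; feel free to skip them. In brief:

**predict method:** transforms the data X and predicts with the final estimator. Returns the predictions.

**fit_predict method:** applies the pipeline's transformations to X (fitting them), then calls the final estimator's fit_predict. Returns the predictions.

**predict_proba method:** transforms X and calls the final estimator's predict_proba. Returns the predicted probabilities.

**decision_function method:** transforms X and calls the final estimator's decision_function. Returns the decision function values.

**predict_log_proba method:** transforms X and calls the final estimator's predict_log_proba. Returns the predicted log-probabilities.

**transform method:** applies all the transformations, including the final estimator's transform.

**inverse_transform method:** applies the inverse transformations in reverse order.

**score method:** applies the transformations and scores with the final estimator.

**classes_ property:** returns the class labels of the final estimator.

**_pairwise property:** checks whether the first estimator expects pairwise input.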
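The delegation summarized above can be checked on the digits pipeline from the first demo. This assumes scikit-learn, and the split mirrors the earlier example. predict should match transforming by hand and then calling the final step. Methods the final estimator does not provide should not exist on the pipeline at all.

```python
import numpy as np
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

X, y = load_digits(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=33)
pipe = Pipeline([("scaler", StandardScaler()), ("svm", SVC())]).fit(X_train, y_train)

# pipe.predict == final step's predict applied to the output of the earlier steps
Xt = pipe.named_steps["scaler"].transform(X_test)
same = np.array_equal(pipe.predict(X_test), pipe.named_steps["svm"].predict(Xt))
print(same)  # True

# Methods missing on the final estimator are missing on the pipeline too
print(hasattr(pipe, "predict_proba"))      # False: SVC() defaults to probability=False
print(hasattr(pipe, "decision_function"))  # True
```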

    @if_delegate_has_method(delegate='_final_estimator')
    def predict(self, X, **predict_params):
        """Apply transforms to the data, and predict with the final estimator

        Parameters
        ----------
        X : iterable
            Data to predict on. Must fulfill input requirements of first step
            of the pipeline.

        **predict_params : dict of string -> object
            Parameters to the ``predict`` called at the end of all
            transformations in the pipeline. Note that while this may be
            used to return uncertainties from some models with return_std
            or return_cov, uncertainties that are generated by the
            transformations in the pipeline are not propagated to the
            final estimator.

        Returns
        -------
        y_pred : array-like
        """
        Xt = X
        for name, transform in self.steps[:-1]:
            if transform is not None:
                Xt = transform.transform(Xt)
        return self.steps[-1][-1].predict(Xt, **predict_params)

    @if_delegate_has_method(delegate='_final_estimator')
    def fit_predict(self, X, y=None, **fit_params):
        """Applies fit_predict of last step in pipeline after transforms.

        Applies fit_transforms of a pipeline to the data, followed by the
        fit_predict method of the final estimator in the pipeline. Valid
        only if the final estimator implements fit_predict.

        Parameters
        ----------
        X : iterable
            Training data. Must fulfill input requirements of first step of
            the pipeline.

        y : iterable, default=None
            Training targets. Must fulfill label requirements for all steps
            of the pipeline.

        **fit_params : dict of string -> object
            Parameters passed to the ``fit`` method of each step, where
            each parameter name is prefixed such that parameter ``p`` for step
            ``s`` has key ``s__p``.

        Returns
        -------
        y_pred : array-like
        """
        Xt, fit_params = self._fit(X, y, **fit_params)
        return self.steps[-1][-1].fit_predict(Xt, y, **fit_params)

    @if_delegate_has_method(delegate='_final_estimator')
    def predict_proba(self, X):
        """Apply transforms, and predict_proba of the final estimator

        Parameters
        ----------
        X : iterable
            Data to predict on. Must fulfill input requirements of first step
            of the pipeline.

        Returns
        -------
        y_proba : array-like, shape = [n_samples, n_classes]
        """
        Xt = X
        for name, transform in self.steps[:-1]:
            if transform is not None:
                Xt = transform.transform(Xt)
        return self.steps[-1][-1].predict_proba(Xt)

    @if_delegate_has_method(delegate='_final_estimator')
    def decision_function(self, X):
        """Apply transforms, and decision_function of the final estimator

        Parameters
        ----------
        X : iterable
            Data to predict on. Must fulfill input requirements of first step
            of the pipeline.

        Returns
        -------
        y_score : array-like, shape = [n_samples, n_classes]
        """
        Xt = X
        for name, transform in self.steps[:-1]:
            if transform is not None:
                Xt = transform.transform(Xt)
        return self.steps[-1][-1].decision_function(Xt)

    @if_delegate_has_method(delegate='_final_estimator')
    def predict_log_proba(self, X):
        """Apply transforms, and predict_log_proba of the final estimator

        Parameters
        ----------
        X : iterable
            Data to predict on. Must fulfill input requirements of first step
            of the pipeline.

        Returns
        -------
        y_score : array-like, shape = [n_samples, n_classes]
        """
        Xt = X
        for name, transform in self.steps[:-1]:
            if transform is not None:
                Xt = transform.transform(Xt)
        return self.steps[-1][-1].predict_log_proba(Xt)

    @property
    def transform(self):
        """Apply transforms, and transform with the final estimator

        This also works where final estimator is ``None``: all prior
        transformations are applied.

        Parameters
        ----------
        X : iterable
            Data to transform. Must fulfill input requirements of first step
            of the pipeline.

        Returns
        -------
        Xt : array-like, shape = [n_samples, n_transformed_features]
        """
        # _final_estimator is None or has transform, otherwise attribute error
        # XXX: Handling the None case means we can't use if_delegate_has_method
        if self._final_estimator is not None:
            self._final_estimator.transform
        return self._transform

    def _transform(self, X):
        Xt = X
        for name, transform in self.steps:
            if transform is not None:
                Xt = transform.transform(Xt)
        return Xt

    @property
    def inverse_transform(self):
        """Apply inverse transformations in reverse order

        All estimators in the pipeline must support ``inverse_transform``.

        Parameters
        ----------
        Xt : array-like, shape = [n_samples, n_transformed_features]
            Data samples, where ``n_samples`` is the number of samples and
            ``n_features`` is the number of features. Must fulfill
            input requirements of last step of pipeline's
            ``inverse_transform`` method.

        Returns
        -------
        Xt : array-like, shape = [n_samples, n_features]
        """
        # raise AttributeError if necessary for hasattr behaviour
        # XXX: Handling the None case means we can't use if_delegate_has_method
        for name, transform in self.steps:
            if transform is not None:
                transform.inverse_transform
        return self._inverse_transform

    def _inverse_transform(self, X):
        Xt = X
        for name, transform in self.steps[::-1]:
            if transform is not None:
                Xt = transform.inverse_transform(Xt)
        return Xt

    @if_delegate_has_method(delegate='_final_estimator')
    def score(self, X, y=None, sample_weight=None):
        """Apply transforms, and score with the final estimator

        Parameters
        ----------
        X : iterable
            Data to predict on. Must fulfill input requirements of first step
            of the pipeline.

        y : iterable, default=None
            Targets used for scoring. Must fulfill label requirements for all
            steps of the pipeline.

        sample_weight : array-like, default=None
            If not None, this argument is passed as ``sample_weight`` keyword
            argument to the ``score`` method of the final estimator.

        Returns
        -------
        score : float
        """
        Xt = X
        for name, transform in self.steps[:-1]:
            if transform is not None:
                Xt = transform.transform(Xt)
        score_params = {}
        if sample_weight is not None:
            score_params['sample_weight'] = sample_weight
        return self.steps[-1][-1].score(Xt, y, **score_params)

    @property
    def classes_(self):
        return self.steps[-1][-1].classes_

    @property
    def _pairwise(self):
        # check if first estimator expects pairwise input
        return getattr(self.steps[0][1], '_pairwise', False)
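transform and inverse_transform above use a `@property` trick. The property first touches the attribute, so a missing method raises AttributeError, and then returns the real function. The same trick can be reproduced in plain Python. `MiniPipe`, `Final` and `WithTransform` are hypothetical classes for illustration only. scikit-learn wraps this pattern in helpers such as `if_delegate_has_method` (replaced by `available_if` in newer versions).

```python
class Final:
    """Hypothetical final estimator with predict but no transform."""

    def predict(self, X):
        return [0 for _ in X]


class WithTransform:
    """Hypothetical final estimator that does implement transform."""

    def transform(self, X):
        return [2 * x for x in X]


class MiniPipe:
    """Hypothetical sketch of the delegation trick used by Pipeline.transform."""

    def __init__(self, final):
        self.final = final

    @property
    def transform(self):
        # Accessing the attribute raises AttributeError if the final step lacks it,
        # which makes hasattr(pipe, "transform") return False.
        self.final.transform
        return self._transform

    def _transform(self, X):
        return self.final.transform(X)


print(hasattr(MiniPipe(Final()), "transform"))      # False
print(MiniPipe(WithTransform()).transform([1, 2]))  # [2, 4]
```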


make_pipeline is a more convenient shorthand: it names each step automatically (the lowercased class name), whereas Pipeline requires you to name every step by hand.

def make_pipeline(*steps, **kwargs):
    """Construct a Pipeline from the given estimators.

    This is a shorthand for the Pipeline constructor; it does not require, and
    does not permit, naming the estimators. Instead, their names will be set
    to the lowercase of their types automatically.

    Parameters
    ----------
    *steps : list of estimators.

    memory : None, str or object with the joblib.Memory interface, optional
        Used to cache the fitted transformers of the pipeline. By default,
        no caching is performed. If a string is given, it is the path to
        the caching directory. Enabling caching triggers a clone of
        the transformers before fitting. Therefore, the transformer
        instance given to the pipeline cannot be inspected
        directly. Use the attribute ``named_steps`` or ``steps`` to
        inspect estimators within the pipeline. Caching the
        transformers is advantageous when fitting is time consuming.

    See also
    --------
    sklearn.pipeline.Pipeline : Class for creating a pipeline of
        transforms with a final estimator.

    Returns
    -------
    p : Pipeline
    """
    memory = kwargs.pop('memory', None)
    if kwargs:
        raise TypeError('Unknown keyword arguments: "{}"'
                        .format(list(kwargs.keys())[0]))
    return Pipeline(_name_estimators(steps), memory=memory)
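The automatic names are easy to inspect, assuming scikit-learn. Repeated types get a numeric suffix so that names stay unique. That behaviour comes from `_name_estimators`, which is not reproduced above.

```python
from sklearn.decomposition import PCA
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

pipe = make_pipeline(StandardScaler(), PCA(n_components=2), SVC())
names = [name for name, _ in pipe.steps]
print(names)      # ['standardscaler', 'pca', 'svc']

# The same type twice: a numeric suffix keeps the names unique
dup = make_pipeline(StandardScaler(), StandardScaler(), SVC())
dup_names = [name for name, _ in dup.steps]
print(dup_names)  # ['standardscaler-1', 'standardscaler-2', 'svc']
```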

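II. The FeatureUnion class

A FeatureUnion instance is also built from a list of (key, value) pairs, where key is the name you give the transformation and value is a transformer object.

The class lives in the sklearn.pipeline module, so it is also a kind of pipeline. The difference is that its transformers are independent of each other and can run in parallel: think of each one as its own production line, all running at the same time on the same input. Each of them can be a feature-processing step such as PCA or SVD, and their outputs are finally concatenated into the processed feature data that is returned.

Like Pipeline, FeatureUnion has get_params, set_params and similar methods that work the same way (see the Pipeline source on the official site if you are interested). Because its transformers run side by side, however, it has no special final estimator: every entry is a transformer.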

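How FeatureUnion differs from Pipeline

Pipeline processes features serially: each transformer works on the output of the previous transformer.

FeatureUnion processes features in parallel: every transformer works on the same input, and all their outputs are concatenated into one large feature vector.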
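The difference can be sketched with plain functions applied to a single row of features. `double`, `add_sum`, `serial` and `union` are hypothetical helpers, not scikit-learn code. The serial version feeds each output into the next function. The union version applies every function to the same input and concatenates the results.

```python
def double(row):
    return [2 * v for v in row]


def add_sum(row):
    return [sum(row)]


def serial(funcs, row):
    """Pipeline-style: each function consumes the previous function's output."""
    for f in funcs:
        row = f(row)
    return row


def union(funcs, row):
    """FeatureUnion-style: every function sees the same input; outputs are concatenated."""
    out = []
    for f in funcs:
        out.extend(f(row))
    return out


row = [1, 2, 3]
print(serial([double, add_sum], row))  # [12]: add_sum sees [2, 4, 6]
print(union([double, add_sum], row))   # [2, 4, 6, 6]: [2, 4, 6] + [6]
```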

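FeatureUnion source code

For details on usage, see the referenced article (FeatureUnion example). In my view, FeatureUnion's main job is to concatenate the outputs of all its transformers into one large feature vector.

First, the parameters of the FeatureUnion constructor: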

    def __init__(self, transformer_list, n_jobs=None,
                 transformer_weights=None):
        self.transformer_list = transformer_list
        self.n_jobs = n_jobs
        self.transformer_weights = transformer_weights
        self._validate_transformers()
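  1. transformer_list: steps are named the same way as in a Pipeline. The name identifies the branch and is followed by the transformer that does the actual work, whose parameters you can set in advance.

  2. n_jobs=None: because the branches can run in parallel, you can choose how many cores to use; n_jobs=-1 uses all of them. None means 1 unless you are inside a joblib.parallel_backend context.

  3. transformer_weights: a dict mapping transformer names to multiplicative weights. Each transformer's output features are multiplied by its weight before concatenation; transformers not listed are left unweighted.

  4. verbose=False: as in Pipeline, if True the time taken by each branch is printed. This parameter exists in newer scikit-learn versions, not in the older constructor shown above.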
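Here is a small sketch of transformer_weights, assuming scikit-learn and NumPy. FunctionTransformer() with no function acts as an identity transformer, so only the "squared" block is scaled by its weight of 0.5. The data and the weight are arbitrary demo values.

```python
import numpy as np
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import FunctionTransformer

X = np.array([[1.0, 2.0], [3.0, 4.0]])

union = FeatureUnion(
    [("raw", FunctionTransformer()), ("squared", FunctionTransformer(np.square))],
    transformer_weights={"squared": 0.5},  # only the "squared" block is scaled
)
Xt = union.fit_transform(X)
print(Xt.tolist())  # [[1.0, 2.0, 0.5, 2.0], [3.0, 4.0, 4.5, 8.0]]
```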

Parallel is also used here to run the transformers in parallel:

    def fit(self, X, y=None):
        """Fit all transformers using X.

        Parameters
        ----------
        X : iterable or array-like, depending on transformers
            Input data, used to fit transformers.

        y : array-like, shape (n_samples, ...), optional
            Targets for supervised learning.

        Returns
        -------
        self : FeatureUnion
            This estimator
        """
        self.transformer_list = list(self.transformer_list)
        self._validate_transformers()
        transformers = Parallel(n_jobs=self.n_jobs)(
            delayed(_fit_one_transformer)(trans, X, y)
            for _, trans, _ in self._iter())
        self._update_transformer_list(transformers)
        return self

    def fit_transform(self, X, y=None, **fit_params):
        """Fit all transformers, transform the data and concatenate results.

        Parameters
        ----------
        X : iterable or array-like, depending on transformers
            Input data to be transformed.

        y : array-like, shape (n_samples, ...), optional
            Targets for supervised learning.

        Returns
        -------
        X_t : array-like or sparse matrix, shape (n_samples, sum_n_components)
            hstack of results of transformers. sum_n_components is the
            sum of n_components (output dimension) over transformers.
        """
        self._validate_transformers()
        result = Parallel(n_jobs=self.n_jobs)(
            delayed(_fit_transform_one)(trans, X, y, weight,
                                        **fit_params)
            for name, trans, weight in self._iter())

        if not result:
            # All transformers are None
            return np.zeros((X.shape[0], 0))
        Xs, transformers = zip(*result)
        self._update_transformer_list(transformers)
        if any(sparse.issparse(f) for f in Xs):
            Xs = sparse.hstack(Xs).tocsr()
        else:
            Xs = np.hstack(Xs)
        return Xs

    def transform(self, X):
        """Transform X separately by each transformer, concatenate results.

        Parameters
        ----------
        X : iterable or array-like, depending on transformers
            Input data to be transformed.

        Returns
        -------
        X_t : array-like or sparse matrix, shape (n_samples, sum_n_components)
            hstack of results of transformers. sum_n_components is the
            sum of n_components (output dimension) over transformers.
        """
        Xs = Parallel(n_jobs=self.n_jobs)(
            delayed(_transform_one)(trans, X, None, weight)
            for name, trans, weight in self._iter())
        if not Xs:
            # All transformers are None
            return np.zeros((X.shape[0], 0))
        if any(sparse.issparse(f) for f in Xs):
            Xs = sparse.hstack(Xs).tocsr()
        else:
            Xs = np.hstack(Xs)
        return Xs
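The `sum_n_components` remark in the docstrings above is easy to check. This sketch assumes scikit-learn and its bundled digits dataset, and the component counts are arbitrary.

```python
from sklearn.datasets import load_digits
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.pipeline import FeatureUnion

X, _ = load_digits(return_X_y=True)  # 1797 samples x 64 features
union = FeatureUnion(
    [("pca", PCA(n_components=5)), ("svd", TruncatedSVD(n_components=3))],
    n_jobs=1,  # >1 (or -1) would fit the two transformers in parallel through joblib
)
Xt = union.fit_transform(X)
print(Xt.shape)  # (1797, 8): 5 PCA columns hstacked with 3 SVD columns
```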

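Just as make_pipeline does for Pipeline, make_union offers a more concise way to build a FeatureUnion: names are generated automatically, and weights are not supported.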

def make_union(*transformers, **kwargs):
    """Construct a FeatureUnion from the given transformers.

    This is a shorthand for the FeatureUnion constructor; it does not require,
    and does not permit, naming the transformers. Instead, they will be given
    names automatically based on their types. It also does not allow weighting.

    Parameters
    ----------
    *transformers : list of estimators

    n_jobs : int or None, optional (default=None)
        Number of jobs to run in parallel.
        ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
        ``-1`` means using all processors. See :term:`Glossary <n_jobs>`
        for more details.

    Returns
    -------
    f : FeatureUnion

    See also
    --------
    sklearn.pipeline.FeatureUnion : Class for concatenating the results
        of multiple transformer objects.

    Examples
    --------
    >>> from sklearn.decomposition import PCA, TruncatedSVD
    >>> from sklearn.pipeline import make_union
    >>> make_union(PCA(), TruncatedSVD())    # doctest: +NORMALIZE_WHITESPACE
    FeatureUnion(n_jobs=None,
           transformer_list=[('pca',
                              PCA(copy=True, iterated_power='auto',
                                  n_components=None, random_state=None,
                                  svd_solver='auto', tol=0.0, whiten=False)),
                             ('truncatedsvd',
                              TruncatedSVD(algorithm='randomized',
                              n_components=2, n_iter=5,
                              random_state=None, tol=0.0))],
           transformer_weights=None)
    """
    n_jobs = kwargs.pop('n_jobs', None)
    if kwargs:
        # We do not currently support `transformer_weights` as we may want to
        # change its type spec in make_union
        raise TypeError('Unknown keyword arguments: "{}"'
                        .format(list(kwargs.keys())[0]))
    return FeatureUnion(_name_estimators(transformers), n_jobs=n_jobs)

III. How Pipeline and FeatureUnion use multiple processes


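Parallel appears in several places in the FeatureUnion source, where it runs the methods of the individual transformer objects in parallel across multiple processes.
It is usually imported like this: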

from joblib import Parallel, delayed

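Here, the Parallel object dispatches the work of each transformer to a pool of workers, so that every transformer's job can run in its own process. The delayed function is a simple trick that turns a call into a (function, args, kwargs) tuple instead of executing it.
By default, Parallel runs the tasks in separate worker processes so that they can execute simultaneously on different CPUs. Recent joblib versions use the loky backend for this, while older versions used Python's multiprocessing module. Note that with n_jobs=None, the FeatureUnion default, everything runs in a single job unless you are inside a joblib.parallel_backend context.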
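Here is a minimal joblib sketch of both ideas, assuming joblib is installed (it ships as a scikit-learn dependency). `prefer="threads"` is used only to keep the demo lightweight; without it, joblib uses its default process-based backend.

```python
import math

from joblib import Parallel, delayed

# delayed(f)(*args, **kwargs) does not call f; it only packages the call
task = delayed(math.pow)(2, 10)
print(task[0] is math.pow, task[1], task[2])  # True (2, 10) {}

# Parallel runs an iterable of such tasks and returns results in input order
results = Parallel(n_jobs=2, prefer="threads")(delayed(math.sqrt)(i) for i in [1, 4, 9, 16])
print(results)  # [1.0, 2.0, 3.0, 4.0]
```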

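Pipeline, by contrast, contains nothing related to multiprocessing. It executes a single flow in strict order, where each step needs the previous step's output, so there is nothing for multiple processes to do.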

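To get multiprocessing with a pipeline, combine it with GridSearchCV. Through its n_jobs parameter, GridSearchCV uses Parallel to fit the candidate parameter settings and cross-validation folds in multiple processes.
For details, see the GridSearchCV + pipeline example.

To tune several models in one unified search, you can write your own class that redefines the pipeline flow; see the referenced article for details.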
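Here is a sketch of the GridSearchCV + Pipeline combination, assuming scikit-learn and its bundled iris dataset; the grid values are arbitrary. Each candidate setting refits the whole pipeline on every training fold. `n_jobs` controls how many of those fits run in parallel, and it is left at 1 here only to keep the demo serial.

```python
from sklearn.datasets import load_iris
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

X, y = load_iris(return_X_y=True)
pipe = Pipeline([("scaler", StandardScaler()), ("svc", SVC())])

# Grid keys use the same "<step>__<param>" convention as set_params
param_grid = {"svc__C": [0.1, 1.0, 10.0], "svc__kernel": ["linear", "rbf"]}

# n_jobs=-1 would spread the candidate/fold fits over all cores
search = GridSearchCV(pipe, param_grid, cv=3, n_jobs=1)
search.fit(X, y)
print(search.best_params_)
print(len(search.cv_results_["params"]))  # 6 candidates: 3 values of C x 2 kernels
```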

IV. References

https://blog.csdn.net/u010230273/article/details/98493025?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-1-98493025-blog-106208747.235%5Ev38%5Epc_relevant_anti_vip&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-1-98493025-blog-106208747.235%5Ev38%5Epc_relevant_anti_vip&utm_relevant_index=2

https://zhuanlan.zhihu.com/p/465229699


https://zhuanlan.zhihu.com/p/82008008

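Disclaimer: this article was contributed by a user and does not represent the position of the wpsshop blog; copyright belongs to the original author, and the site assumes no legal liability for it. If you find infringing content, please contact us. When reposting, please cite the source: https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/901838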