当前位置:   article > 正文

Pipeline 设计模式的优缺点和实践案例_pipeline设计模式

pipeline设计模式

作者:明明如月学长, CSDN 博客专家,蚂蚁集团高级 Java 工程师,《性能优化方法论》作者、《解锁大厂思维:剖析《阿里巴巴Java开发手册》》、《再学经典:《EffectiveJava》独家解析》专栏作者。

热门文章推荐

一、概述

之前我们在《Java 中的 Pipeline 设计模式》 一文中介绍了 Pipeline 设计模式。其核心思想是创建一组操作(管道)并将数据在这些操作中传递,每个操作可以独立工作,也可以支持同时处理多个数据流。
pipeline.png

有同学提到几个不错的相关问题,本文简单探讨下。

  • (1)例子中 Pipeline 的代码使用硬编码也可以实现,为什么要用这个模式,有什么好处?
  • (2)Pipeline 设计模式在实际的编码中是怎样体现的?
  • (3)Pipeline 设计模式有什么缺点?如何解决?

二、答疑

2.1 为什么要用 Pipeline 设计模式,而不是硬编码?

“为什么用 XXX 设计模式,而不是硬编码?” 这个问题对其他设计模式都适用。
这个问题分为两个维度来回答:

  • (1) 该设计模式有什么优点?
  • (2) 该设计模式的适用场景有哪些?

2.1.1 Pipeline 设计模式 的优点

Pipeline 设计模式的优点主要有以下三点:降低耦合、增加灵活性、提高性能。

  • (1)降低耦合度
    高内聚、弱耦合:Pipeline 设计模式将不同的处理逻辑封装成独立的阶段,每个阶段只关注自己的输入和输出,不需要知道其他阶段的细节。这样可以方便地增加、删除或修改阶段,而不影响整个流程的运行。
    排查问题方便:对于适合该设计模式相对复杂或较长的代码,如果不采用 Pipeline 设计模式,直接编码,中间某个步骤出错时,通常还需要理解上下文才敢动手修改;采用 Pipeline 设计模式之后,由于不同的步骤之间解耦,出错只需要关注该步骤即可。
    可测试性强:由于不同的步骤之间相对独立,耦合较低,更符合单一职责原则,可以更方便地对每个步骤编写单测;编写单测时,更容易全面地覆盖代码逻辑。
  • (2)增加灵活性。Pipeline设计模式可以通过配置化来实现不同的业务走不同的流程,而不需要修改代码。这样可以根据需求变化快速地调整流程,提高开发效率和可维护性。
  • (3)提高性能。Pipeline设计模式可以利用多线程或异步机制来并行执行不同的阶段,从而提高整个流程的吞吐量和响应时间。

如果采用硬编码的方式来实现类似的功能:代码之间的耦合度更高,排查问题需要读懂更多代码;编写高质量的单测也很困难;无法灵活地实现不同步骤之间的组合复用;虽然有些步骤也可以自己使用线程池等方式实现异步化,但这种能力不能复用,换个场景又要自己写一遍。

2.1.2 Pipeline 设计模式的常见场景

一般来说,某个处理流程可以拆分成多个处理步骤,不同的步骤之间相对独立,数据在不同的步骤之间传递,可以通过特定编排来完成一个复杂的任务,此时可以考虑使用 Pipeline 设计模式。

下面给出一些常见的场景:

  • (1)数据处理:当需要对大量数据进行处理时,通常需要将处理过程分为多个阶段。例如,数据清洗、转换、归一化、特征提取等阶段都可以作为 Pipeline 中的一部分。

  • (2)图像处理:在图像处理中,需要对图像进行多个处理阶段,例如颜色空间转换、滤波、边缘检测、特征提取等。这些处理步骤可以被组合成一个 Pipeline,以便可以轻松地处理整个图像数据集。

  • (3)构建 DevOps 流水线:在软件开发过程中,需要对代码进行多个处理阶段,例如代码编译、单元测试、代码分析、代码部署等。这些步骤可以被组合成一个 Pipeline,以构成整个开发过程。

2.2 实际工作中怎样落地?

可能很多人觉得上面讲的 Pipeline 设计模式场景不够接地气,那么实际工作中 Pipeline 有哪些常见的落地方式?

2.2.1 Java Function API

我们可以使用 Function 来实现简单易用的 Pipeline。
示例代码:

Function<Integer, Integer> square = s -> s * s;
    Function<Integer, Integer> half = s -> s / 2;
    Function<Integer, String> toString = Object::toString;
    Function<Integer, String> pipeline = square.andThen(half)
        .andThen(toString);
    String result = pipeline.apply(5);

    String expected = "12";
    assertEquals(expected, result);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

我们可以使用 BiFunction 拓展 Function 的功能,支持两个对象转为一个对象。
示例代码:

    BiFunction<Integer, Integer, Integer> add = Integer::sum;
    BiFunction<Integer, Integer, Integer> mul = (a, b) -> a * b;
    Function<Integer, String> toString = Object::toString;
    BiFunction<Integer, Integer, String> pipeline = add.andThen(a -> mul.apply(a, 2))
        .andThen(toString);
    String result = pipeline.apply(1, 2);
    String expected = "6";
    assertEquals(expected, result);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2.2.2 Java Stream API

Java Stream API 就是一种典型的流水线落地方式。
下面是一个简单的Java Stream的示例代码,它使用了 filtermapcollect操作,从一个字符串列表中筛选出以字母"A"开头的字符串,并转换为大写,然后收集到一个新的列表中。

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamExample {

    public static void main(String[] args) {
        // Create a list of strings
        List<String> list = Arrays.asList("Apple", "Banana", "Orange", "Pear", "Avocado");

        // Create a stream from the list
        // Filter the strings that start with "A"
        // Map the strings to upper case
        // Collect the results into a new list
        List<String> result = list.stream()
                .filter(s -> s.startsWith("A"))
                .map(s -> s.toUpperCase())
                .collect(Collectors.toList());

        // Print the result
        System.out.println(result); // [APPLE, AVOCADO]
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

日常开发中,通常查询出底层数据,通过筛选和映射,转为所需的结构。由于 Java 的 Stream 比较简单和常用,在这里就不作过多陈述。

2.2.3 业务编排

比如你工作中有一个这种需求:需要做一个物料(新闻资讯、短视频等)推荐系统,有以下几个步骤:物料召回(根据业务需求从 MySQL 、ES 或二方接口中查询候选物料)、黑名单过滤(有些物料不允许透出)、观看记录过滤(观看过的不能透出,需要过滤掉)、按类型粗排(同个类目或主题只能保留 M 个)、算法精排(调用算法系统进行打分)、业务置顶(根据业务需要对某些物料置顶)、按 size 截断(返回请求所需的 size)等步骤。

伪代码如下:

// 定义一个Pipeline接口,表示一个流水线
public interface Pipeline<T> {
    // 添加一个阶段到流水线
    void addStage(Stage<T> stage);
    // 执行流水线
    void execute(T input);
}

// 定义一个Stage接口,表示一个阶段
public interface Stage<T> {
    // 处理输入数据,并返回输出数据
    T process(T input);
}

// 定义一个PipelineContext类,表示流水线的上下文
public class PipelineContext<T> {
    // 存储流水线的阶段列表
    private List<Stage<T>> stages;
    // 存储流水线的当前索引
    private int index;

    public PipelineContext() {
        stages = new ArrayList<>();
        index = 0;
    }

    // 添加一个阶段到上下文
    public void addStage(Stage<T> stage) {
        stages.add(stage);
    }

    // 执行上下文中的下一个阶段
    public void invokeNext(T input) {
        if (index < stages.size()) {
            Stage<T> stage = stages.get(index++);
            stage.process(input);

         }
    }
}     


// 定义一个RecContext类,表示推荐的上下文
public class RecContext<T> {
    // 存储推荐中的物料列表
    private List<T> items; 

    // 其他属性

    public PipelineContext() {
        items = new ArrayList<>();
    }

    // 省略其他方法
}     


// 定义一个DefaultPipeline类,实现Pipeline接口
public class DefaultPipeline<T> implements Pipeline<T> {
    // 创建一个PipelineContext对象
    private PipelineContext<T> context;

    public DefaultPipeline() {
        context = new PipelineContext<>();
    }

    @Override
    public void addStage(Stage<T> stage) {
        context.addStage(stage);
    }

    @Override
    public void execute(T input) {
        context.invokeNext(input);
    }
}

// 定义一个物料类,表示推荐系统的输入和输出数据
public class Material {
    // 物料的id
    private String id;
    
    // 物料的类型(资讯、视频等)
    private String type;
    
    // 物料的评分(算法精排后的结果)
    private double score;
    
    // 省略构造方法、getters和setters

}


// 定义一个物料召回阶段类,实现Stage接口
public class MaterialRecallStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据用户的兴趣、行为等特征,从物料库(如 MySQL、Es存储或二方接口)中召回一批候选物料,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    }
}

// 定义一个黑名单过滤阶段类,实现Stage接口
public class BlacklistFilterStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据用户的黑名单设置,过滤掉不符合条件的物料,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    }
}


// 定义一个观看记录过滤阶段类,实现Stage接口
public class WatchRecordFilterStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据用户的观看记录,过滤掉已经观看过的物料,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    }
}

// 定义一个按类型粗排阶段类,实现Stage接口
public class TypeSortStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据用户的偏好和物料的类型,按照一定的规则对物料进行粗排,并设置到 context 的 items中
       
        // 省略具体实现细节
        return context;
    }
}

// 定义一个算法精排阶段类,实现Stage接口
public class AlgorithmSortStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据用户的特征和物料的特征,使用机器学习模型对物料进行打分,排序后设置到 context 的 items中
       
        // 省略具体实现细节
        return context;
    }
}

// 定义一个业务置顶阶段类,实现Stage接口
public class BusinessTopStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据业务的需求,对部分物料进行置顶操作,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    }
}

// 定义一个按size截断阶段类,实现Stage接口
public class SizeCutStage implements Stage<RecContext<Material>> {

    @Override
    public RecContext<Material> process(RecContext<Material> context) {
        // 根据请求中的 size 数量,对物料的数量进行截断,并设置到 context 的 items中
        
        // 省略具体实现细节
        return context;
    }
}

// 定义一个测试类,用来创建和执行流水线
public class Test {

    public static void main(String[] args) {
        // 创建一个物料对象,作为流水线的输入数据
        RecContext<Material> recContext = new RecContext<Material>();
        
        // 创建一个流水线对象
        Pipeline<RecContext<Material>> pipeline = new DefaultPipeline<>();
        
        // 添加各个阶段到流水线中
        pipeline.addStage(new MaterialRecallStage());
        pipeline.addStage(new BlacklistFilterStage());
        pipeline.addStage(new WatchRecordFilterStage());
        pipeline.addStage(new TypeSortStage());
        pipeline.addStage(new AlgorithmSortStage());
        pipeline.addStage(new BusinessTopStage());
        pipeline.addStage(new SizeCutStage());
        
        // 执行流水线
        pipeline.execute(recContext);
        
        // 输出流水线的结果
        System.out.println(material);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205

每个流程(Stage)可以配置为 Spring 的 Bean, 流程的编排可以使用动态配置进行控制,这样就可以比较灵活调整。
比如粗排、置顶等步骤有多种方式可选,可以根据业务需要通过修改动态配置进行替换。
还可以自研框架或者自己编码去解析这些步骤,可以让某些步骤并行执行,比如将上述 Stage 的 bean 的 name 进行动态配置,其中中括号的部分解析后并行执行来提高性能:

[videoRecall, newsRecall,topicRecal],blacklist,
    recordFilter,typeSorce,algorithmSort,businessTop,sizeCut
  • 1
  • 2

希望这个例子,能够帮助大家更好地理解 Pipeline 设计模式的优点:不同的步骤可以相互独立降低耦合,灵活组合复用,部分步骤之间可以采用并行/并发执行的方式来提高性能等

2.3 Pipeline 模式有哪些缺点?

每种设计模式都有自己的局限性,下面给出 Pipeline 设计模式的几个缺点:

  • (1)可读性不强。因为 Pipeline 设计模式是可配置化的,且配置经常在外部(比如数据库里的一个 JSON 、携程的 Apollo 动态配置),所以不容易看出整个流程的逻辑和细节。
  • (2)调试困难。因为 Pipeline 设计模式涉及多个阶段的协作,如果某个阶段出现问题,不容易快速定位和修复。
  • (3)性能损耗。因为 Pipeline 设计模式需要在每个阶段之间传递数据,如果每个阶段是跨机器的,会增加内存和网络的开销。

当然这些缺点也是可以通过一些技巧解决的:

  • (1)针对可读性不强的问题,我们可以在请求的入口处贴出配置的地址,方便代码和配置关联。由于每个步骤非常独立,做好每个步骤的代码可读性也可以在一定程度上解决问题。
  • (2)针对调试和排查问题困难的问题。使用 Pipeline 设计模式时,我们可以对关键地方打好日志,方便快速定位、排查问题。
  • (3)针对性能损耗的问题。我们可以通过一些调整提高性能,比如上述物料推荐业务而言,需要调用算法平台的服务去打分,我们可以在打分前进行粗排,只将粗排分数较高的传给算法平台,用户和物料特征不需要传递给算法平台,算法平台自己去查询相关物料和用户特征再打分等。有些内存占用不管是采用哪种方式都是不可避免,不必纠结。

三、Pipeline 设计模式和责任链模式的区别

Pipeline 设计模式和责任链模式都是用于处理一系列相互关联的任务或操作的设计模式,它们的主要区别在于它们的处理方式和结构不同

Pipeline 设计模式通常是一个线性的流程,每个步骤都是独立的,且每个步骤都会处理整个数据集,而且每个步骤的输出作为下一个步骤的输入。这个模式通常用于数据处理流程,例如 ETL(Extract, Transform, Load)过程。

责任链模式则更加灵活,每个步骤都可以处理数据并且可以根据需要选择性地将数据传递给下一个步骤。每个步骤都有一个处理程序,如果这个步骤可以处理数据,则处理程序会处理该数据并将其传递给下一个步骤,否则就将其传递给下一个步骤。这种模式通常用于处理请求和命令,例如Web请求和异常处理。

因此,Pipeline 设计模式更适合线性处理的流程,而责任链模式更适合灵活的流程,能够根据条件决定是否需要继续处理数据

四、总结

学习的目的还是为了应用,大家在学习设计模式时,要主动和 JDK 源码,和自己使用的二方和三方框架的设计相结合,要主动和日常的业务场景相结合,以便更好的做到学以致用
每种设计模式都有自己的适用场景、优点和缺点,我们要注重掌握,并且不仅要了解某种设计模式存在的问题,还要积极思考如何解决。
通常来说,对于非常简单的场景,直接编码即可;对于复杂场景,建议优先考虑遵循设计原则,使用经典的设计模式,以提高代码的可重用性、可读性、灵活性、可拓展性、安全性和降低代码复杂度等。


创作不易,如果本文对你有帮助,欢迎点赞、收藏加关注,你的支持和鼓励,是我创作的最大动力。
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/1004066
推荐阅读
相关标签
  

闽ICP备14008679号