赞
踩
作者:明明如月学长, CSDN 博客专家,蚂蚁集团高级 Java 工程师,《性能优化方法论》作者、《解锁大厂思维:剖析《阿里巴巴Java开发手册》》、《再学经典:《EffectiveJava》独家解析》专栏作者。
热门文章推荐:
之前我们在《Java 中的 Pipeline 设计模式》 一文中介绍了 Pipeline 设计模式。其核心思想是创建一组操作(管道)并将数据在这些操作中传递,每个操作可以独立工作,也可以支持同时处理多个数据流。
有同学提到几个不错的相关问题,本文简单探讨下。
“为什么用 XXX 设计模式,而不是硬编码?” 这个问题对其他设计模式都适用。
这个问题分为两个维度来回答:
Pipeline 设计模式的优点主要有以下三点:降低耦合、增加灵活性、提高性能。
如果采用硬编码的方式来实现类似的功能:代码之间的耦合度更高,排查问题需要读懂更多代码;编写高质量的单测也很困难;无法灵活地实现不同步骤之间的组合复用;虽然有些步骤也可以自己使用线程池等方式实现异步化,但这种能力不能复用,换个场景又要自己写一遍。
一般来说,某个处理流程可以拆分成多个处理步骤,不同的步骤之间相对独立,数据在不同的步骤之间传递,可以通过特定编排来完成一个复杂的任务,此时可以考虑使用 Pipeline 设计模式。
下面给出一些常见的场景:
(1)数据处理:当需要对大量数据进行处理时,通常需要将处理过程分为多个阶段。例如,数据清洗、转换、归一化、特征提取等阶段都可以作为 Pipeline 中的一部分。
(2)图像处理:在图像处理中,需要对图像进行多个处理阶段,例如颜色空间转换、滤波、边缘检测、特征提取等。这些处理步骤可以被组合成一个 Pipeline,以便可以轻松地处理整个图像数据集。
(3)构建 DevOps 流水线:在软件开发过程中,需要对代码进行多个处理阶段,例如代码编译、单元测试、代码分析、代码部署等。这些步骤可以被组合成一个 Pipeline,以构成整个开发过程。
可能很多人觉得上面讲的 Pipeline 设计模式场景不够接地气,那么实际工作中 Pipeline 有哪些常见的落地方式?
我们可以使用 Function
来实现简单易用的 Pipeline。
示例代码:
Function<Integer, Integer> square = s -> s * s;
Function<Integer, Integer> half = s -> s / 2;
Function<Integer, String> toString = Object::toString;
Function<Integer, String> pipeline = square.andThen(half)
.andThen(toString);
String result = pipeline.apply(5);
String expected = "12";
assertEquals(expected, result);
我们可以使用 BiFunction
拓展 Function
的功能,支持两个对象转为一个对象。
示例代码:
BiFunction<Integer, Integer, Integer> add = Integer::sum;
BiFunction<Integer, Integer, Integer> mul = (a, b) -> a * b;
Function<Integer, String> toString = Object::toString;
BiFunction<Integer, Integer, String> pipeline = add.andThen(a -> mul.apply(a, 2))
.andThen(toString);
String result = pipeline.apply(1, 2);
String expected = "6";
assertEquals(expected, result);
Java Stream API 就是一种典型的流水线落地方式。
下面是一个简单的Java Stream
的示例代码,它使用了 filter
、map
和 collect
操作,从一个字符串列表中筛选出以字母"A"开头的字符串,并转换为大写,然后收集到一个新的列表中。
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class StreamExample {
public static void main(String[] args) {
// Create a list of strings
List<String> list = Arrays.asList("Apple", "Banana", "Orange", "Pear", "Avocado");
// Create a stream from the list
// Filter the strings that start with "A"
// Map the strings to upper case
// Collect the results into a new list
List<String> result = list.stream()
.filter(s -> s.startsWith("A"))
.map(s -> s.toUpperCase())
.collect(Collectors.toList());
// Print the result
System.out.println(result); // [APPLE, AVOCADO]
}
}
日常开发中,通常查询出底层数据,通过筛选和映射,转为所需的结构。由于 Java 的 Stream 比较简单和常用,在这里就不作过多陈述。
比如你工作中有一个这种需求:需要做一个物料(新闻资讯、短视频等)推荐系统,有以下几个步骤:物料召回(根据业务需求从 MySQL 、ES 或二方接口中查询候选物料)、黑名单过滤(有些物料不允许透出)、观看记录过滤(观看过的不能透出,需要过滤掉)、按类型粗排(同个类目或主题只能保留 M 个)、算法精排(调用算法系统进行打分)、业务置顶(根据业务需要对某些物料置顶)、按 size 截断(返回请求所需的 size)等步骤。
伪代码如下:
// 定义一个Pipeline接口,表示一个流水线
public interface Pipeline<T> {
// 添加一个阶段到流水线
void addStage(Stage<T> stage);
// 执行流水线
void execute(T input);
}
// 定义一个Stage接口,表示一个阶段
public interface Stage<T> {
// 处理输入数据,并返回输出数据
T process(T input);
}
// 定义一个PipelineContext类,表示流水线的上下文
public class PipelineContext<T> {
// 存储流水线的阶段列表
private List<Stage<T>> stages;
// 存储流水线的当前索引
private int index;
public PipelineContext() {
stages = new ArrayList<>();
index = 0;
}
// 添加一个阶段到上下文
public void addStage(Stage<T> stage) {
stages.add(stage);
}
// 执行上下文中的下一个阶段
public void invokeNext(T input) {
if (index < stages.size()) {
Stage<T> stage = stages.get(index++);
stage.process(input);
}
}
}
// 定义一个RecContext类,表示推荐的上下文
public class RecContext<T> {
// 存储推荐中的物料列表
private List<T> items;
// 其他属性
public PipelineContext() {
items = new ArrayList<>();
}
// 省略其他方法
}
// 定义一个DefaultPipeline类,实现Pipeline接口
public class DefaultPipeline<T> implements Pipeline<T> {
// 创建一个PipelineContext对象
private PipelineContext<T> context;
public DefaultPipeline() {
context = new PipelineContext<>();
}
@Override
public void addStage(Stage<T> stage) {
context.addStage(stage);
}
@Override
public void execute(T input) {
context.invokeNext(input);
}
}
// 定义一个物料类,表示推荐系统的输入和输出数据
public class Material {
// 物料的id
private String id;
// 物料的类型(资讯、视频等)
private String type;
// 物料的评分(算法精排后的结果)
private double score;
// 省略构造方法、getters和setters
}
// 定义一个物料召回阶段类,实现Stage接口
public class MaterialRecallStage implements Stage<RecContext<Material>> {
@Override
public RecContext<Material> process(RecContext<Material> context) {
// 根据用户的兴趣、行为等特征,从物料库(如 MySQL、Es存储或二方接口)中召回一批候选物料,并设置到 context 的 items中
// 省略具体实现细节
return context;
}
}
// 定义一个黑名单过滤阶段类,实现Stage接口
public class BlacklistFilterStage implements Stage<RecContext<Material>> {
@Override
public RecContext<Material> process(RecContext<Material> context) {
// 根据用户的黑名单设置,过滤掉不符合条件的物料,并设置到 context 的 items中
// 省略具体实现细节
return context;
}
}
// 定义一个观看记录过滤阶段类,实现Stage接口
public class WatchRecordFilterStage implements Stage<RecContext<Material>> {
@Override
public RecContext<Material> process(RecContext<Material> context) {
// 根据用户的观看记录,过滤掉已经观看过的物料,并设置到 context 的 items中
// 省略具体实现细节
return context;
}
}
// 定义一个按类型粗排阶段类,实现Stage接口
public class TypeSortStage implements Stage<RecContext<Material>> {
@Override
public RecContext<Material> process(RecContext<Material> context) {
// 根据用户的偏好和物料的类型,按照一定的规则对物料进行粗排,并设置到 context 的 items中
// 省略具体实现细节
return context;
}
}
// 定义一个算法精排阶段类,实现Stage接口
public class AlgorithmSortStage implements Stage<RecContext<Material>> {
@Override
public RecContext<Material> process(RecContext<Material> context) {
// 根据用户的特征和物料的特征,使用机器学习模型对物料进行打分,排序后设置到 context 的 items中
// 省略具体实现细节
return context;
}
}
// 定义一个业务置顶阶段类,实现Stage接口
public class BusinessTopStage implements Stage<RecContext<Material>> {
@Override
public RecContext<Material> process(RecContext<Material> context) {
// 根据业务的需求,对部分物料进行置顶操作,并设置到 context 的 items中
// 省略具体实现细节
return context;
}
}
// 定义一个按size截断阶段类,实现Stage接口
public class SizeCutStage implements Stage<RecContext<Material>> {
@Override
public RecContext<Material> process(RecContext<Material> context) {
// 根据请求中的 size 数量,对物料的数量进行截断,并设置到 context 的 items中
// 省略具体实现细节
return context;
}
}
// 定义一个测试类,用来创建和执行流水线
public class Test {
public static void main(String[] args) {
// 创建一个物料对象,作为流水线的输入数据
RecContext<Material> recContext = new RecContext<Material>();
// 创建一个流水线对象
Pipeline<RecContext<Material>> pipeline = new DefaultPipeline<>();
// 添加各个阶段到流水线中
pipeline.addStage(new MaterialRecallStage());
pipeline.addStage(new BlacklistFilterStage());
pipeline.addStage(new WatchRecordFilterStage());
pipeline.addStage(new TypeSortStage());
pipeline.addStage(new AlgorithmSortStage());
pipeline.addStage(new BusinessTopStage());
pipeline.addStage(new SizeCutStage());
// 执行流水线
pipeline.execute(recContext);
// 输出流水线的结果
System.out.println(material);
}
}
每个流程(Stage)可以配置为 Spring 的 Bean, 流程的编排可以使用动态配置进行控制,这样就可以比较灵活调整。
比如粗排、置顶等步骤有多种方式可选,可以根据业务需要通过修改动态配置进行替换。
还可以自研框架或者自己编码去解析这些步骤,可以让某些步骤并行执行,比如将上述 Stage 的 bean 的 name 进行动态配置,其中中括号的部分解析后并行执行来提高性能:
[videoRecall, newsRecall,topicRecal],blacklist,
recordFilter,typeSorce,algorithmSort,businessTop,sizeCut
希望这个例子,能够帮助大家更好地理解 Pipeline 设计模式的优点:不同的步骤可以相互独立降低耦合,灵活组合复用,部分步骤之间可以采用并行/并发执行的方式来提高性能等。
每种设计模式都有自己的局限性,下面给出 Pipeline 设计模式的几个缺点:
当然这些缺点也是可以通过一些技巧解决的:
Pipeline 设计模式和责任链模式都是用于处理一系列相互关联的任务或操作的设计模式,它们的主要区别在于它们的处理方式和结构不同。
Pipeline 设计模式通常是一个线性的流程,每个步骤都是独立的,且每个步骤都会处理整个数据集,而且每个步骤的输出作为下一个步骤的输入。这个模式通常用于数据处理流程,例如 ETL(Extract, Transform, Load)过程。
责任链模式则更加灵活,每个步骤都可以处理数据并且可以根据需要选择性地将数据传递给下一个步骤。每个步骤都有一个处理程序,如果这个步骤可以处理数据,则处理程序会处理该数据并将其传递给下一个步骤,否则就将其传递给下一个步骤。这种模式通常用于处理请求和命令,例如Web请求和异常处理。
因此,Pipeline 设计模式更适合线性处理的流程,而责任链模式更适合灵活的流程,能够根据条件决定是否需要继续处理数据。
学习的目的还是为了应用,大家在学习设计模式时,要主动和 JDK 源码,和自己使用的二方和三方框架的设计相结合,要主动和日常的业务场景相结合,以便更好的做到学以致用。
每种设计模式都有自己的适用场景、优点和缺点,我们要注重掌握,并且不仅要了解某种设计模式存在的问题,还要积极思考如何解决。
通常来说,对于非常简单的场景,直接编码即可;对于复杂场景,建议优先考虑遵循设计原则,使用经典的设计模式,以提高代码的可重用性、可读性、灵活性、可拓展性、安全性和降低代码复杂度等。
创作不易,如果本文对你有帮助,欢迎点赞、收藏加关注,你的支持和鼓励,是我创作的最大动力。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。