本文基于 Java

本文基于 slot

流水线模式是我个人最喜欢的设计模式,它可以帮我把复杂问题简单化,让复杂的流程变得更清晰易懂。

在诸多的优秀的开源项目中都能看到它的身影(如:Netty,Kafka),我觉得这个模式还是值得学习下的。

网上的 Pipeline 实现各异,这篇文章也只是我个人的一个实现思路,为各位读者抛砖引玉,拓展思路。

基本概念与实现

Pipeline 有三个基本的概念,分别是:Pipeline,Valve,Context,他们的关系大致如下所示:

+-------------------------------------------------------------------------------+
| Pipeline |
| +-----------+ +-----------+ +-----------+ +-----------+ |
| | | | | | | | | |
| | Valve1 +----^+ Valve2 +-----^+ Valve3 +--------> Valve4 | |
| | | | | | | | | |
| +-----------+ +-----------+ +-----------+ +-----------+ |
| Context |
+-------------------------------------------------------------------------------+

一条Pipeline有一个Context,多个Valve。这些Valve是很小的、单元化的,一个Valve只做一件简单的事。前后Valve之间的通信由Context来承载。但 Context 并不是必须的,如果你的需求足够简单的话,那么可以去除 Context。

Pipeline

以我的理解,Pipeline 是存放 Valve 的容器,其实现为单向链表或双向链表,Pipeline 驱动整个业务流程的运行,什么时候终止、什么时候添加新 Valve 由 Pipeline 说了算。

一般来说,Pipeline 作为一个 interface 存在的,其中定义了一些 Pipeline 的通用行为,具体的实现由各个业务逻辑去实现。一个典型的 Pipeline 接口声明如下:

public interface Pipeline {
// 初始化
void init(PipelineConfig config);
// 启动 Pipeline
void start();
// 获取 Pipeline 上下文
Context getContext();
}

但在 slot 做了魔改,slotPipeline 实现如下:

@Setter
@Getter
public class Pipeline<I> {
/**
* 管道名称
*/
private final String name;

/**
* 管道中第一个处理节点
*/
private Valve<I, ?> first;

/**
* 管道中最后一个处理节点
*/
private Valve<?, ?> tail;

private final PipelineContext context = new PipelineContext();

public Pipeline(String name) {
this.name = name;
}

/**
* 在管道的最后添加一个处理节点
*
* @param valve 处理节点
*/
public void addLast(Valve<?, ?> valve) {
final Valve<?, ?> t = this.tail;
tail = valve;
if (null == t) {
first = (Valve<I, ?>) valve;
} else {
t.setNext(valve);
valve.setPrev(t);
}
}

// 没实现,发现没有用到😅
public void linkAfter() {

}

// 唤醒第一个 Valve
public void start(I input) {
this.first.bootstrap(input, this.context);
}
}

整个业务流程的 Valve 被当作双向链表管理起来,有序的从第一个 Valve 节点开始被执行到最后一个 Valve。

slot 中 分别有 InitValve,BuildSlotClassValve,InstrumentationValveTransformClassValve

执行关系为 InitValve -> BuildSlotClassValve -> TransformClassValve -> InstrumentationValve,具体到代码层面 SlotAgentBootstrap 如下所示:

public static void premain(String slotConfFilePath, Instrumentation inst) {
final Properties init = init(slotConfFilePath);
AnsiLog.info("开始埋点...,埋点配置文件为 : {}", slotConfFilePath);
final Pipeline<Properties> pipeline = new Pipeline<>("slot");
pipeline.addLast(new InitValve());
pipeline.addLast(new BuildSlotClassValve());
pipeline.addLast(new TransformClassValve());
pipeline.addLast(new InstrumentationValve(inst));

pipeline.start(init);
}

由链表组织所有的 Valve 是非常灵活的,我们可以在复杂的业务系统中动态的组装 Pipeline,实现复杂的业务逻辑。slot 的业务相对稳定,变化相对较小,所以直接在代码里写死,显式调用。

我们也可以引入外部系统配置化,可以通过 json,xml 或数据库中的信息来描述各个 Valve 的依赖关系(当然这些 Valve 都是事先被实现好的),类似 Tomcat 的 Pipeline 一样,json 大致如下所示:

{
"scene_a": {
"valves": [
"checkOrder",
"checkPayment",
"checkDiscount",
"computeMount",
"payment",
"DeductInventory"
],
"config": {
"sendEmail": true,
"supportAlipay": true
}

}
}

Valve

Valve 是 Pipeline 中最小的处理单元,所有的业务逻辑都将在 Valve 中被实现。slotValve 实现如下所示:

public interface Valve<I, O> {
/**
* 初始化
*
* @param context 流水线上下文
*/
void init(PipelineContext context);

/**
* 处理业务逻辑
*
* @param input 输入的数据类型
* @param context 管道上线文
* @return 输出
*/
O process(I input, PipelineContext context);

/**
* 获取下一个节点
*
* @return 下一个处理节点
*/
Valve<?, ?> getNext();

/**
* 设置下一个处理节点
*
* @param next 下一个处理节点
*/
void setNext(Valve<?, ?> next);

/**
* 设置上一个处理节点
*
* @param prev 上一个梳理节点
*/
void setPrev(Valve<?, ?> prev);

/**
* 获取上一个处理节点
*
* @return 处理节点
*/
Valve<?, ?> getPrev();

/**
* 唤起执行节点
*
* @param input 输出
* @param context 流水线上下文
*/
void bootstrap(I input, PipelineContext context);

void setName(String name);

String getName();
}

不难看出,一个 Valve 有输入和输出,例如:BValve 依赖 AValve,那么 AValve 的输出就是 BValve 的输入,这样就可以将整个业务流程给串联起来。

Context

存放了整个 Pipeline 的必要的运行信息,类似于缓存一样的存在,Pipeline 中的任何 Valve 都可以向其中存放和读取数据。Context 没有特定的实现方式,可以按照自己的喜好去实现。在 slotPipelineContext 使用 LRU 来实现,具体如下:

public class PipelineContext {
// LRU
private static final Map<String, Object> CACHE = new LinkedHashMap<>(16, 0.75F, true);

public synchronized void cache(String key, Object value) {
CACHE.put(key, value);
}

public synchronized Object getByKey(String key) {
return CACHE.get(key);
}
}

拓展

策略模式

我们在某个Valve,可能会需要根据不同的业务线,有不同的逻辑。比如同一个文本,有些是发邮件,有些是发短信,有些是发钉钉。那这个时候就可以在配置里面写上当前这个业务线要发送的渠道,然后在Valve里面通过策略模式去决定使用什么渠道发送,这样就不用在Valve里面写死很多if-else,以后很好扩展。

并行执行、

我们在前面看到Valve都是链式一个一个执行的。但有时候可能多个Valve彼此之间并不依赖,可以同时并行地去跑。比如发消息,可能多个Valve并行地去发。

这个时候我们可以把Pipeline改造一下,就像Jenkins设计Pipeline那样,把一个完整的Pipeline分成Phase、Stage、Step等,我们可以对某个Phase或者某个Step设置成可以并行执行的。这需要另外写一个并行执行的Pipeline,用CountDownLatch等工具来等待所有Valve执行完,往下走。

使用 ThreadLocal

理论上来说,我们在任何地方,都应该使用Context来在整个Pipeline中传递数据。但Context有时候使用起来相对比较麻烦。比如我们在Valve内部抽私有方法的时候,可能要经常把Context作为方法参数传进去,用起来并不是特别方便。而Valve应该是无状态的,不适合把Context放在Valve里面作为属性。

这个时候我们可以借助ThreadLocal来代替Context的作用,Valve通过使用ThreadLocal来存取数据。但使用ThreadLocal有三个需要注意的点。

如果你的Pipeline是要支持并行的,在并行Valve里面就不适合使用ThreadLocal。

使用ThreadLocal要记得在最后阶段Clear,避免影响当前线程下次执行Pipeline。

不要把零散的一个个属性放进ThreadLocal,因为同一种类型,一个线程只能在一个ThreadLocal里面放一个值。而我们的上下文可能会有多个String、boolean等值。如果使用ThreadLocal,可以把所有属性都包成一个Context类,放进ThreadLocal。

组合

虽然我们说一个Valve只做一件简单的事。但这是相对于整个流程来说的。有时候太过细化也不好,不方便管理。正确的做法应该是做好抽象和分组。比如我们会有一个“校验”阶段,就不用把具体每个字段的校验都单独抽成Valve放进主流程。我们可以就在主流程放一个“校验”的Valve,然后在这个“校验”的Valve里面专门生成一条“校验Pipeline”。这样主流程也比较清晰明了,每个Valve的职责也比较清晰。

注意,子Pipeline应该有它单独的Context,但是它同时也应该具有主Pipeline的Context,是不是应该通过继承来实现?

工厂方法模板

Pipeline的优势在于通过配置化来灵活地实现不同的业务走不同的流程。实现统一化和差异化的完美结合。我们通过读取配置,生成一条Pipeline的时候,用工厂模式来实现比较好。
先读取当前的业务线,然后通过这个业务线和配置,完成整条Pipeline的实例化和装配,可以通过调用一个Pipeline Factory来实现。

Pipeline pipeline = PipelineFactory.create(pipelineConfig);
pipeline.start();

树与图

上面我们介绍的Pipeline,本质上是一个链。但如果往更通用(同时也更复杂)的方向去设计,它还可以做成一个图或者树。

假设我们在某个环节有一个条件分支,通过当时的context里面的数据状态,来判断下一步要走哪个Valve,形成一个树。最后可能又归拢到一个Valve,那就形成了一个图。

树和图会显著增加Pipeline的复杂度,需要配合上可视化配置,才能发挥出它的威力,但成本相对较高,除非真的业务非常多且复杂,不然不是很推荐。

树和图的Pipeline也需要结合数据结构专门设计Pipeline,节点如何往下走。

缺点

Pipeline设计模式很强大,但它也有很明显的缺点。

  • 第一个缺点是可读性不强。因为它是可配置化的,且配置经常在外部(比如数据库里的一个JSON)。所以可读性不好。尤其是我们在读 Valve 代码的时候,「如果不对照配置,其实是不知道它的前后调用关系的」。

  • 第二个缺点是Pipeline之间传递数据是通过Context,而不是简单的函数调用。所以一条Pipeline是有状态的,而且「方法调用内部修改Context」,而不是通过返回值,是有副作用的。

适用场景

Pipeline的本质是「数据结构和设计模式的灵活应用」,来应对流程复杂多变的业务场景。它并不是一个新的东西,也不是一个固定的设计模式,而应该是一种灵活的设计思想。

当然了,它也不是银弹,不能解决所有问题。还是要「合适」才行。