pipeline设计模式

概念

管道模式用于将复杂的进程分解成多个独立的子任务。每个独立的任务都是可复用的,因此这些任务可以被组合成复杂的进程。

这种模式允许你讲庞大的进程分解成更小的子任务,这些子任务将数据进行处理并将处理后的结果传递给下一个子任务。就像流水线一样,有条不紊,从原料加工到成品,实现一道完整的工序。

管道中的每一个任务都会接受并返回同一类型的数据,这样子任务可以在管道中被添加、移除或者替换,而不影响其它子任务。

Laravel 在框架中的很多地方使用了管道设计模式,最常见的就是中间件的实现

当请求最终到达控制器动作被处理前,会先经过一系列的中间件。每个中间价都有一个独立的职责,例如,设置 Cookie、判断是否登录以及阻止 CSRF 攻击等等。

每个阶段都会对请求进行处理,如果请求通过就会被传递给下一个处理,不通过就会返回相应的 HTTP 响应。

这种机制使得我们很容易在请求最终到达应用代码前添加处理操作,当然如果不需要这个处理操作你也可以随时移除而不影响请求的生命周期。

管道模式的优点

首先,将复杂的处理流程分解成独立的子任务,从而方便测试每个子任务;

其次,被分解的子任务可以被不同的处理进程复用,避免代码冗余。

最后,在复杂进程中添加、移除和替换子任务非常轻松,对已存在的进程没有任何影响。

管道模式的缺点

虽然每个子任务变得简单了,但是当你再度尝试将这些子任务组合成完整进程时有一定复杂性;

此外你还需要保证独立子任务测试通过后整体的流程能正常工作,这有一定的不确定性。

最后,当你看到的都是一个个子任务时,对理解整体流程带来困难(盲人摸象的故事想必大家很熟悉,正是此理)

如何使用Laravel的管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//使用 Laravel 提供的管道很简单,首先需要创建一个新的 Illuminate\Pipeline\Pipeline 对象,并将其注入到某个 Illuminate\Contracts\Container\Container 的实例:
$pipeline = app('Illuminate\Pipeline\Pipeline');
//接下来将你想要传递的对象发送这个管道:
$pipeline->send($request);
//然后将其传递到接受并处理请求的任务数组:
$pipeline->through($middleware);
//最后运行管道任务并编写回调处理:
$pipeline->then(function ($request) {
// Do something
});
这就是中间件的基本工作原理:接收 HTTP 请求,让请求经过定义好的路由中间件,最后到达目的地进行处理。

举个例子,追查一下调用流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use Illuminate\Pipeline\Pipeline;
$p1 = function($input,Closure $next){
echo "执行p1\n";
$input = $input + 1;
return $next($input);
};
$p2 = function($input,Closure $next){
echo "执行p2\n";
$result = $next($input);
echo "再次执行p2\n";
$result = $result + 1;
return $result;
};
$p3 = function($input,Closure $next){
echo "执行p3\n";
$input = $input + 1;
return $next($input);
};
$input = 0;
(new Pipeline())->send($input)->through($pipelines)->then(function($input){
echo "result is ".$input."\n";
});
结果:
执行p1
执行p2
执行p3
result is 2
再次执行p2
//顺着开始捋,先是创建pipeline
public function __construct(Container $container = null)
{
$this->container = $container;
}
//接着是设定整个流水线的输入
public function send($passable)
{
//$passable是0
$this->passable = $passable;
return $this;
}
//接着传入各个处理管道
public function through($pipes)
{
//管道内容[$p1,$p2,$p3]
$this->pipes = is_array($pipes) ? $pipes : func_get_args();
return $this;
}
//最后是重头戏,管道开始运行
public function then(Closure $destination)
{
$pipeline = array_reduce(
array_reverse($this->pipes), $this->carry(), $this->prepareDestination($destination)
);
return $pipeline($this->passable);
}
//再看$this->carry()
protected function carry()
{
return function ($stack, $pipe) {
//stack和$pipe都作为闭包的环境保存
return function ($passable) use ($stack, $pipe) {
return $pipe($passable, $stack);
};
};
}
//$this->prepareDestination($destination)返回最后处理的闭包函数
protected function prepareDestination(Closure $destination)
{
return function ($passable) use ($destination) {
return $destination($passable);
};
}
$this->prepareDestination($destination)认为是$default
$stack $pipe
array_reduce第一次执行时 $default $p3
array_reduce第二次执行时 $p3_mix $p2
array_reduce第三次执行时 $p2_mix $p1
得到$pipeline = $p1_result
//解析最后这部分
$pipeline($this->passable)
这段在此就是$p3_result(0)
等同于
function (0) use ($p2_mix, $p1) {
return $p1(0, $p2_mix);
}
(p1处理)=>
function($p1_result) use ($p3_mix,$p2){
return $p2($p1_result,$p3_mix);
}
(p2处理)=>
function($p2_result) use ($default,$p3){
return $p3($p2_result,$default);
}
(p3处理)=>
function($p3_result) use ($destination){
$destination($p3_result)
}
=>
function($p3_result){
echo "result is ".$p3_result."\n";
}()
在此等同于p1(p2(p3(default()))),结果为2
注意p2的实现,以上执行完成后再次回到p2,$result = 2,最终返回3
$p2 = function($input,Closure $next){
echo "执行p2\n";
$result = $next($input);
echo "再次执行p2\n";
$result = $result + 1;
return $next($result);
};

更简单易懂的pipeline模式实现thephpleague/pipeline

参考

Laravel pipeline组件的实现
Laravel 中管道设计模式的使用
Laravel 5.1 源码阅读笔记
Laravel Pipeline 组件的实现