CategoryResourceRepost/极客时间专栏/左耳听风/Go语言编程模式/114 | Go编程模式:Pipeline.md
louzefeng d3828a7aee mod
2024-07-11 05:50:32 +00:00

268 lines
7.4 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<audio id="audio" title="114 | Go编程模式Pipeline" controls="" preload="none"><source id="mp3" src="https://static001.geekbang.org/resource/audio/0c/ac/0cd21eaac338025c39cc4a0e01068aac.mp3"></audio>
你好,我是陈皓,网名左耳朵耗子。
这节课我着重介绍一下Go编程中的Pipeline模式。对于Pipeline用过Unix/Linux命令行的人都不会陌生**它是一种把各种命令拼接起来完成一个更强功能的技术方法**。
现在的流式处理、函数式编程、应用网关对微服务进行简单的API编排其实都是受Pipeline这种技术方式的影响。Pipeline可以很容易地把代码按单一职责的原则拆分成多个高内聚低耦合的小模块然后轻松地把它们拼装起来去完成比较复杂的功能。
## HTTP 处理
这种Pipeline的模式我在[上节课](https://time.geekbang.org/column/article/332608)中有过一个示例,我们再复习一下。
上节课,我们有很多 `WithServerHead()` 、`WithBasicAuth()` 、`WithDebugLog()`这样的小功能代码在需要实现某个HTTP API 的时候,我们就可以很轻松地把它们组织起来。
原来的代码是下面这个样子:
```
http.HandleFunc(&quot;/v1/hello&quot;, WithServerHeader(WithAuthCookie(hello)))
http.HandleFunc(&quot;/v2/hello&quot;, WithServerHeader(WithBasicAuth(hello)))
http.HandleFunc(&quot;/v3/hello&quot;, WithServerHeader(WithBasicAuth(WithDebugLog(hello))))
```
通过一个代理函数:
```
type HttpHandlerDecorator func(http.HandlerFunc) http.HandlerFunc
func Handler(h http.HandlerFunc, decors ...HttpHandlerDecorator) http.HandlerFunc {
for i := range decors {
d := decors[len(decors)-1-i] // iterate in reverse
h = d(h)
}
return h
}
```
我们就可以移除不断的嵌套,像下面这样使用了:
```
http.HandleFunc(&quot;/v4/hello&quot;, Handler(hello,
WithServerHeader, WithBasicAuth, WithDebugLog))
```
## Channel 管理
当然,如果你要写出一个[泛型的Pipeline框架](https://coolshell.cn/articles/17929.html#%E6%B3%9B%E5%9E%8B%E7%9A%84%E4%BF%AE%E9%A5%B0%E5%99%A8)并不容易,可以使用[Go Generation](https://coolshell.cn/articles/21179.html)实现但是我们别忘了Go语言最具特色的 Go Routine 和 Channel 这两个神器完全可以用来构造这种编程。
Rob Pike在 [Go Concurrency Patterns: Pipelines and cancellation](https://blog.golang.org/pipelines) 这篇博客中介绍了一种编程模式,下面我们来学习下。
### Channel转发函数
首先,我们需要一个 `echo()`函数它会把一个整型数组放到一个Channel中并返回这个Channel。
```
func echo(nums []int) &lt;-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out &lt;- n
}
close(out)
}()
return out
}
```
然后,我们依照这个模式,就可以写下下面的函数。
### 平方函数
```
func sq(in &lt;-chan int) &lt;-chan int {
out := make(chan int)
go func() {
for n := range in {
out &lt;- n * n
}
close(out)
}()
return out
}
```
### 过滤奇数函数
```
func odd(in &lt;-chan int) &lt;-chan int {
out := make(chan int)
go func() {
for n := range in {
if n%2 != 0 {
out &lt;- n
}
}
close(out)
}()
return out
}
```
### 求和函数
```
func sum(in &lt;-chan int) &lt;-chan int {
out := make(chan int)
go func() {
var sum = 0
for n := range in {
sum += n
}
out &lt;- sum
close(out)
}()
return out
}
```
用户端的代码如下所示你可能会觉得sum()odd() 和 sq()太过于相似其实你可以通过Map/Reduce编程模式或者是Go Generation的方式合并一下
```
var nums = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
for n := range sum(sq(odd(echo(nums)))) {
fmt.Println(n)
}
```
上面的代码类似于我们执行了Unix/Linux命令 `echo $nums | sq | sum`。同样,如果你不想有那么多的函数嵌套,就可以使用一个代理函数来完成。
```
type EchoFunc func ([]int) (&lt;- chan int)
type PipeFunc func (&lt;- chan int) (&lt;- chan int)
func pipeline(nums []int, echo EchoFunc, pipeFns ... PipeFunc) &lt;- chan int {
ch := echo(nums)
for i := range pipeFns {
ch = pipeFns[i](ch)
}
return ch
}
```
然后,就可以这样做了:
```
var nums = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
for n := range pipeline(nums, gen, odd, sq, sum) {
fmt.Println(n)
}
```
## Fan in/Out
**动用Go语言的 Go Routine和 Channel还有一个好处就是可以写出1对多或多对1的Pipeline也就是Fan In/ Fan Out**。下面我们来看一个Fan in的示例。
假设我们要通过并发的方式对一个很长的数组中的质数进行求和运算,我们想先把数组分段求和,然后再把它们集中起来。
下面是我们的主函数:
```
func makeRange(min, max int) []int {
a := make([]int, max-min+1)
for i := range a {
a[i] = min + i
}
return a
}
func main() {
nums := makeRange(1, 10000)
in := echo(nums)
const nProcess = 5
var chans [nProcess]&lt;-chan int
for i := range chans {
chans[i] = sum(prime(in))
}
for n := range sum(merge(chans[:])) {
fmt.Println(n)
}
}
```
再看我们的 `prime()` 函数的实现
```
func is_prime(value int) bool {
for i := 2; i &lt;= int(math.Floor(float64(value) / 2)); i++ {
if value%i == 0 {
return false
}
}
return value &gt; 1
}
func prime(in &lt;-chan int) &lt;-chan int {
out := make(chan int)
go func () {
for n := range in {
if is_prime(n) {
out &lt;- n
}
}
close(out)
}()
return out
}
```
我来简单解释下这段代码。
- 首先我们制造了从1到10000的数组
- 然后,把这堆数组全部 `echo`到一个Channel里—— `in`
- 此时,生成 5 个 Channel接着都调用 `sum(prime(in))` 于是每个Sum的Go Routine都会开始计算和
- 最后,再把所有的结果再求和拼起来,得到最终的结果。
其中的merge代码如下
```
func merge(cs []&lt;-chan int) &lt;-chan int {
var wg sync.WaitGroup
out := make(chan int)
wg.Add(len(cs))
for _, c := range cs {
go func(c &lt;-chan int) {
for n := range c {
out &lt;- n
}
wg.Done()
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
```
整个程序的结构如下图所示:
<img src="https://static001.geekbang.org/resource/image/f9/b3/f9d2b599620d5bc191194ff239f0a1b3.jpg" alt="">
## 参考文档
如果你还想了解更多类似的与并发相关的技术,我再给你推荐一些资源:
- [Go Concurrency Patterns Rob Pike 2012 Google I/O presents the basics of Gos concurrency primitives and several ways to apply them.](https://www.youtube.com/watch?v=f6kdp27TYZs)
<li>[Advanced Go Concurrency Patterns Rob Pike 2013 Google I/O](https://blog.golang.org/advanced-go-concurrency-patterns)<br>
[covers more complex uses of Gos primitives, especially select.](https://blog.golang.org/advanced-go-concurrency-patterns)</li>
<li>[Squinting at Power Series Douglas McIlroy's paper](https://swtch.com/~rsc/thread/squint.pdf)<br>
[shows how Go-like concurrency provides elegant support for complex calculations.](https://swtch.com/~rsc/thread/squint.pdf)</li>
好了,这节课就到这里。如果你觉得今天的内容对你有所帮助,欢迎你帮我分享给更多人。