常见并发模式

本文同时提供以下语言的翻译:English

Go语言的并发模式是其最具特色的编程范式之一。本文总结了在Go开发中最常见的几种并发模式,包括请求-接收模式、单一状态者模式、生产-消费模式、Pipeline流水线模式等。这些模式能够帮助我们更好地组织和管理goroutine,使并发程序更加健壮和优雅。通过这些模式的学习,你将能够更好地掌握Go语言的并发特性,写出更高质量的并发代码。

极简的请求-接收模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 请求函数根据参数返回接收器
func Glob(pattern string) <-chan Item {
c := make(chan Item) // 1.初始化接收器

// 2.匿名 G 闭包执行程序
go func() {
defer close(c)
for name, item := range items {
if ok, _ := filepath.Match(pattern, name); !ok {
continue
}
c <- item
}
}()

// 3. 快速返回接收器以便后面的程序从chan中读
return c
}

func main() {
for item := range Glob("[ab]*") {
fmt.Println(item)
}
}

单一状态者

一个 G 持有一个全局的状态,其他的 G 可以通过 chan 访问到 G 持有的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
reads := make(chan *readOp)
writes := make(chan *writeOp)

// 状态持有者
go func() {
var state = make(map[int]int)
for {
select {
case read := <-reads:
// 返回结果
read.resp <- state[read.key]
case write := <-writes:
state[write.key] = write.val
// 返回结果
write.resp <- true
}
}
}()

// 请求者
go func() {
for {
read := &readOp{
key: rand.Intn(5),
resp: make(chan int)}
reads <- read
<-read.resp
}
}()

go func() {
for {
write := &writeOp{
key: rand.Intn(5),
val: rand.Intn(100),
resp: make(chan bool)}
writes <- write
<-write.resp
}
}()

先生产,后消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 先生产,已知生产个数
chanOwner := func() <-chan int {
results := make(chan int, 5)
go func() {
defer close(results)
for i := 0; i <= 5; i++ {
results <- i
}
}()
return results
}

consumer := func(results <-chan int) {
for result := range results {
fmt.Printf("Received: %d\n", result)
}
fmt.Println("Done receiving!")
}

results := chanOwner()
// 消费
consumer(results)

消费者通知生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
newRandStream := func(done <-chan interface{}) <-chan int {
randStream := make(chan int)
go func() {
defer fmt.Println("newRandStream closure exited.")
defer close(randStream)
for {
select {
case randStream <- rand.Int():
case <-done:
// 停止生产,退出
return
}
}
}()

return randStream
}

done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
fmt.Printf("%d: %d\n", i, <-randStream)
}
// 消费者主动要求生产停止
close(done)

多 channel 合并使得安全退出(OR channel)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} { 1
switch len(channels) {
case 0: 2
return nil
case 1: 3
return channels[0]
}

orDone := make(chan interface{})
go func() {
defer close(orDone)

switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], orDone)...):
}
}
}()
return orDone
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
sig := func(after time.Duration) <-chan interface{}{
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}

start := time.Now()
<-or(
sig(2*time.Hour),
sig(5*time.Minute),
sig(1*time.Second),
sig(1*time.Hour),
sig(1*time.Minute),
)
fmt.Printf("done after %v", time.Since(start))

只要有其中一个 chan 退出,其他的 chan 都退出

当需要将某一函数并发执行时,可以一律将返回值打包成结构体使用 chan 传递

例如:并发 http.Get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 把返回值统一成 result 结构体
type Result struct {
Error error
Response *http.Response
}

checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
results := make(chan Result)
go func() {
defer close(results)

for _, url := range urls {
var result Result
resp, err := http.Get(url)
result = Result{Error: err, Response: resp}
select {
case <-done:
return
case results <- result:
}
}
}()
return results
}
1
2
3
4
5
6
7
8
9
10
11
done := make(chan interface{})
defer close(done)

urls := []string{"https://www.google.com", "https://badhost"}
for result := range checkStatus(done, urls...) {
if result.Error != nil {
fmt.Printf("error: %v", result.Error)
continue
}
fmt.Printf("Response: %v\n", result.Response.Status)
}

同时,这个模式也启发我们在容错方面可以引入一个程序状态的观察者,它能够同时控制程序的走向,在上述例子中,我们默认那个观察者是 main

错误控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
done := make(chan interface{})
defer close(done)

errCount := 0
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
if result.Error != nil {
fmt.Printf("error: %v\n", result.Error)
errCount++
if errCount >= 3 {
// 当错误过多的时候,跳出并终止请求 G
fmt.Println("Too many errors, breaking!")
break
}
continue
}
fmt.Printf("Response: %v\n", result.Response.Status)
}

流水线 Pipeline 模式

将生产过程进行拆分,解耦,多个 G 之间互相协同,但每个 G 之间又相互独立
能进行拆分的前提:不同的阶段处理的都是同一类型的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}

multiply := func(
done <-chan interface{},
intStream <-chan int,
multiplier int,
) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
return
case multipliedStream <- i*multiplier:
}
}
}()
return multipliedStream
}

add := func(
done <-chan interface{},
intStream <-chan int,
additive int,
) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i+additive:
}
}
}()
return addedStream
}

// 每个阶段都可以使用 done channel 安全退出 goroutine
done := make(chan interface{})
defer close(done)

// 每个阶段都返回要处理的数据类型的 channel
intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

for v := range pipeline {
fmt.Println(v)
}

多个 goroutine 通过多个同类型的 channel 和共享一个 done channel 联系在一起

生成器模式

无限重复生成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
repeat := func(
done <-chan interface{},
values ...interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
// 死循环不断重复发送 slice 的内容直到发送信号停止
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}

上述模式可以产生一个生成流,下面我们来从这个流中读取

从流中按需读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <- valueStream:
}
}
}()
return takeStream
}

done := make(chan interface{})
defer close(done)

// 从流中截取前 10 个元素然后退出
for num := range take(done, repeat(done, 1), 10) {
fmt.Printf("%v ", num)
}

worker pool 协程池模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
worker := 8
c := make(chan int, l)

var wg sync.WaitGroup
wg.Add(worker)

for i:=0;i<worker;i++ {
go func(){
for row := range c {
for col := range m.m[row] {
fillPixel(m, row, col)
}
}
}()
}

for row := range m.m {
c <- row
}
close(c)

pub-sub 发布订阅模式

每一个订阅者都会持有一个 chan
每个订阅者会有一个过滤器来进行内容的过滤
过滤器通常是一个 bool 函数值类型
发布者使用 map 存储订阅者 chan 到过滤函数的映射
每次发布的时候都遍历 map 然后使用对应的过滤器进行过滤后投递即可

事件驱动模式

常用于在一些操作执行的过程中触发其他操作例如通知
这些事件的处理操作需要事先注册,然后在合适的时间触发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package events

import (
"time"
)

var UserCreated userCreated

// 定义上下文荷载
type UserCreatedPayload struct {
Email string
Time time.Time
}

// 带荷载上下文的函数链
type userCreated struct {
handlers []interface{ Handle(UserCreatedPayload) }
}

// Register 将时间的处理函数添加进切片中
func (u *userCreated) Register(handler interface{ Handle(UserCreatedPayload) }) {
u.handlers = append(u.handlers, handler)
}

// Trigger 依次触发 goroutine 来执行带上下文的处理函数
func (u userCreated) Trigger(payload UserCreatedPayload) {
for _, handler := range u.handlers {
go handler.Handle(payload)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"time"

"github.com/stephenafamo/demo/events"
)

func init() {
createNotifier := userCreatedNotifier{
adminEmail: "[email protected]",
slackHook: "https://hooks.slack.com/services/...",
}

// 注册处理回调
events.UserCreated.Register(createNotifier)
}

type userCreatedNotifier struct{
adminEmail string
slackHook string
}

func (u userCreatedNotifier) notifyAdmin(email string, time time.Time) {
// ...
}

func (u userCreatedNotifier) sendToSlack(email string, time time.Time) {
// ...
}

// 只要符合处理函数的签名就可以自定义处理函数的内部实现
func (u userCreatedNotifier) Handle(payload events.UserCreatedPayload) {
// Do something with our payload
u.notifyAdmin(payload.Email, payload.Time)
u.sendToSlack(payload.Email, payload.Time)
}

触发事件处理函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package auth

import (
"time"

"github.com/stephenafamo/demo/events"
// Other imported packages
)

func CreateUser() {
// ...
// 解耦之后只需要传入需要的载荷即可,不用关心具体的处理逻辑
events.UserCreated.Trigger(events.UserCreatedPayload{
Email: "[email protected]",
Time: time.Now(),
})
// ...
}
作者

马克鱼

发布于

2019-09-22

更新于

2025-10-12

许可协议