Go语言的并发模式是其最具特色的编程范式之一。本文总结了在Go开发中最常见的几种并发模式,包括请求-接收模式、单一状态者模式、生产-消费模式、Pipeline流水线模式等。这些模式能够帮助我们更好地组织和管理goroutine,使并发程序更加健壮和优雅。通过这些模式的学习,你将能够更好地掌握Go语言的并发特性,写出更高质量的并发代码。
极简的请求-接收模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func Glob (pattern string ) <-chan Item { c := make (chan Item) go func () { defer close (c) for name, item := range items { if ok, _ := filepath.Match(pattern, name); !ok { continue } c <- item } }() return c } func main () { for item := range Glob("[ab]*" ) { fmt.Println(item) } }
单一状态者 一个 G 持有一个全局的状态,其他的 G 可以通过 chan 访问到 G 持有的状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 reads := make (chan *readOp) writes := make (chan *writeOp) go func () { var state = make (map [int ]int ) for { select { case read := <-reads: read.resp <- state[read.key] case write := <-writes: state[write.key] = write.val write.resp <- true } } }() go func () { for { read := &readOp{ key: rand.Intn(5 ), resp: make (chan int )} reads <- read <-read.resp } }() go func () { for { write := &writeOp{ key: rand.Intn(5 ), val: rand.Intn(100 ), resp: make (chan bool )} writes <- write <-write.resp } }()
先生产,后消费 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 chanOwner := func () <-chan int { results := make (chan int , 5 ) go func () { defer close (results) for i := 0 ; i <= 5 ; i++ { results <- i } }() return results } consumer := func (results <-chan int ) { for result := range results { fmt.Printf("Received: %d\n" , result) } fmt.Println("Done receiving!" ) } results := chanOwner() consumer(results)
消费者通知生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 newRandStream := func (done <-chan interface {}) <-chan int { randStream := make (chan int ) go func () { defer fmt.Println("newRandStream closure exited." ) defer close (randStream) for { select { case randStream <- rand.Int(): case <-done: return } } }() return randStream } done := make (chan interface {}) randStream := newRandStream(done) fmt.Println("3 random ints:" ) for i := 1 ; i <= 3 ; i++ { fmt.Printf("%d: %d\n" , i, <-randStream) } close (done)
多 channel 合并使得安全退出(OR channel) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 var or func (channels ...<-chan interface {}) <-chan interface {}or = func (channels ...<-chan interface {}) <-chan interface {} { 1 switch len (channels) { case 0 : 2 return nil case 1 : 3 return channels[0 ] } orDone := make (chan interface {}) go func () { defer close (orDone) switch len (channels) { case 2 : select { case <-channels[0 ]: case <-channels[1 ]: } default : select { case <-channels[0 ]: case <-channels[1 ]: case <-channels[2 ]: case <-or(append (channels[3 :], orDone)...): } } }() return orDone }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 sig := func (after time.Duration) <-chan interface {}{ c := make (chan interface {}) go func () { defer close (c) time.Sleep(after) }() return c } start := time.Now() <-or( sig(2 *time.Hour), sig(5 *time.Minute), sig(1 *time.Second), sig(1 *time.Hour), sig(1 *time.Minute), ) fmt.Printf("done after %v" , time.Since(start))
只要有其中一个 chan 退出,其他的 chan 都退出
当需要将某一函数并发执行时,可以一律将返回值打包成结构体使用 chan 传递 例如:并发 http.Get
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 type Result struct { Error error Response *http.Response } checkStatus := func (done <-chan interface {}, urls ...string ) <-chan Result { results := make (chan Result) go func () { defer close (results) for _, url := range urls { var result Result resp, err := http.Get(url) result = Result{Error: err, Response: resp} select { case <-done: return case results <- result: } } }() return results }
1 2 3 4 5 6 7 8 9 10 11 done := make (chan interface {}) defer close (done)urls := []string {"https://www.google.com" , "https://badhost" } for result := range checkStatus(done, urls...) { if result.Error != nil { fmt.Printf("error: %v" , result.Error) continue } fmt.Printf("Response: %v\n" , result.Response.Status) }
同时,这个模式也启发我们在容错方面可以引入一个程序状态的观察者,它能够同时控制程序的走向,在上述例子中,我们默认那个观察者是 main
错误控制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 done := make (chan interface {}) defer close (done)errCount := 0 urls := []string {"a" , "https://www.google.com" , "b" , "c" , "d" } for result := range checkStatus(done, urls...) { if result.Error != nil { fmt.Printf("error: %v\n" , result.Error) errCount++ if errCount >= 3 { fmt.Println("Too many errors, breaking!" ) break } continue } fmt.Printf("Response: %v\n" , result.Response.Status) }
流水线 Pipeline 模式 将生产过程进行拆分,解耦,多个 G 之间互相协同,但每个 G 之间又相互独立 能进行拆分的前提:不同的阶段处理的都是同一类型的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 generator := func (done <-chan interface {}, integers ...int ) <-chan int { intStream := make (chan int ) go func () { defer close (intStream) for _, i := range integers { select { case <-done: return case intStream <- i: } } }() return intStream } multiply := func ( done <-chan interface {}, intStream <-chan int , multiplier int , ) <-chan int { multipliedStream := make (chan int ) go func () { defer close (multipliedStream) for i := range intStream { select { case <-done: return case multipliedStream <- i*multiplier: } } }() return multipliedStream } add := func ( done <-chan interface {}, intStream <-chan int , additive int , ) <-chan int { addedStream := make (chan int ) go func () { defer close (addedStream) for i := range intStream { select { case <-done: return case addedStream <- i+additive: } } }() return addedStream } done := make (chan interface {}) defer close (done)intStream := generator(done, 1 , 2 , 3 , 4 ) pipeline := multiply(done, add(done, multiply(done, intStream, 2 ), 1 ), 2 ) for v := range pipeline { fmt.Println(v) }
多个 goroutine 通过多个同类型的 channel 和共享一个 done channel 联系在一起
生成器模式 无限重复生成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 repeat := func ( done <-chan interface {}, values ...interface {}, ) <-chan interface {} { valueStream := make (chan interface {}) go func () { defer close (valueStream) for { for _, v := range values { select { case <-done: return case valueStream <- v: } } } }() return valueStream }
上述模式可以产生一个生成流,下面我们来从这个流中读取
从流中按需读取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 take := func ( done <-chan interface {}, valueStream <-chan interface {}, num int , ) <-chan interface {} { takeStream := make (chan interface {}) go func () { defer close (takeStream) for i := 0 ; i < num; i++ { select { case <-done: return case takeStream <- <- valueStream: } } }() return takeStream } done := make (chan interface {}) defer close (done)for num := range take(done, repeat(done, 1 ), 10 ) { fmt.Printf("%v " , num) }
worker pool 协程池模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 worker := 8 c := make (chan int , l) var wg sync.WaitGroupwg.Add(worker) for i:=0 ;i<worker;i++ { go func () { for row := range c { for col := range m.m[row] { fillPixel(m, row, col) } } }() } for row := range m.m { c <- row } close (c)
pub-sub 发布订阅模式 每一个订阅者都会持有一个 chan 每个订阅者会有一个过滤器来进行内容的过滤 过滤器通常是一个 bool 函数值类型 发布者使用 map 存储订阅者 chan 到过滤函数的映射 每次发布的时候都遍历 map 然后使用对应的过滤器进行过滤后投递即可
事件驱动模式 常用于在一些操作执行的过程中触发其他操作例如通知 这些事件的处理操作需要事先注册,然后在合适的时间触发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package eventsimport ( "time" ) var UserCreated userCreatedtype UserCreatedPayload struct { Email string Time time.Time } type userCreated struct { handlers []interface { Handle(UserCreatedPayload) } } func (u *userCreated) Register(handler interface { Handle(UserCreatedPayload) }) { u.handlers = append (u.handlers, handler) } func (u userCreated) Trigger(payload UserCreatedPayload) { for _, handler := range u.handlers { go handler.Handle(payload) } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package mainimport ( "time" "github.com/stephenafamo/demo/events" ) func init () { createNotifier := userCreatedNotifier{ adminEmail: "[email protected] " , slackHook: "https://hooks.slack.com/services/..." , } events.UserCreated.Register(createNotifier) } type userCreatedNotifier struct { adminEmail string slackHook string } func (u userCreatedNotifier) notifyAdmin(email string , time time.Time) { } func (u userCreatedNotifier) sendToSlack(email string , time time.Time) { } func (u userCreatedNotifier) Handle(payload events.UserCreatedPayload) { u.notifyAdmin(payload.Email, payload.Time) u.sendToSlack(payload.Email, payload.Time) }
触发事件处理函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package authimport ( "time" "github.com/stephenafamo/demo/events" ) func CreateUser () { events.UserCreated.Trigger(events.UserCreatedPayload{ Email: "[email protected] " , Time: time.Now(), }) }