Concurrency patterns are one of Go’s most distinctive programming paradigms. This article summarizes the most common concurrency patterns in Go development, including request-receiver pattern, single state holder pattern, producer-consumer pattern, Pipeline pattern, and more. These patterns help us better organize and manage goroutines, making concurrent programs more robust and elegant. By learning these patterns, you’ll be able to better master Go’s concurrency features and write higher quality concurrent code.
// State holder gofunc() { var state = make(map[int]int) for { select { case read := <-reads: // Return result read.resp <- state[read.key] case write := <-writes: state[write.key] = write.val // Return result write.resp <- true } } }()
done := make(chaninterface{}) randStream := newRandStream(done) fmt.Println("3 random ints:") for i := 1; i <= 3; i++ { fmt.Printf("%d: %d\n", i, <-randStream) } // Consumer actively requests production to stop close(done)
switchlen(channels) { case2: select { case <-channels[0]: case <-channels[1]: } default: select { case <-channels[0]: case <-channels[1]: case <-channels[2]: case <-or(append(channels[3:], orDone)...): } } }() return orDone }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
sig := func(after time.Duration) <-chaninterface{}{ c := make(chaninterface{}) gofunc() { deferclose(c) time.Sleep(after) }() return c }
for _, url := range urls { var result Result resp, err := http.Get(url) result = Result{Error: err, Response: resp} select { case <-done: return case results <- result: } } }() return results }
1 2 3 4 5 6 7 8 9 10 11
done := make(chaninterface{}) deferclose(done)
urls := []string{"https://www.google.com", "https://badhost"} for result := range checkStatus(done, urls...) { if result.Error != nil { fmt.Printf("error: %v", result.Error) continue } fmt.Printf("Response: %v\n", result.Response.Status) }
This pattern also inspires us to introduce a program state observer in error handling, which can control program flow. In the above example, we default that observer to main.
Error Control
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
done := make(chaninterface{}) deferclose(done)
errCount := 0 urls := []string{"a", "https://www.google.com", "b", "c", "d"} for result := range checkStatus(done, urls...) { if result.Error != nil { fmt.Printf("error: %v\n", result.Error) errCount++ if errCount >= 3 { // When too many errors, break and terminate request G fmt.Println("Too many errors, breaking!") break } continue } fmt.Printf("Response: %v\n", result.Response.Status) }
Pipeline Pattern
Split the production process, decouple, multiple Gs cooperate with each other, but each G is independent Prerequisite for splitting: Different stages process the same type of data
generator := func(done <-chaninterface{}, integers ...int) <-chanint { intStream := make(chanint) gofunc() { deferclose(intStream) for _, i := range integers { select { case <-done: return case intStream <- i: } } }() return intStream }
multiply := func( done <-chaninterface{}, intStream <-chanint, multiplier int, ) <-chanint { multipliedStream := make(chanint) gofunc() { deferclose(multipliedStream) for i := range intStream { select { case <-done: return case multipliedStream <- i*multiplier: } } }() return multipliedStream }
add := func( done <-chaninterface{}, intStream <-chanint, additive int, ) <-chanint { addedStream := make(chanint) gofunc() { deferclose(addedStream) for i := range intStream { select { case <-done: return case addedStream <- i+additive: } } }() return addedStream }
// Each stage can use done channel to safely exit goroutine done := make(chaninterface{}) deferclose(done)
// Each stage returns a channel of the data type to be processed intStream := generator(done, 1, 2, 3, 4) pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
for v := range pipeline { fmt.Println(v) }
Multiple goroutines are connected through multiple channels of the same type and share a done channel
for i:=0;i<worker;i++ { gofunc(){ for row := range c { for col := range m.m[row] { fillPixel(m, row, col) } } }() }
for row := range m.m { c <- row } close(c)
Pub-Sub Pattern
Each subscriber holds a channel Each subscriber has a filter to filter content Filter is usually a bool function value type Publisher uses map to store subscriber channel to filter function mapping Each time publishing, traverse map and use corresponding filter for filtering before delivery
Event-Driven Pattern
Commonly used to trigger other operations during execution of some operations, such as notifications These event handling operations need to be registered in advance, then triggered at appropriate times
type userCreatedNotifier struct{ adminEmail string slackHook string }
func(u userCreatedNotifier) notifyAdmin(email string, time time.Time) { // ... }
func(u userCreatedNotifier) sendToSlack(email string, time time.Time) { // ... }
// As long as it matches the handler function signature, you can customize the internal implementation func(u userCreatedNotifier) Handle(payload events.UserCreatedPayload) { // Do something with our payload u.notifyAdmin(payload.Email, payload.Time) u.sendToSlack(payload.Email, payload.Time) }
Trigger event handler function
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
package auth
import ( "time"
"github.com/stephenafamo/demo/events" // Other imported packages )
funcCreateUser() { // ... // After decoupling, just pass in required payload, no need to care about specific handling logic events.UserCreated.Trigger(events.UserCreatedPayload{ Email: "[email protected]", Time: time.Now(), }) // ... }