Common Concurrency Patterns

Concurrency patterns are one of Go’s most distinctive programming paradigms. This article summarizes the most common concurrency patterns in Go development, including request-receiver pattern, single state holder pattern, producer-consumer pattern, Pipeline pattern, and more. These patterns help us better organize and manage goroutines, making concurrent programs more robust and elegant. By learning these patterns, you’ll be able to better master Go’s concurrency features and write higher quality concurrent code.

Simple Request-Receiver Pattern

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Request function returns a receiver based on parameters
func Glob(pattern string) <-chan Item {
c := make(chan Item) // 1. Initialize receiver

// 2. Anonymous G closure execution
go func() {
defer close(c)
for name, item := range items {
if ok, _ := filepath.Match(pattern, name); !ok {
continue
}
c <- item
}
}()

// 3. Quickly return receiver for later programs to read from chan
return c
}

func main() {
for item := range Glob("[ab]*") {
fmt.Println(item)
}
}

Single State Holder

One G holds a global state, and other Gs can access the state through channels

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
reads := make(chan *readOp)
writes := make(chan *writeOp)

// State holder
go func() {
var state = make(map[int]int)
for {
select {
case read := <-reads:
// Return result
read.resp <- state[read.key]
case write := <-writes:
state[write.key] = write.val
// Return result
write.resp <- true
}
}
}()

// Requesters
go func() {
for {
read := &readOp{
key: rand.Intn(5),
resp: make(chan int)}
reads <- read
<-read.resp
}
}()

go func() {
for {
write := &writeOp{
key: rand.Intn(5),
val: rand.Intn(100),
resp: make(chan bool)}
writes <- write
<-write.resp
}
}()

Producer First, Consumer Later

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Producer first, known number of productions
chanOwner := func() <-chan int {
results := make(chan int, 5)
go func() {
defer close(results)
for i := 0; i <= 5; i++ {
results <- i
}
}()
return results
}

consumer := func(results <-chan int) {
for result := range results {
fmt.Printf("Received: %d\n", result)
}
fmt.Println("Done receiving!")
}

results := chanOwner()
// Consume
consumer(results)

Consumer Notifies Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
newRandStream := func(done <-chan interface{}) <-chan int {
randStream := make(chan int)
go func() {
defer fmt.Println("newRandStream closure exited.")
defer close(randStream)
for {
select {
case randStream <- rand.Int():
case <-done:
// Stop production, exit
return
}
}
}()

return randStream
}

done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
fmt.Printf("%d: %d\n", i, <-randStream)
}
// Consumer actively requests production to stop
close(done)

Multiple Channel Merge for Safe Exit (OR channel)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}

orDone := make(chan interface{})
go func() {
defer close(orDone)

switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], orDone)...):
}
}
}()
return orDone
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
sig := func(after time.Duration) <-chan interface{}{
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}

start := time.Now()
<-or(
sig(2*time.Hour),
sig(5*time.Minute),
sig(1*time.Second),
sig(1*time.Hour),
sig(1*time.Minute),
)
fmt.Printf("done after %v", time.Since(start))

When any one channel exits, all other channels exit

When Concurrently Executing a Function, Always Package Return Values into a Struct Using Channels

Example: Concurrent http.Get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Unify return values into result struct
type Result struct {
Error error
Response *http.Response
}

checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
results := make(chan Result)
go func() {
defer close(results)

for _, url := range urls {
var result Result
resp, err := http.Get(url)
result = Result{Error: err, Response: resp}
select {
case <-done:
return
case results <- result:
}
}
}()
return results
}
1
2
3
4
5
6
7
8
9
10
11
done := make(chan interface{})
defer close(done)

urls := []string{"https://www.google.com", "https://badhost"}
for result := range checkStatus(done, urls...) {
if result.Error != nil {
fmt.Printf("error: %v", result.Error)
continue
}
fmt.Printf("Response: %v\n", result.Response.Status)
}

This pattern also inspires us to introduce a program state observer in error handling, which can control program flow. In the above example, we default that observer to main.

Error Control

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
done := make(chan interface{})
defer close(done)

errCount := 0
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
if result.Error != nil {
fmt.Printf("error: %v\n", result.Error)
errCount++
if errCount >= 3 {
// When too many errors, break and terminate request G
fmt.Println("Too many errors, breaking!")
break
}
continue
}
fmt.Printf("Response: %v\n", result.Response.Status)
}

Pipeline Pattern

Split the production process, decouple, multiple Gs cooperate with each other, but each G is independent
Prerequisite for splitting: Different stages process the same type of data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}

multiply := func(
done <-chan interface{},
intStream <-chan int,
multiplier int,
) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
return
case multipliedStream <- i*multiplier:
}
}
}()
return multipliedStream
}

add := func(
done <-chan interface{},
intStream <-chan int,
additive int,
) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i+additive:
}
}
}()
return addedStream
}

// Each stage can use done channel to safely exit goroutine
done := make(chan interface{})
defer close(done)

// Each stage returns a channel of the data type to be processed
intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

for v := range pipeline {
fmt.Println(v)
}

Multiple goroutines are connected through multiple channels of the same type and share a done channel

Generator Pattern

Infinite repeat generation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
repeat := func(
done <-chan interface{},
values ...interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
// Infinite loop continuously sends slice contents until stop signal
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}

The above pattern can generate a stream, let’s read from this stream

Read from stream as needed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <- valueStream:
}
}
}()
return takeStream
}

done := make(chan interface{})
defer close(done)

// Take first 10 elements from stream then exit
for num := range take(done, repeat(done, 1), 10) {
fmt.Printf("%v ", num)
}

Worker Pool Pattern

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
worker := 8
c := make(chan int, l)

var wg sync.WaitGroup
wg.Add(worker)

for i:=0;i<worker;i++ {
go func(){
for row := range c {
for col := range m.m[row] {
fillPixel(m, row, col)
}
}
}()
}

for row := range m.m {
c <- row
}
close(c)

Pub-Sub Pattern

Each subscriber holds a channel
Each subscriber has a filter to filter content
Filter is usually a bool function value type
Publisher uses map to store subscriber channel to filter function mapping
Each time publishing, traverse map and use corresponding filter for filtering before delivery

Event-Driven Pattern

Commonly used to trigger other operations during execution of some operations, such as notifications
These event handling operations need to be registered in advance, then triggered at appropriate times

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package events

import (
"time"
)

var UserCreated userCreated

// Define context payload
type UserCreatedPayload struct {
Email string
Time time.Time
}

// Function chain with context payload
type userCreated struct {
handlers []interface{ Handle(UserCreatedPayload) }
}

// Register adds event handler function to slice
func (u *userCreated) Register(handler interface{ Handle(UserCreatedPayload) }) {
u.handlers = append(u.handlers, handler)
}

// Trigger executes context-carrying handler functions in goroutines
func (u userCreated) Trigger(payload UserCreatedPayload) {
for _, handler := range u.handlers {
go handler.Handle(payload)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"time"

"github.com/stephenafamo/demo/events"
)

func init() {
createNotifier := userCreatedNotifier{
adminEmail: "[email protected]",
slackHook: "https://hooks.slack.com/services/...",
}

// Register handler callback
events.UserCreated.Register(createNotifier)
}

type userCreatedNotifier struct{
adminEmail string
slackHook string
}

func (u userCreatedNotifier) notifyAdmin(email string, time time.Time) {
// ...
}

func (u userCreatedNotifier) sendToSlack(email string, time time.Time) {
// ...
}

// As long as it matches the handler function signature, you can customize the internal implementation
func (u userCreatedNotifier) Handle(payload events.UserCreatedPayload) {
// Do something with our payload
u.notifyAdmin(payload.Email, payload.Time)
u.sendToSlack(payload.Email, payload.Time)
}

Trigger event handler function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package auth

import (
"time"

"github.com/stephenafamo/demo/events"
// Other imported packages
)

func CreateUser() {
// ...
// After decoupling, just pass in required payload, no need to care about specific handling logic
events.UserCreated.Trigger(events.UserCreatedPayload{
Email: "[email protected]",
Time: time.Now(),
})
// ...
}
Author

马克鱼

Posted on

2019-09-22

Updated on

2025-10-12

Licensed under