Go Concurrency (Golang)

Mon Sep 11 09:19:19 EDT 2017

Easy concurrency is one of Go’s defining and most compelling features.

Goroutines

Go has coroutines called “goroutines”. Goroutines resemble OS threads, but have far less overhead. Like OS threads, goroutines share resources such as memory and open files (with the attendant pitfalls).

Go efficiently packs a large number of goroutines onto a modest number of OS threads by aggressively switching between goroutines. When a goroutine blocks, Go switches to another.

Launch a goroutine by adding the go keyword before a function call:

go f(arg)

If our goroutines don’t need to share resources, that’s all we need to know — use the go keyword.

A common idiom is to call a literal function with the go keyword:

go func() {
	io.Copy(os.Stdout, conn)
	log.Println("done")
	done <- 1
}()

Channels

Since goroutines often need to share resources, Go offers “channels”. Channels connect goroutines. A goroutine uses a channel to send a value to another goroutine.

Each channel has an element type, and carries only values of that type. Internally, a channel is a reference to a data structure created by make, like a map.

Channels offer two operations: send and receive. (A third operation, close, declares that no more values will be sent on the channel; values already sent can still be received.) Send and receive use the <- operator:

ch <- x  // send
x = <-ch // receive and assign
<-ch     // receive and discard
close(ch)

A channel can be unbuffered (the default) or buffered:

ch = make(chan int)    // unbuffered
ch = make(chan int, 5) // buffered with capacity 5

A send on an unbuffered channel blocks the sending goroutine until another goroutine receives on the channel. Conversely, if a goroutine tries to receive before the channel has data, that goroutine blocks until another goroutine sends. Receiving runs before Go reawakens the sending goroutine. In effect, unbuffered channels “synchronize” sending and receiving goroutines.
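
As a minimal sketch of that synchronization (the function name rendezvous and the message text are made up for illustration), the goroutine below writes a variable and then receives. Because the receive happens before the send completes, the sender is guaranteed to see the write once its send returns:

```go
package main

import "fmt"

// rendezvous shows that a send on an unbuffered channel completes only
// after a receiver has taken the value.
func rendezvous() string {
	ch := make(chan int) // unbuffered
	var msg string
	go func() {
		msg = "receiver was ready" // written before the receive below
		<-ch
	}()
	ch <- 1 // blocks until the goroutine above receives
	// The receive happened before this send completed,
	// so the write to msg is visible here.
	return msg
}

func main() {
	fmt.Println(rendezvous()) // prints "receiver was ready"
}
```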

Sometimes a channel acts purely as a signal rather than as a way to transmit a value. We might use a bool or int channel for such a purpose, like we use done <- 1 above. However, the Go convention is to use an empty struct for signaling; this makes it obvious that the value is of no consequence.

ch := make(chan struct{})
...
ch <- struct{}{} // An empty struct acts only as a signal.
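
A runnable sketch of the same idea, assuming a single worker goroutine (runAndWait and the result string are illustrative names, not from any library):

```go
package main

import "fmt"

// runAndWait starts a worker goroutine and blocks until it signals completion.
func runAndWait() string {
	done := make(chan struct{})
	result := ""
	go func() {
		result = "work finished"
		done <- struct{}{} // signal only; the value carries no information
	}()
	<-done // block until the worker signals
	return result
}

func main() {
	fmt.Println(runAndWait()) // prints "work finished"
}
```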

Pipelines

A “pipeline” is a series of goroutines connected by channels, where the output of one stage is the input to the next. Like a shell pipeline, a Go pipeline can have any number of connected channel and function stages. See https://blog.golang.org/pipelines and section 8.4.2 of The Go Programming Language.

Closing the naturals channel signals the “squarer” function to end its for loop:

// Pipeline1 demonstrates channels used as pipelines.
// Taken from TGPL section 8.4.2.
package main

import "fmt"

func main() {
	naturals := make(chan int)
	squares := make(chan int)
	// Counter:
	go func() {
		for x := 0; x < 10; x++ {
			naturals <- x
		}
		close(naturals)
	}()
	// Squarer:
	go func() {
		for x := range naturals { // A syntactic convenience; see TGPL p. 229.
			squares <- x * x
		}
		close(squares)
	}()
	// Printer:
	for x := range squares {
		fmt.Println(x)
	}
}

The above construct of “for-ranging” over a channel is shorthand for:

// Squarer:
go func() {
	for {
		x, ok := <-naturals
		if !ok {
			break // Channel closed and drained
		}
		squares <- x * x
	}
	close(squares)
}()

Drained means every value sent before the close has been received. After that, further receives on the closed channel don’t block but yield zero values (with ok set to false).

It’s not vital to close every channel; the garbage collector eventually reclaims any open but unreachable channels. Only worry about closing a channel to signal receiving goroutine(s) that sending has ended. Trying to close a previously closed channel panics.
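
A small sketch of receiving from a closed channel, assuming a buffered channel so the example needs no second goroutine (the function name drain is made up here):

```go
package main

import "fmt"

// drain closes a channel holding two values and reports what receives
// yield before and after the channel is drained.
func drain() (values []int, ok bool, zero int) {
	ch := make(chan int, 2)
	ch <- 7
	ch <- 8
	close(ch) // values already sent remain receivable after close
	for v := range ch { // the loop ends once the channel is closed and drained
		values = append(values, v)
	}
	zero, ok = <-ch // does not block: yields the zero value and false
	return values, ok, zero
}

func main() {
	values, ok, zero := drain()
	fmt.Println(values, ok, zero) // prints "[7 8] false 0"
}
```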

Uni-directional Channels

Channels can be passed into or returned from functions like any other value. A channel passed as a parameter is almost always intended only for sending or only for receiving.

func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

A unidirectional channel exposes only send or only receive. The channel declaration indicates its directionality:

out chan<- int // A send-only channel. Data only flows in to it (`chan<-`).
in <-chan int  // A receive-only channel. Data only flows out of it (`<-chan`).

The compiler detects errant sends to a receive-only channel and vice versa. The compiler also errors on closes of receive-only channels (since close means no more data will be sent).
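
As a sketch, here is one bidirectional channel handed to a send-only parameter and a receive-only parameter (produce and sum are hypothetical helper names):

```go
package main

import "fmt"

// produce may only send on out; the compiler would reject `<-out`.
func produce(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out) // closing is permitted on the sending side
}

// sum may only receive from in; the compiler would reject
// `in <- 1` or `close(in)`.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // bidirectional; converts implicitly at each call
	go produce(ch, 4)
	fmt.Println(sum(ch)) // prints 10
}
```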

Buffered Channels

ch = make(chan string, 3)

The above creates a buffered channel with a first-in-first-out queue of three elements. A send operation slots a value into the back of the queue, and a receive empties a value from the front.

ch <- "X"
ch <- "Y"
ch <- "Z"

A receive on this channel would get “X”.

The key thing about a buffered channel is that sends/receives do not cause a goroutine to block unless the buffer completely fills/empties. Buffered channels don’t naturally cause sending and receiving goroutines to synchronize.
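
A minimal sketch showing that sends into a buffer with room don't block, using the built-in len and cap on the channel (fillAndDrain is an illustrative name):

```go
package main

import "fmt"

// fillAndDrain sends three values with no receiver running, then
// receives them back in first-in-first-out order.
func fillAndDrain() (queued, capacity int, order []string) {
	ch := make(chan string, 3)
	ch <- "X" // none of these sends block: the buffer has room
	ch <- "Y"
	ch <- "Z"
	queued, capacity = len(ch), cap(ch) // buffer is now full; a fourth send would block
	for i := 0; i < 3; i++ {
		order = append(order, <-ch)
	}
	return queued, capacity, order
}

func main() {
	queued, capacity, order := fillAndDrain()
	fmt.Println(queued, capacity, order) // prints "3 3 [X Y Z]"
}
```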

Section 8.4.4 of TGPL provides an amusing illustration of buffer use through the analogy of an assembly line of chefs producing cakes. https://github.com/adonovan/gopl.io/tree/master/ch8/cake

Looping in Parallel

Fun fact: a problem that breaks down into totally independent sub-problems, like processing each image in a directory, is termed “embarrassingly parallel”.

// BROKEN:
func manipulateImages(imageFiles []string) {
	for _, f := range imageFiles {
		go mungImg(f)
	}
}

The above is broken because the outer function doesn’t wait for the goroutines it spawns to finish work before it returns. There’s no direct way to wait for a goroutine to complete, but we can wait for an event on a shared channel. (For trivial testing purposes, we can use fmt.Scanln(&in) to wait for user input.)
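
One possible fix, sketched with a shared results channel. Here mungImg is a hypothetical stand-in that just upper-cases the file name, and the function returns the results so the caller can see that every goroutine finished:

```go
package main

import (
	"fmt"
	"strings"
)

// mungImg is a hypothetical stand-in for real image processing.
func mungImg(f string) string {
	return strings.ToUpper(f)
}

// manipulateImages starts one goroutine per file and returns only after
// every goroutine has reported its result.
func manipulateImages(imageFiles []string) []string {
	results := make(chan string)
	for _, f := range imageFiles {
		go func(f string) {
			results <- mungImg(f)
		}(f)
	}
	var out []string
	for range imageFiles { // one receive per goroutine launched
		out = append(out, <-results)
	}
	return out // order depends on scheduling
}

func main() {
	out := manipulateImages([]string{"a.png", "b.png", "c.png"})
	fmt.Println(len(out)) // prints 3
}
```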

Also see sync.WaitGroup below.

Fan in, fan out

The “fan out” pattern is when multiple functions read from the same channel until it closes.

The “fan in” pattern is when a function multiplexes several channels into one channel, and reads until all the inputs close. Because sends on a closed channel panic, make sure to finish all sends before closing the channel. sync.WaitGroup helps.
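
A compact sketch of both patterns together. It is simplified: the workers send straight to one shared output channel rather than through a separate merge function, and the names square and sumSquares are made up for illustration. The WaitGroup ensures the output channel closes only after every worker has finished sending:

```go
package main

import (
	"fmt"
	"sync"
)

// square is one fan-out worker: it reads from the shared input channel
// until that channel closes.
func square(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for n := range in {
		out <- n * n
	}
}

// sumSquares fans work out to several workers and fans results back in.
func sumSquares(nums []int, workers int) int {
	in := make(chan int)
	out := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go square(in, out, &wg)
	}
	go func() { // feed the workers, then signal that input has ended
		for _, n := range nums {
			in <- n
		}
		close(in)
	}()
	go func() { // close out only after all sends on it have finished
		wg.Wait()
		close(out)
	}()
	total := 0
	for v := range out {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4}, 3)) // prints 30
}
```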

Select

The select construct lets us multiplex between channels. It selects a channel that’s ready to receive or send. If multiple channels are ready, select chooses randomly.

package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	abort := make(chan bool)
	go func() {
		os.Stdin.Read(make([]byte, 1))
		abort <- true
	}()
	tick := time.Tick(1 * time.Second)
	fmt.Println("Begin countdown. Press RETURN to abort.")
	for countdown := 10; countdown > 0; countdown-- {
		fmt.Println(countdown)
		/* Either count down or abort.
		   We can't just receive from each channel, because the first one
		   will block the other. We need to multiplex with `select`.
		*/
		select {
		case <-tick:
			// Pass
		case <-abort:
			fmt.Println("Launch aborted.")
			return
		}
	}
	fmt.Println("Lift off!")
}

Another example:

package main

import (
	"fmt"
)

func counter(s string, ch chan string, done chan int) {
	for i := 0; i <= 2; i++ {
		ch <- fmt.Sprintf("%v: %v", s, i)
	}
	done <- 1
}

func main() {
	ch := make(chan string)
	done := make(chan int)
	go counter("A", ch, done)
	go counter("B", ch, done)
	go counter("C", ch, done)
	for i := 0; i < 3; {
		select {
		case x := <-ch:
			fmt.Println(x)
		case <-done:
			i++
		}
	}
}

sync.WaitGroup

If we know how many iterations we need (i.e., how many goroutines we’ll start and stop), coordinate them through simple signaling, as seen below in parallel(). If we don’t know that in advance, use sync.WaitGroup to act as a reference counter for goroutines, as seen below in wtgrp().

package main

import (
	"fmt"
	"sync"
	"time"
)

var Msgs = []string{"Hello.", "Good day.", "Buenas noches."}

func Worker(s string, t time.Duration) {
	time.Sleep(time.Second * t)
	fmt.Println(s)
}

func nonparallel() {
	for _, m := range Msgs {
		Worker(m, 2)
	}
}

func broken() {
	for _, m := range Msgs {
		go Worker(m, 2)
	}
}

func parallel() {
	ch := make(chan bool)
	for _, m := range Msgs {
		go func(m string) {
			Worker(m, 2)
			ch <- true
		}(m)
	}
	// Wait for signal on channel that each goroutine completed.
	// This solution only works for a known quantity of goroutines.
	for range Msgs {
		<-ch
	}
}

func wtgrp() {
	// Wtgrp is like parallel(), but works when we don't know the number of iterations.
	// sync.WaitGroup essentially keeps a count of starting goroutines and matches to ending goroutines.
	var wg sync.WaitGroup
	for _, m := range Msgs {
		wg.Add(1) // Be certain to Add before launching the goroutine!
		go func(m string) {
			defer wg.Done()
			Worker(m, 2)
		}(m)
	}
	wg.Wait()
}

func main() {
	// nonparallel()
	// broken()
	// parallel()
	wtgrp()
}

Goroutine Cancellation

How do we tell a goroutine to stop what it’s doing? It’s difficult to pick out a particular goroutine or know how many might currently be running. A simple signal on a channel gets consumed by the first goroutine to receive it, so that may not be suitable. Instead, use a broadcast: after a channel has been closed and drained, subsequent receives on it return zero values immediately, so closing a done channel signals every goroutine at once. Have each goroutine call cancelled() to check whether the done channel has been closed. If it has, the goroutine should clean up after itself and return.

var done = make(chan bool)
func cancelled() bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}
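
A self-contained sketch of the broadcast, assuming workers that simply block on the done channel and then report back (stopWorkers and the stopped channel are illustrative names):

```go
package main

import "fmt"

// stopWorkers starts n goroutines that wait for cancellation, closes
// done once, and counts how many goroutines saw the broadcast.
func stopWorkers(n int) int {
	done := make(chan struct{})
	stopped := make(chan int, n) // buffered so workers never block on reporting
	for i := 0; i < n; i++ {
		go func(id int) {
			<-done        // unblocks for every goroutine once done is closed
			stopped <- id // clean up and report
		}(i)
	}
	close(done) // a single close reaches all n goroutines
	count := 0
	for i := 0; i < n; i++ {
		<-stopped
		count++
	}
	return count
}

func main() {
	fmt.Println(stopWorkers(5)) // prints 5
}
```

By contrast, a single send on done would wake only one of the waiting goroutines.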

Concurrency with Shared Variables

Sharing variables among multiple goroutines introduces several pitfalls.

Race Conditions

See The Go Programming Language, chapter nine.

Race conditions defy intuition, because the order of interleaved operations is unpredictable, even from one program run to another. Whenever two goroutines concurrently access the same variable, and at least one access is a write, a data race can happen. One way to avoid a data race is mutual exclusion.

Mutual Exclusion with sync.Mutex

With sync.Mutex, a goroutine must acquire an exclusive lock before it accesses a shared variable and release the lock when finished. When another goroutine already holds the lock, Lock blocks until that goroutine calls Unlock. By convention, declare the variables guarded by a mutex immediately after the declaration of the mutex itself.

// Bankmutex demonstrates using a mutex for avoiding races for shared variables.
package main

import (
	"fmt"
	"sync"
)

var mu sync.Mutex // guards balance
var balance int
var wg sync.WaitGroup

func main() {
	TestTrans(99)
	TestTrans(100)
	wg.Wait()
}

func TestTrans(d int) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("Before:", Balance())
		Deposit(d)
		fmt.Println("After:", Balance())
	}()
}

func Deposit(amount int) {
	mu.Lock()
	balance = balance + amount
	mu.Unlock()
}

func Balance() int {
	mu.Lock()
	b := balance
	mu.Unlock()
	return b
}

The code between Lock and Unlock is called the “critical section”. Be sure to unlock on all paths through the function, even when it branches to an error. Although the above example explicitly delineates the critical section, it might be safer to use defer, like:

func Deposit(amount int) {
	mu.Lock()
	defer mu.Unlock()
	balance = balance + amount
}
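
A sketch that exercises the defer form under contention. The counter type and countTo function are made up for illustration; they bundle the mutex with the variable it guards. Without the lock, the concurrent increments would race and the final count could come up short:

```go
package main

import (
	"fmt"
	"sync"
)

// counter pairs a mutex with the variable it guards.
type counter struct {
	mu sync.Mutex // guards n
	n  int
}

func (c *counter) inc() {
	c.mu.Lock()
	defer c.mu.Unlock() // runs on every return path
	c.n++
}

func (c *counter) value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

// countTo runs the given number of concurrent increments and returns
// the final count.
func countTo(goroutines int) int {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.inc()
		}()
	}
	wg.Wait()
	return c.value()
}

func main() {
	fmt.Println(countTo(1000)) // prints 1000
}
```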