Mon Sep 11 09:19:19 EDT 2017
Easy concurrency is one of Go’s defining and most compelling features.
Go has coroutines called “goroutines”. Goroutines resemble OS threads, but have far less overhead. Like OS threads, goroutines share resources such as memory and open files (with the attendant pitfalls).
Go efficiently packs a large number of goroutines onto a modest number of OS threads by aggressively switching between goroutines. When a goroutine blocks, Go switches to another.
Launch a goroutine by adding the go keyword before a function call:
go f(arg)
If our goroutines don’t need to share resources, that’s all we need to know: use the go keyword.
A common idiom is to call a function literal with the go keyword:
go func() {
	io.Copy(os.Stdout, conn)
	log.Println("done")
	done <- 1
}()
Since goroutines often need to share resources, Go offers “channels”. Channels connect goroutines. A goroutine uses a channel to send a value to another goroutine.
Each channel has a type, and only sends values of that type.
Like a map, a channel is a reference to a data structure created by make.
Channels offer two operations: send and receive.
(A third operation, close, signals that no more values will be sent on the channel.)
Send and receive use the <- operator:
ch <- x // send
x = <-ch // receive and assign
<-ch // receive and discard
close(ch)
A channel can be unbuffered (the default) or buffered:
ch = make(chan int) // unbuffered
ch = make(chan int, 5) // buffered with capacity 5
A send on an unbuffered channel blocks the sending goroutine until another goroutine receives on the channel. Conversely, if a goroutine tries to receive before the channel has data, that goroutine blocks until another goroutine sends. Receiving runs before Go reawakens the sending goroutine. In effect, unbuffered channels “synchronize” sending and receiving goroutines.
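As a minimal sketch of that synchronization, the program below hands values from one goroutine to another over an unbuffered channel. The helper name handoff is hypothetical and chosen only for illustration.

```go
package main

import "fmt"

// handoff (a hypothetical helper) sends n values over an unbuffered
// channel and returns what the receiving side saw, in order.
func handoff(n int) []int {
	ch := make(chan int) // unbuffered: each send waits for a receive
	go func() {
		for i := 0; i < n; i++ {
			ch <- i // blocks until the caller's goroutine receives
		}
		close(ch) // lets the range loop below finish
	}()
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(handoff(3)) // [0 1 2]
}
```

Because every send waits for its matching receive, the sender can never run ahead of the receiver, so the values arrive in the order they were sent.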
Sometimes a channel simply acts as a signal rather than transmitting a value.
We might use a bool or int channel for such a purpose, as we do with done <- 1 above.
However, the Go convention is to use an empty struct for signaling; this makes it obvious that the value is of no consequence.
ch := make(chan struct{})
...
ch <- struct{}{} // An empty struct acts only as a signal.
A “pipeline” connects goroutines with channels so that the output of one stage is the input to the next. Like a shell pipeline, a Go pipeline can have any number of stages. See https://blog.golang.org/pipelines and section 8.4.2 of The Go Programming Language (TGPL).
Closing the naturals channel signals the “squarer” goroutine to end its for loop:
// Pipeline1 demonstrates channels used as pipelines.
// Taken from TGPL section 8.4.2.
package main
import "fmt"
func main() {
	naturals := make(chan int)
	squares := make(chan int)
	// Counter:
	go func() {
		for x := 0; x < 10; x++ {
			naturals <- x
		}
		close(naturals)
	}()
	// Squarer:
	go func() {
		for x := range naturals { // A syntactic convenience; see TGPL p. 229.
			squares <- x * x
		}
		close(squares)
	}()
	// Printer:
	for x := range squares {
		fmt.Println(x)
	}
}
The above construct of “for-ranging” over a channel is shorthand for:
// Squarer:
go func() {
	for {
		x, ok := <-naturals
		if !ok {
			break // Channel closed and drained
		}
		squares <- x * x
	}
	close(squares)
}()
A closed channel is drained once every value sent before the close has been received. Further receives on it don’t block; they yield zero values, and the two-value form reports ok as false.
It’s not vital to close every channel; the garbage collector eventually reclaims any open but unreachable channels. Only worry about closing a channel to signal receiving goroutine(s) that sending has ended. Trying to close a previously closed channel panics.
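To make the closed-and-drained behavior concrete, here is a small sketch. The function name drain is hypothetical; it uses a buffered channel only so that one goroutine can both send and receive.

```go
package main

import "fmt"

// drain (a hypothetical helper) closes a channel that still holds two
// values and records three receives, the last one after it is empty.
func drain() (values []int, oks []bool) {
	ch := make(chan int, 2)
	ch <- 7
	ch <- 8
	close(ch) // no more sends; buffered values remain receivable
	for i := 0; i < 3; i++ {
		v, ok := <-ch // never blocks once the channel is closed
		values = append(values, v)
		oks = append(oks, ok)
	}
	return values, oks
}

func main() {
	values, oks := drain()
	fmt.Println(values, oks) // [7 8 0] [true true false]
}
```

The third receive returns the zero value of int with ok set to false, which is how a receiver tells “closed and drained” apart from a real 0.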
Channels can be passed into or returned from functions like any other value. A channel passed as a parameter is almost always intended only for sending or only for receiving.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}
A unidirectional channel exposes only send or only receive.
The channel declaration indicates its directionality:
out chan<- int // A send-only channel. Data only flows in to it (`chan<-`).
in <-chan int // A receive-only channel. Data only flows out of it (`<-chan`).
The compiler detects errant sends to a receive-only channel and vice versa.
The compiler also rejects closing a receive-only channel (since close means no more data will be sent).
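A short sketch of directional parameters in use follows. The function names produce, sum, and sumTo are hypothetical; the point is that one bidirectional channel converts implicitly to each restricted type at the call site.

```go
package main

import "fmt"

// produce may only send on out; a receive from out would not compile.
func produce(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out) // closing is permitted on a send-only channel
}

// sum may only receive from in; a send or close on in would not compile.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

// sumTo wires the two together over one bidirectional channel.
func sumTo(n int) int {
	ch := make(chan int) // converts implicitly at each call below
	go produce(ch, n)
	return sum(ch)
}

func main() {
	fmt.Println(sumTo(4)) // 10
}
```

Declaring the direction in the signature documents intent and lets the compiler enforce which side is responsible for closing.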
ch = make(chan string, 3)
The above creates a buffered channel with a first-in-first-out queue of three elements. A send operation slots a value into the back of the queue, and a receive empties a value from the front.
ch <- "X"
ch <- "Y"
ch <- "Z"
A receive on this channel would get “X”.
The key thing about a buffered channel is that sends/receives do not cause a goroutine to block unless the buffer completely fills/empties. Buffered channels don’t naturally cause sending and receiving goroutines to synchronize.
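The built-in len and cap functions report how many values a channel currently holds and how many it can hold. The sketch below (fill is a hypothetical name) sends three values with no receiver running, which works only because of the buffer.

```go
package main

import "fmt"

// fill (a hypothetical helper) sends without a concurrent receiver,
// then takes one value back out and reports the channel's state.
func fill() (first string, queued, capacity int) {
	ch := make(chan string, 3)
	ch <- "X"
	ch <- "Y"
	ch <- "Z"    // buffer is now full; a fourth send would block
	first = <-ch // removes the oldest value and frees one slot
	return first, len(ch), cap(ch)
}

func main() {
	first, queued, capacity := fill()
	fmt.Println(first, queued, capacity) // X 2 3
}
```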
Section 8.4.4 of TGPL provides an amusing illustration of buffer use through the analogy of an assembly line of chefs producing cakes. https://github.com/adonovan/gopl.io/tree/master/ch8/cake
Fun fact: a problem that breaks down into totally independent sub-problems, like processing each image in a directory, is termed “embarrassingly parallel”.
// BROKEN:
func manipulateImages(imageFiles []string) {
	for _, f := range imageFiles {
		go mungImg(f)
	}
}
The above is broken because the outer function doesn’t wait for the goroutines it spawns to finish work before it returns.
There’s no direct way to wait for a goroutine to complete, but we can wait for an event on a shared channel.
(For trivial testing purposes, we can use fmt.Scanln(&in) to wait for user input.)
Also see sync.WaitGroup below.
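One way to repair the broken function with a shared channel is sketched here. It assumes mungImg returns a result worth collecting; the stub below is a hypothetical stand-in for real image processing.

```go
package main

import (
	"fmt"
	"strings"
)

// mungImg is a hypothetical stand-in for real image processing.
func mungImg(f string) string { return strings.ToUpper(f) }

// manipulateImages starts one goroutine per file and returns only after
// every goroutine has reported on the results channel.
func manipulateImages(imageFiles []string) []string {
	results := make(chan string)
	for _, f := range imageFiles {
		go func(f string) { results <- mungImg(f) }(f)
	}
	var out []string
	for range imageFiles { // exactly one receive per goroutine started
		out = append(out, <-results)
	}
	return out
}

func main() {
	out := manipulateImages([]string{"a.png", "b.png", "c.png"})
	fmt.Println(len(out)) // 3
}
```

The results arrive in whatever order the goroutines finish, so the caller should not rely on the output order matching the input order.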
The “fan out” pattern is when multiple functions read from the same channel until it closes.
The “fan in” pattern is when a function multiplexes several channels into one channel, and reads until all the inputs close.
Because sends on a closed channel panic, make sure to finish all sends before closing the channel.
sync.WaitGroup helps.
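A fan-in sketch follows, modeled on the merge function from the pipelines article linked earlier. The helper names gen and sumAll are assumptions added so the example runs on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// gen emits its arguments on a new channel, then closes it.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// merge fans in: it forwards every input channel onto one output
// channel and closes the output only after all inputs are exhausted.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}(c)
	}
	// Close out from a separate goroutine so merge returns immediately.
	go func() {
		wg.Wait() // every forwarding goroutine has finished sending
		close(out)
	}()
	return out
}

// sumAll drains a channel and adds up what it receives.
func sumAll(in <-chan int) int {
	total := 0
	for n := range in {
		total += n
	}
	return total
}

func main() {
	fmt.Println(sumAll(merge(gen(1, 2), gen(3, 4)))) // 10
}
```

The WaitGroup is what makes the close safe: out is closed only after the last forwarding goroutine calls Done, so no send can hit a closed channel.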
The select construct lets us multiplex between channels.
It picks a case whose channel is ready to receive or send, and blocks until one is ready (unless there is a default case).
If multiple cases are ready, select chooses one at random.
package main
import (
	"fmt"
	"os"
	"time"
)
func main() {
	abort := make(chan bool)
	go func() {
		os.Stdin.Read(make([]byte, 1))
		abort <- true
	}()
	tick := time.Tick(1 * time.Second)
	fmt.Println("Begin countdown. Press RETURN to abort.")
	for countdown := 10; countdown > 0; countdown-- {
		fmt.Println(countdown)
		/* Either count down or abort.
		We can't just receive from each channel, because the first one
		will block the other. We need to multiplex with `select`.
		*/
		select {
		case <-tick:
			// Pass
		case <-abort:
			fmt.Println("Launch aborted.")
			return
		}
	}
	fmt.Println("Lift off!")
}
Another example:
package main
import (
	"fmt"
)
func counter(s string, ch chan string, done chan int) {
	for i := 0; i <= 2; i++ {
		ch <- fmt.Sprintf("%v: %v", s, i)
	}
	done <- 1
}
func main() {
	ch := make(chan string)
	done := make(chan int)
	go counter("A", ch, done)
	go counter("B", ch, done)
	go counter("C", ch, done)
	for i := 0; i < 3; {
		select {
		case x := <-ch:
			fmt.Println(x)
		case <-done:
			i++
		}
	}
}
If we know how many iterations we need (i.e., how many goroutines we’ll start and stop), coordinate them through simple signaling, as seen below in parallel().
If we don’t know that in advance, use sync.WaitGroup to act as a reference counter for goroutines, as seen below in wtgrp().
package main
import (
	"fmt"
	"sync"
	"time"
)
var Msgs = []string{"Hello.", "Good day.", "Buenas noches."}
// Worker sleeps for the given number of seconds, then prints s.
func Worker(s string, seconds int) {
	time.Sleep(time.Duration(seconds) * time.Second)
	fmt.Println(s)
}
func nonparallel() {
	for _, m := range Msgs {
		Worker(m, 2)
	}
}
func broken() {
	for _, m := range Msgs {
		go Worker(m, 2)
	}
}
func parallel() {
	ch := make(chan bool)
	for _, m := range Msgs {
		go func(m string) {
			Worker(m, 2)
			ch <- true
		}(m)
	}
	// Wait for signal on channel that each goroutine completed.
	// This solution only works for a known quantity of goroutines.
	for range Msgs {
		<-ch
	}
}
// wtgrp is like parallel(), but works when we don't know the number of iterations.
// sync.WaitGroup essentially keeps a count of starting goroutines and matches to ending goroutines.
func wtgrp() {
	var wg sync.WaitGroup
	for _, m := range Msgs {
		wg.Add(1) // Be certain to Add before launching the goroutine!
		go func(m string) {
			defer wg.Done()
			Worker(m, 2)
		}(m)
	}
	wg.Wait()
}
func main() {
	// nonparallel()
	// broken()
	// parallel()
	wtgrp()
}
How do we tell a goroutine to stop what it’s doing?
It’s difficult to pick out a particular goroutine or know how many might currently be running.
A simple signal on a channel gets consumed by the first goroutine to receive it, so that may not be suitable.
Use a broadcast.
After a channel has been closed and drained, subsequent receives on it return zero values immediately.
Use this as a broadcast signal.
Have each goroutine check cancelled() to see if the done channel has been closed.
If the channel has been closed, each goroutine should clean up after itself and return.
var done = make(chan bool)
func cancelled() bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}
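The sketch below shows the broadcast in use. It departs from the snippet above in two labeled ways: the done channel is passed as a parameter rather than held in a package variable, and it carries struct{} values. The name runWorkers and the sleep standing in for real work are assumptions for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// cancelled reports whether done has been closed, without blocking.
func cancelled(done <-chan struct{}) bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}

// runWorkers (a hypothetical helper) starts n polling goroutines, then
// closes done to cancel them all and returns how many exited.
func runWorkers(n int) int {
	done := make(chan struct{})
	exited := make(chan int, n) // buffered so workers never block on exit
	for i := 0; i < n; i++ {
		go func(id int) {
			for !cancelled(done) {
				time.Sleep(time.Millisecond) // stand-in for a unit of work
			}
			exited <- id // clean up, then return
		}(i)
	}
	close(done) // a single close is observed by every worker
	count := 0
	for i := 0; i < n; i++ {
		<-exited
		count++
	}
	return count
}

func main() {
	fmt.Println(runWorkers(5)) // 5
}
```

A value sent on done would stop only one worker; closing it stops all of them, because every receive on a closed channel succeeds.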
Sharing variables among multiple goroutines introduces several pitfalls.
See The Go Programming Language, chapter nine.
Race conditions defy intuition, because the order of interleaved operations is unpredictable, even from one program run to another. Whenever two goroutines concurrently access the same variable, and at least one access includes a write, a data race can happen. How to avoid a race condition:
sync.Mutex
With sync.Mutex, a goroutine must acquire an exclusive lock before reading or writing a shared variable, and unlock it when finished.
When another goroutine already holds the lock, Lock blocks until the other goroutine calls Unlock.
By convention, declare the variables guarded by a mutex immediately after declaration of the mutex itself.
// Bankmutex demonstrates using a mutex for avoiding races for shared variables.
package main
import "fmt"
import "sync"
var mu sync.Mutex // guards balance
var balance int
var wg sync.WaitGroup
func main() {
	TestTrans(99)
	TestTrans(100)
	wg.Wait()
}
func TestTrans(d int) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("Before:", Balance())
		Deposit(d)
		fmt.Println("After:", Balance())
	}()
}
func Deposit(amount int) {
	mu.Lock()
	balance = balance + amount
	mu.Unlock()
}
func Balance() int {
	mu.Lock()
	b := balance
	mu.Unlock()
	return b
}
The code between Lock and Unlock is called the “critical section”.
Be sure to unlock on all paths through the function, even when it branches to an error.
Although the above example explicitly delineates the critical section, it might be safer to use defer, like:
func Deposit(amount int) {
	mu.Lock()
	defer mu.Unlock()
	balance = balance + amount
}
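To check that the mutex really serializes updates, the sketch below makes many concurrent deposits and confirms none are lost. The account type and depositMany function are hypothetical; bundling the mutex with the data it guards is one common arrangement, not the only one.

```go
package main

import (
	"fmt"
	"sync"
)

// account (a hypothetical type) keeps the mutex next to the field it guards.
type account struct {
	mu      sync.Mutex // guards balance
	balance int
}

// Deposit adds amount to the balance under the lock.
func (a *account) Deposit(amount int) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.balance += amount
}

// Balance reads the balance under the same lock.
func (a *account) Balance() int {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.balance
}

// depositMany makes n concurrent deposits of 1 and returns the final balance.
func depositMany(n int) int {
	var a account
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			a.Deposit(1)
		}()
	}
	wg.Wait()
	return a.Balance()
}

func main() {
	fmt.Println(depositMany(1000)) // 1000
}
```

Without the lock in Deposit, concurrent read-modify-write sequences could overwrite each other and the final balance could come out below 1000.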