Still new to Go, I often find myself reaching for locks and waitgroups where channels would suffice. Channels too can be used to provide mutual exclusion, and they are more idiomatic. So as an exercise, once in a while I try to rewrite code that uses locks/waitgroups to use channels instead, just to get used to them.
For example, let's consider the Workiva/go-datastructures/futures package. I'd define a future as a sort of placeholder for a result that's being computed asynchronously and might be accessed by multiple threads/goroutines concurrently. Here's a simpler and better definition, though, from Heather Miller and her students' book, Programming Models for Distributed Computation:
A future or promise can be thought of as a value that will eventually become available.
Alternatively, if you're already familiar with JavaScript, futures are similar to ES6 promises.
The Workiva/futures API is short enough and is a great place to start from. The caller creates a Future value by invoking futures.New(completer, timeout). The completer argument is a read-only channel through which the result is received asynchronously. The timeout argument is there to avoid waiting for the result indefinitely. One can then check whether the result is available by using the HasResult method. If the result is available, it is retrieved using the GetResult method. If it hasn't arrived yet, GetResult blocks until it's available or a timeout occurs.
// Completer is a channel that the future expects to receive
// a result on. The future only receives on this channel.
type Completer <-chan interface{}
type Future struct {...}
func New(completer Completer, timeout time.Duration) *Future
func (f *Future) HasResult() bool
func (f *Future) GetResult() (interface{}, error)
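To make the API concrete, here's roughly how a caller might use it. This is my own sketch, not code from the package; the timeout value and the doExpensiveComputation helper are made up for illustration:

completer := make(chan interface{})
f := futures.New(completer, 5*time.Second)

// Somewhere else, the result is produced asynchronously.
go func() {
	completer <- doExpensiveComputation() // hypothetical helper
}()

// Later, a caller retrieves the result, blocking if it isn't ready yet.
result, err := f.GetResult()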
The Future struct has the following fields:
type Future struct {
	triggered bool
	item      interface{}
	err       error
	lock      sync.Mutex
	wg        sync.WaitGroup
}
Once available, the result is stored in the item field. However, if a timeout occurs, the item field is set to nil and the err field is set to a timeout error. The triggered boolean is mainly used to check whether the result is available. By default it's false. Once either the result is received or a timeout occurs, it's flipped to true. And we'll get to lock and wg soon enough.
As already mentioned, the futures.New function is used to create a Future instance. Internally, New launches a goroutine in which it waits for the result. The code sample below has been trimmed to emphasize the key ideas. Also observe that f.wg is incremented by 1; f.wg will become relevant when we get to the GetResult method.
var errTimeout error = errors.New("timeout error")

func listenForResult(f *Future, ch <-chan interface{}, timeout time.Duration) {
	t := time.NewTimer(timeout)
	select {
	case item := <-ch:
		f.setItem(item, nil)
		t.Stop()
	case <-t.C:
		f.setItem(nil, errTimeout)
	}
}

func New(completer <-chan interface{}, timeout time.Duration) *Future {
	f := &Future{}
	f.wg.Add(1)
	// ...
	go listenForResult(f, completer, timeout)
	// ...
	return f
}
When the value arrives from the completer channel (or a timeout occurs), the future's setItem method is called with the result. Now, this is where things get interesting. The setItem method is defined as follows:
func (f *Future) setItem(item interface{}, err error) {
	f.lock.Lock()
	f.triggered = true
	f.item = item
	f.err = err
	f.lock.Unlock()
	f.wg.Done()
}
Once setItem is done, all callers that were blocked on GetResult can now read the value. Again, for the sake of completeness, here's how GetResult is defined:
func (f *Future) GetResult() (interface{}, error) {
	f.lock.Lock()
	if f.triggered {
		f.lock.Unlock()
		return f.item, f.err
	}
	f.lock.Unlock()
	f.wg.Wait()
	return f.item, f.err
}
Both the waitgroup and the lock can be replaced with channels. We'll go through them one by one to see why each is there and how a channel can achieve the same thing. Let's start with the waitgroup.
The wg waitgroup (which was incremented to 1 during instantiation) is there to make every goroutine that calls GetResult wait if the result isn't available. Once available, i.e. when setItem invokes f.wg.Done(), all the goroutines that were blocked can then proceed and read the result. Simply put, the waitgroup is there for notifying blocked callers. The same can be achieved by having callers block directly while trying to 'read' a value from a channel and then closing the channel when the result is ready.
type Future struct {
	// ...
	completed chan struct{}
}

func New(completer <-chan interface{}, timeout time.Duration) *Future {
	// Note that the channel is unbuffered
	f := &Future{
		completed: make(chan struct{}),
	}
	// ...
	go listenForResult(f, completer, timeout)
	// ...
	return f
}
func listenForResult(f *Future, ch <-chan interface{}, timeout time.Duration) {
	t := time.NewTimer(timeout)
	select {
	case item := <-ch:
		f.setItem(item, nil)
		t.Stop()
	case <-t.C:
		f.setItem(nil, errTimeout)
	}
	close(f.completed) // broadcast completion
}
func (f *Future) GetResult() (interface{}, error) {
	f.lock.Lock()
	if f.triggered {
		f.lock.Unlock()
		return f.item, f.err
	}
	f.lock.Unlock()
	<-f.completed // blocks until a value is sent or the channel is closed
	return f.item, f.err
}
As the language specification guarantees, channels are safe for concurrent receives, and all receives from a closed channel return the zero value immediately. Also note that f.completed is a chan struct{}; using the empty struct indicates that the channel is there solely for signaling rather than for carrying any actual values.
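As a tiny standalone illustration of that last point (not part of the futures code), receives from a closed channel never block and always yield the zero value:

done := make(chan struct{})
close(done)

<-done             // returns immediately
v, ok := <-done    // v is the zero value struct{}{}, ok is false since the channel is closed
fmt.Println(v, ok) // prints: {} false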
Now, for the locks. The f.lock
is used to ensure that data races don’t occur.
Data races fall under race conditions which are a kind of concurrency bug.
Alan Donovan’s and Brian Kernighan’s ‘The Go Programming Language’ book provides
the following description of both race conditions and data races: “A race
condition is a situation in which the program does not give the correct result
for some interleavings of the operations of multiple goroutines. Race conditions
are pernicious because they may remain latent in a program and appear
infrequently, perhaps only under heavy load or when using certain compilers,
platforms, or architectures. This makes them hard to reproduce and diagnose… A
data race occurs whenever two goroutines access the same variable concurrently
and at least one of the accesses is a write”.
Hence the locks used above. When setItem writes to f.item, f.triggered and f.err, the locking guarantees that it has exclusive 'ownership' of these variables and that no other goroutine is trying to read or write them at that instant. Once setItem has written the result and released the lock, other goroutines can then read them safely without causing any data races.
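To make that concrete, here's a tiny standalone program of my own (nothing to do with the futures package) containing exactly that kind of bug: one goroutine writes a variable while another reads it with no synchronization. Running it with go run -race will typically report the race:

package main

import (
	"fmt"
	"time"
)

func main() {
	var result interface{}

	// Writer: another goroutine assigns the variable.
	go func() {
		result = "done"
	}()

	// Reader: main reads the same variable with no synchronization.
	// The read and the write are concurrent, so this is a data race.
	fmt.Println(result)

	time.Sleep(100 * time.Millisecond)
}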
As earlier mentioned, channels too can be used to guarantee mutual exclusion. Given how the Future object is structured, we end up with the following key factors:
- A write occurs only once throughout the lifetime of a Future object, that is, when either the result arrives or a timeout occurs.
- All reads should occur only after that write has completed, in order to avoid data races.
These two factors provide a guideline for our concurrency approach. Now let’s shift our attention to channels, particularly unbuffered channels such as the one we’ve already used above. To reference The Go Programming Language book again: “A send operation on an unbuffered channel blocks the sending goroutine until another goroutine executes a corresponding receive on the same channel, at which point the value is transmitted and both goroutines may continue. Conversely, if the receive operation was attempted first, the receiving goroutine is blocked until another goroutine performs a send on the same channel”.
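To see that behavior in isolation, here's another tiny sketch of my own (not from the post's code): a receive on an unbuffered channel blocks until another goroutine performs the corresponding send:

ready := make(chan struct{}) // unbuffered

go func() {
	time.Sleep(time.Second)
	ready <- struct{}{} // the send and the receive below rendezvous
}()

<-ready // blocks for roughly a second, until the goroutine sends
fmt.Println("received")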
This is exactly what we need: a means of blocking readers of a Future's internal variables until the write has completed. Better yet, in our case we don't even need to send any value on the channel; we can simply close it and all currently blocked readers can proceed. Moreover, future readers don't have to acquire a lock since, as already mentioned, receives from a closed channel get the zero value. With all this in mind, the code can be simplified into the following. Note that we are reusing the f.completed channel and the setItem helper method is no longer required:
type Future struct {
	item      interface{}
	err       error
	completed chan struct{}
}

func (f *Future) GetResult() (interface{}, error) {
	<-f.completed
	return f.item, f.err
}

func (f *Future) listenForResult(ch <-chan interface{}, timeout time.Duration) {
	t := time.NewTimer(timeout)
	select {
	case item := <-ch:
		f.item = item
		t.Stop()
	case <-t.C:
		f.err = errTimeout
	}
	close(f.completed)
}
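For completeness, here's roughly what New would look like in this version. It's essentially the earlier New minus the waitgroup, so treat it as a sketch rather than the package's actual code:

func New(completer <-chan interface{}, timeout time.Duration) *Future {
	f := &Future{
		// Unbuffered; closed by listenForResult to broadcast completion.
		completed: make(chan struct{}),
	}
	go f.listenForResult(completer, timeout)
	return f
}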
Just to assuage any doubts I had, I ran Go's data race detector against the channels version. As expected, it did not find any data races. I also benchmarked the lock+waitgroup version against the channels version; both had similar performance, with the former slightly faster by a couple of nanoseconds. And that's it! If you've enjoyed this post, do check out Go101's in-depth post on all the other interesting ways you can use channels in Go. Cheers!