Condition variables

Condition variables differ from the previous concurrency primitives in that, for Go, they are not an essential concurrency tool: in most cases, a condition variable can be replaced with a channel. However, especially in shared memory systems, condition variables are important synchronization tools. The Java language, for example, builds one of its core synchronization features on condition variables.

A well-known problem of concurrent computing is the producer-consumer problem. One or more producer threads produce values, and these values are consumed by one or more consumer threads. Since all producers and consumers run concurrently, sometimes there are not enough values produced to satisfy all the consumers, and sometimes there are not enough consumers to keep up with the producers. There is usually a finite queue into which producers put values and from which consumers retrieve them. Go already has an elegant solution to this problem: use a channel. All producers write to the channel, all consumers read from it, and the problem is solved. But in a shared memory system, a condition variable is usually employed for such a situation. A condition variable is a synchronization mechanism where multiple goroutines wait for a condition to occur, and another goroutine announces the occurrence of the condition to the waiting goroutines.
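For comparison, here is a minimal sketch of that channel-based solution. This sketch is not taken from the book; the integer values, the buffer capacity of 10, and the producer and consumer counts are illustrative assumptions:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    // The buffered channel plays the role of the finite queue
    values := make(chan int, 10)
    // Producers: a send blocks automatically when the buffer is full
    for i := 0; i < 3; i++ {
        go func() {
            for {
                values <- rand.Int()
                time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
            }
        }()
    }
    // Consumers: a receive blocks automatically when the buffer is empty
    for i := 0; i < 3; i++ {
        go func() {
            for v := range values {
                fmt.Println(v)
            }
        }()
    }
    select {} // Wait indefinitely
}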

A condition variable supports three operations, as follows:

  • Wait: Blocks the current goroutine until a condition happens
  • Signal: Wakes up one of the waiting goroutines when the condition happens
  • Broadcast: Wakes up all of the waiting goroutines when the condition happens

Unlike the other concurrency primitives, a condition variable needs a mutex. The mutex is used to lock the critical sections in the goroutines that modify the condition. It does not matter what the condition is; what matters is that the condition can only be modified in a critical section and that critical section must be entered by locking the mutex used to construct the condition variable, as shown in the following code:

lock := sync.Mutex{}
cond := sync.NewCond(&lock)
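Before moving on to the producer-consumer example, it may help to see the three operations together. The following is a minimal, self-contained sketch, not taken from the book; the boolean ready condition and the goroutine count are illustrative assumptions:

package main

import (
    "fmt"
    "sync"
)

func main() {
    lock := sync.Mutex{}
    cond := sync.NewCond(&lock)
    ready := false // the condition the goroutines wait for

    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            lock.Lock()
            // Always re-check the condition in a loop: Wait may return
            // while the condition does not, or no longer, hold
            for !ready {
                cond.Wait() // atomically unlocks the mutex and suspends
            }
            lock.Unlock()
            fmt.Println("goroutine", id, "saw the condition")
        }(i)
    }

    // Modify the condition inside the critical section, then wake up all
    // waiting goroutines. cond.Signal() would wake up only one of them.
    lock.Lock()
    ready = true
    lock.Unlock()
    cond.Broadcast()

    wg.Wait()
}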

Now let’s implement the producer-consumer problem using condition variables. Our producers will produce integers and place them in a circular queue. The queue has a finite capacity, so if the queue is full, a producer must wait until a consumer removes a value from it. That means we need a condition variable that will cause the producers to wait until a consumer consumes a value. When a consumer consumes a value, the queue has more space that a producer can use, but the consumer has to signal the waiting producers that space is available. Similarly, if the consumers consume all the values before the producers can produce new ones, the consumers have to wait until new values are available. So, we need another condition variable that will cause the consumers to wait until a producer produces a value. When a producer produces a new value, it has to signal the waiting consumers that a new value is available.

Let’s start with a simple circular queue implementation:

type Queue struct {
    elements    []int
    front, rear int
    len         int
}
// NewQueue initializes an empty circular queue
// with the given capacity
func NewQueue(capacity int) *Queue {
    return &Queue{
        elements: make([]int, capacity),
        front:    0,  // Read from elements[front]
        rear:     -1, // Write to elements[rear]
        len:      0,
    }
}
// Enqueue adds a value to the queue. Returns false
// if queue is full
func (q *Queue) Enqueue(value int) bool {
    if q.len == len(q.elements) {
        return false
    }
    // Advance the write pointer, go around in a circle
    q.rear = (q.rear + 1) % len(q.elements)
    // Write the value
    q.elements[q.rear] = value
    q.len++
    return true
}
// Dequeue removes a value from the queue. Returns 0, false
// if queue is empty
func (q *Queue) Dequeue() (int, bool) {
    if q.len == 0 {
        return 0, false
    }
    // Read the value at the read pointer
    data := q.elements[q.front]
    // Advance the read pointer, go around in a circle
    q.front = (q.front + 1) % len(q.elements)
    q.len--
    return data, true
}
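Before wiring the queue into the concurrent program, a quick wrap-around check may be useful. These lines are not part of the book's program; dropped into a scratch main next to the Queue type above (with fmt imported), they should print the commented values:

q := NewQueue(2)
fmt.Println(q.Enqueue(1)) // true
fmt.Println(q.Enqueue(2)) // true
fmt.Println(q.Enqueue(3)) // false: the queue is full
fmt.Println(q.Dequeue())  // 1 true
fmt.Println(q.Enqueue(3)) // true: rear wraps around to index 0
fmt.Println(q.Dequeue())  // 2 true
fmt.Println(q.Dequeue())  // 3 true
fmt.Println(q.Dequeue())  // 0 false: the queue is empty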

We need a lock, two condition variables, and a circular queue:

func main() {
     lock := sync.Mutex{}
     fullCond := sync.NewCond(&lock)
     emptyCond := sync.NewCond(&lock)
     queue := NewQueue(10)

Here is the producer function. It runs in an infinite loop, producing random integer values:

producer := func() {
    for {
        // Produce value
        value := rand.Int()
        lock.Lock()
        for !queue.Enqueue(value) {
            fmt.Println("Queue is full")
            fullCond.Wait()
        }
        lock.Unlock()
        emptyCond.Signal()
        time.Sleep(time.Millisecond *
          time.Duration(rand.Intn(1000)))
    }
}

The producer generates a random integer, enters into its critical section, and attempts to enqueue the value. If it is successful, it unlocks the mutex and signals one of the consumers, letting it know that a value has been generated. If there are no consumers waiting on the emptyCond variable at that point, the signal is lost. If, however, the queue is full, then the producer starts waiting on the fullCond variable. Note that Wait is called in the critical section, with the mutex locked. When called, Wait atomically unlocks the mutex and suspends the execution of the goroutine. While waiting, the producer is no longer in its critical section, allowing the consumers to go into their own critical sections. When a consumer consumes a value, it will signal fullCond, which will wake one of the waiting producers up. When the producer wakes up, it will lock the mutex again. Waking up and locking the mutex is not atomic, which means, when Wait returns, the condition that woke up the goroutine may no longer hold, so Wait must be called inside a loop to recheck the condition. When the condition is rechecked, the goroutine will be in its critical section again, so no race conditions are possible.
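One point from the paragraph above is worth making concrete: unlike a send on a buffered channel, Signal stores nothing. The following fragment is an illustration, not code from the book:

lock := sync.Mutex{}
cond := sync.NewCond(&lock)
// No goroutine is waiting yet, so this Signal wakes nobody,
// and it is not remembered for future waiters
cond.Signal()
// A goroutine that calls cond.Wait() after this point still blocks until
// the next Signal or Broadcast. This is why the shared queue, not the
// signal itself, carries the actual state, and why the condition is
// re-checked in a loop around Wait.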

The consumer is as follows:

consumer := func() {
    for {
        lock.Lock()
        var v int
        for {
            var ok bool
            if v, ok = queue.Dequeue(); !ok {
                fmt.Println("Queue is empty")
                emptyCond.Wait()
                continue
            }
            break
        }
        lock.Unlock()
        fullCond.Signal()
        time.Sleep(time.Millisecond *
            time.Duration(rand.Intn(1000)))
        fmt.Println(v)
    }
}

Note the symmetry between the producer and the consumer. The consumer enters its critical section and attempts to dequeue a value inside a for loop. If the queue has a value in it, the value is read, the for loop terminates, and the mutex is unlocked. Then the goroutine notifies any waiting producers that a value has been read from the queue, so the queue is likely no longer full. By the time the consumer exits its critical section and signals the producer, it is possible that another producer has already filled the queue up again. That’s why the producer has to check the condition again when it wakes up. The same logic applies to the consumer: if the consumer cannot read a value, it starts waiting, and when it wakes up, it has to check whether the queue has elements in it to be consumed.

The rest of the program is as follows:

for i := 0; i < 10; i++ {
     go producer()
}
for i := 0; i < 10; i++ {
     go consumer()
}
select {} // Wait indefinitely

You can run this program with different numbers of producers and consumers and see how it behaves. When there are more producers than consumers, you should see more Queue is full messages, and when there are more consumers than producers, you should see more Queue is empty messages.
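For instance, pulling the counts into constants makes the experiment easy to repeat; the constant names below are illustrative, not from the book:

const (
    numProducers = 10
    numConsumers = 2 // fewer consumers: expect more "Queue is full" messages
)
for i := 0; i < numProducers; i++ {
    go producer()
}
for i := 0; i < numConsumers; i++ {
    go consumer()
}
select {} // Wait indefinitely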