Go Concurrency from the Ground Up

Apr 17, 2019 at 6:35PM
Caleb Doxsey

Sometimes the best way to learn something is to build it. This guide will walk you through how to reproduce Go's concurrency features in another programming language.

We will use Python 3 as the language for implementation. This guide is broken into 4 parts:

  Part 1: Design
  Part 2: Implementation
  Part 3: Channel Buffering
  Part 4: Async/Await

Throughout this guide I will reference the Go Specification.

Part 1: Design

Consider this program:

func main() {
    go a()
    go b()
    select {} // prevent the program from terminating, ignore for now
}
func a() {
    go aa()
    go ab()
}
func aa() { println("aa") }
func ab() { println("ab") }
func b() {
    go ba()
    go bb()
}
func ba() { println("ba") }
func bb() { println("bb") }

This program uses the go keyword to start several concurrently executing functions (goroutines). main starts a and b, a starts aa and ab, and b starts ba and bb.

There are different ways we could implement this program. For example we might start each goroutine on a new thread, run them in parallel and yield an execution like this:

[Figure: Parallel Execution]

However, another approach to concurrency allows us to execute all the goroutines on a single thread. To do this we use an execution queue: whenever we call go we enqueue the goroutine, and whenever a goroutine completes we dequeue the next one and execute it. It might look something like this:

[Figure: Sequential Execution]

Both of these approaches are examples of scheduling: the method by which work is assigned to resources that complete the work. The sequential approach is straightforward to implement in nearly any programming language, so we will use it as the basis for our design.

Channels

Go also has channels, which allow goroutines to communicate and synchronize their execution. Consider this program:

func main() {
    ch := make(chan int)
    go a(ch)
    go b(ch)
    select {} // prevent the program from terminating, ignore for now
}
func a(ch chan int) {
    println("a before")
    ch <- 5
    println("a after")
}
func b(ch chan int) {
    println("b before")
    println(<-ch)
    println("b after")
}

Like before, main starts a and b, but this time a sends the value 5 to ch, and b receives a value from ch and prints it.

Channel operations in Go are synchronous. The sending of 5 in a can't proceed until b gets to <-ch. And vice-versa, b can't proceed until a reaches ch <- 5. For this reason channels are sometimes called rendezvous points, because they represent points in the code where two goroutines must meet.

With our current design, execution would invariably deadlock in the face of channels. a would get to ch <- 5 and get stuck, because the <-ch in b will never run. So we need to introduce an additional mechanism for blocked goroutines.

Suppose we had a way of pausing and resuming a function. If we could do that we could move the blocked goroutine out of the execution queue and store it on the channel along with the value we'd like to send (5). Then, when <-ch is encountered in b, we could re-enqueue a to call println("a after"), and b would receive the value 5 and call println(5); println("b after"). Like this:

[Figure: Channels]

A function which has this pause/resume capability is often called a coroutine (from which goroutines get their name). Python 3 has native support for coroutines and we will see later how to use them, but what about languages which don't have coroutines?
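
As an aside, Python's generators already offer a limited form of this pause/resume capability. A quick illustrative sketch (not part of our implementation):

def coroutine():
    print("before")
    value = yield      # execution pauses here until someone resumes us
    print("received", value)

c = coroutine()
next(c)        # runs until the yield: prints "before"
try:
    c.send(5)  # resumes with a value: prints "received 5"
except StopIteration:
    pass       # the generator ran to completion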

Callbacks

Instead of pausing/resuming goroutines we can break up the functions which define them and use callbacks for blocking operations. For example:

func main() {
    ch := make(chan int)
    go a(ch)
    go b(ch)
    select {} // prevent the program from terminating, ignore for now
}
func a(ch chan int) {
    println("a before")
    send(ch, 5, func() {
        println("a after")
    })
}
func b(ch chan int) {
    println("b before")
    recv(ch, func(value int) {
        println(value)
        println("b after")
    })
}

send(ch, 5, ...) is like ch <- 5, but it has an additional parameter: a function which represents all the code that occurred after the original ch <- 5.

In the same way recv(ch, ...) replaces <-ch, except this time the callback function has an additional argument representing the value received from the channel.

Selection

Often forgotten in discussions about concurrency in Go is the select statement. It allows us to wait on multiple channel operations and proceed with the first one that becomes ready:

func main() {
    ch1, ch2 := make(chan int), make(chan int)
    go a(ch2)
    select {
    case value := <-ch1:
        println("1:", value)
    case value := <-ch2:
        println("2:", value)
    }
}
func a(ch chan int) {
    ch <- 5
}

In this example the second case will be chosen, since a is sending 5 to ch2. Nothing ever sends to ch1, so that case would block forever.

select has nuanced behavior, but a couple more things are worth mentioning:

You can use a default case if nothing else is able to proceed:

select {
case value := <-ch1:
    println("1:", value)
case value := <-ch2:
    println("2:", value)
default:
    println("everything was blocked")
}

And finally an empty select with no cases blocks forever.

Part 2: Implementation

We will implement our scheduler as a Python 3 module. Our API will consist of 9 functions:

def go(callback):
  pass
def run():
  pass
def make():
  pass
def len(channel):
  pass
def cap(channel):
  pass
def send(channel, value, callback):
  pass
def recv(channel, callback):
  pass
def close(channel):
  pass
def select(cases, callback):
  pass
default = object() # used in select

These functions correspond with concurrent functionality in Go. Here's how their usage compares:

go
  Python: go(lambda: print("in a goroutine"))
  Go:     go println("in a goroutine")

make
  Python: ch = make()
  Go:     ch := make(chan int)

len
  Python: len(ch)
  Go:     len(ch)

cap
  Python: cap(ch)
  Go:     cap(ch)

send
  Python: send(ch, 5, lambda: print("sent!"))
  Go:     ch <- 5
          println("sent!")

receive
  Python: recv(ch, lambda value, ok: print("received!", value, ok))
  Go:     value, ok := <-ch
          println("received!", value, ok)

close
  Python: close(ch)
  Go:     close(ch)

select
  Python: select([
            (recv, ch1, lambda v1, ok: print("received!", v1, ok)),
            (send, ch2, v2, lambda: print("sent!")),
            (default, lambda: print("default!"))],
            lambda: print("after select"))
  Go:     select {
          case v1, ok := <-ch1:
            println("received!", v1, ok)
          case ch2 <- v2:
            println("sent!")
          default:
            println("default!")
          }
          println("after select")

run
  Python: run()
  Go:     (implicit as the main function)

This API uses callbacks, meaning the go, send, recv and select functions all take in another function to call when the operation completes. For example:

def f():
  print("in a goroutine")
go(f)

# or:
go(lambda: print("in a goroutine"))

Using callbacks can be pretty confusing, especially when you use loops, but an API using callbacks is easier to implement at first and translates well into other programming languages. In part 4 we will see how to convert this API into one that uses async/await.
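
For instance, a for loop over channel sends has to be rewritten as a chain of callbacks, with each send's callback kicking off the next iteration. A sketch using the send function from the API above (send_all is a hypothetical helper, not part of the module):

def send_all(channel, values, callback):
    # the recursive callback chain takes the place of a for loop
    if values:
        send(channel, values[0], lambda: send_all(channel, values[1:], callback))
    else:
        callback()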

In addition to these module-level functions, we will need a class for the Channel:

class Channel:
    def __init__(self):
        self.closed = False
        self.waiting_to_send = WaitingQueue()
        self.waiting_to_recv = WaitingQueue()

This Channel class uses a helper WaitingQueue class:

class WaitingQueue(list):
    total = 0

    def enqueue(self, x):
        WaitingQueue.total += 1
        self.append(x)

    def dequeue(self, x=None):
        if x is None:
            x = self.pop(0)
            WaitingQueue.total -= 1
        else:
            # attempt to remove the passed-in item from the queue
            # (it may already have been removed, e.g. by a select cleanup)
            try:
                self.remove(x)
                WaitingQueue.total -= 1
            except ValueError:
                pass
        return x

This WaitingQueue class supports enqueuing and dequeuing elements. It also supports dequeuing a specific element if passed in. We track total as a class-level variable so we can know if anything is still waiting when our application completes.

Scheduling Methods

Goroutines will be modeled using a queue of functions. Calling go enqueues a function for execution and calling run will continually dequeue each function and run it until there are no functions left. For example:

# queue: []
go(lambda: print("hello"))
# queue: [lambda: print("hello")]
go(lambda: print("world"))
# queue: [lambda: print("hello"), lambda: print("world")]
run()
# queue: []

# results in:
# hello
# world

go

go will just enqueue the callback into a list:

execution_queue = []

def go(callback):
    if callback:
        execution_queue.append(callback)

run

run will continuously dequeue functions and execute them:

def run():
    while execution_queue:
        f = execution_queue.pop(0)
        f()

    if WaitingQueue.total > 0:
        raise Exception("fatal error: all goroutines are asleep - deadlock")

We raise an exception when there are goroutines blocked on channel operations and no functions left in the execution queue because this mirrors the behavior in Go.
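
For example, a receive that can never complete trips this check (using the make and recv functions defined below):

ch = make()
recv(ch, lambda value, ok: print("never runs"))
run()
# raises: Exception: fatal error: all goroutines are asleep - deadlock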

Channel Methods

make

make just calls the Channel constructor:

def make():
    return Channel()

len

len is always 0 with an unbuffered channel:

def len(channel):
    return 0

cap

cap is always 0 with an unbuffered channel:

def cap(channel):
    return 0

send

For send the specification states:

Communication blocks until the send can proceed. A send on an unbuffered channel can proceed if a receiver is ready. [...] A send on a closed channel proceeds by causing a run-time panic. A send on a nil channel blocks forever.

To implement this behavior, we will do the following:

  1. If the channel is None we increment the waiting count and return, which results in the goroutine blocking forever (and lets run detect the deadlock).
  2. If the channel is closed we raise an exception.
  3. If there's a receive function waiting on this channel we remove it, execute both callbacks and return.
  4. Finally we push the callback and the value onto the waiting_to_send queue.

def send(channel, value, callback):
    # "A send on a nil channel blocks forever."
    if channel is None:
        WaitingQueue.total += 1
        return

    # "A send on a closed channel proceeds by causing a run-time panic."
    if channel.closed:
        raise Exception("send on closed channel")

    # "A send on an unbuffered channel can proceed if a receiver is ready."
    if channel.waiting_to_recv:
        receiver = channel.waiting_to_recv.dequeue()
        go(callback)
        go(lambda: receiver(value, True))
        return

    channel.waiting_to_send.enqueue((value, callback))

If there are multiple goroutines waiting to receive values on a channel, does it matter which one we pick? Unlike select (which we will see later), the specification is silent on this question, so it's up to us to decide. The standard Go runtime keeps waiting goroutines in a first-in-first-out list, so we mirror that by removing the first one.

recv

For recv the specification states:

The expression blocks until a value is available. Receiving from a nil channel blocks forever. A receive operation on a closed channel can always proceed immediately, yielding the element type's zero value after any previously sent values have been received.

recv is the mirror image of send. It takes in a channel and a callback and first tries to find a matching send operation. If none is found we push the callback function onto the waiting_to_recv queue:

def recv(channel, callback):
    # "Receiving from a nil channel blocks forever."
    if channel is None:
        WaitingQueue.total += 1
        return

    # "if anything is currently blocked on sending for this channel, receive it"
    if channel.waiting_to_send:
        value, sender = channel.waiting_to_send.dequeue()
        go(lambda: callback(value, True))
        go(sender)
        return

    # "A receive operation on a closed channel can always proceed immediately,
    # yielding the element type's zero value after any previously sent values have been received."
    if channel.closed:
        go(lambda: callback(None, False))
        return

    channel.waiting_to_recv.enqueue(callback)

A slight subtlety here: we check channel.closed after we attempt to receive a value because the spec says: "A receive operation on a closed channel can always proceed immediately [...] after any previously sent values have been received". This will matter for buffered channels.
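
With both send and recv in place we can run a complete rendezvous through the module. A small sketch:

ch = make()
go(lambda: send(ch, 5, lambda: print("sent")))
go(lambda: recv(ch, lambda value, ok: print("received", value)))
run()
# prints "received 5" and then "sent": when the recv finds the waiting
# sender, it enqueues the receive callback first and the send callback second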

close

close sets .closed on the channel, but also completes any receivers/senders.

def close(channel):
    # if the channel is already closed, we panic
    if channel.closed:
        raise Exception("close of closed channel")

    channel.closed = True

    # complete any senders
    while channel.waiting_to_send:
        value, callback = channel.waiting_to_send.dequeue()
        send(channel, value, callback)

    # complete any receivers
    while channel.waiting_to_recv:
        callback = channel.waiting_to_recv.dequeue()
        recv(channel, callback)
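
For example, closing a channel wakes up a receiver that is already blocked on it. A quick sketch:

ch = make()
recv(ch, lambda value, ok: print("woke with", value, ok))
close(ch)
run()
# prints: woke with None False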

Selection

Selection is more complicated than the previous functions. When used it looks like this:

select(
  [
    (recv, ch1, lambda v1, ok: print("received!", v1, ok)),
    (send, ch2, v2, lambda: print("sent!")),
    (default, lambda: print("default!"))
  ],
  lambda: print("after select")
)

We pass in a list of tuples. Each of those tuples is either a send, a recv or a default case.

The specification states that the select statement should work like this:

  1. For all the cases in the statement, the channel operands of receive operations and the channel and right-hand-side expressions of send statements are evaluated exactly once, in source order, upon entering the "select" statement. The result is a set of channels to receive from or send to, and the corresponding values to send. Any side effects in that evaluation will occur irrespective of which (if any) communication operation is selected to proceed. Expressions on the left-hand side of a RecvStmt with a short variable declaration or assignment are not yet evaluated.
  2. If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.
  3. Unless the selected case is the default case, the respective communication operation is executed.
  4. If the selected case is a RecvStmt with a short variable declaration or an assignment, the left-hand side expressions are evaluated and the received value (or values) are assigned.
  5. The statement list of the selected case is executed.

So there are basically 3 possibilities:

  1. At least one of the cases is able to proceed, in which case we pick one at random and execute it.
  2. None of the cases is able to proceed, but there's a default case, in which case we execute that.
  3. None of the cases is able to proceed and there is no default case, so we enqueue waiting callbacks into each of the channels for sending/receiving. When one of those callbacks is chosen, we remove all the others to ensure that only one of the cases proceeds.

select

Here's the implementation:

from random import randint
import builtins

# used to indicate the default case in a select
default = object()

def select(cases, callback=None):
    def is_ready(case):
        if case[0] == send:
            return case[1].closed or case[1].waiting_to_recv
        elif case[0] == recv:
            return case[1].closed or case[1].waiting_to_send
        elif case[0] == default:
            return False

    # first see if any of the cases are ready to proceed
    ready = [case for case in cases if is_ready(case)]
    if ready:
        # pick a random one
        case = ready[randint(0, builtins.len(ready)-1)]
        if case[0] == send:
            send(case[1], case[2], case[3])
        elif case[0] == recv:
            recv(case[1], case[2])
        go(callback)
        return

    # next see if there's a default case
    defaults = [case for case in cases if case[0] == default]
    if defaults:
        defaults[0][1]()  # the case's callback is the second element of the tuple
        go(callback)
        return

    # finally we will enqueue each case into the waiting queues
    # we also update each callback so it will cleanup all the
    # other cases so only one is fired

    wrapped = []

    def cleanup():
        for case in wrapped:
            if case[0] == send:
                case[1].waiting_to_send.dequeue((case[2], case[3]))
            elif case[0] == recv:
                case[1].waiting_to_recv.dequeue(case[2])
        go(callback)

    # overwrite all the callbacks and enqueue into the waiting queues
    # (binding case as a default argument avoids Python's late-binding
    # closure pitfall: a plain lambda would only ever see the last case)
    for case in cases:
        if case[0] == send:
            new_case = (case[0], case[1], case[2],
                        lambda case=case: (cleanup(), case[3]()))
            case[1].waiting_to_send.enqueue((new_case[2], new_case[3]))
            wrapped.append(new_case)
        elif case[0] == recv:
            new_case = (case[0], case[1],
                        lambda value, ok, case=case: (cleanup(), case[2](value, ok)))
            case[1].waiting_to_recv.enqueue(new_case[2])
            wrapped.append(new_case)
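
To see it in action, here's a sketch mirroring the Go example from Part 1:

ch1, ch2 = make(), make()
go(lambda: send(ch2, 5, lambda: None))
select([
    (recv, ch1, lambda value, ok: print("1:", value)),
    (recv, ch2, lambda value, ok: print("2:", value)),
])
run()
# prints: 2: 5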

Example Usage

That's all of our methods. You can see the code in its entirety here: github.com/golang-book/concurrency/python/v1.

Let's see an example of how we might use this module. One of my favorite goroutine-heavy examples is concurrent merge sort. It's made up of two functions. The first is a merge function, which takes two sorted lists and creates a new sorted and combined list. In Go it looks like this:

func merge(l, r []int) []int {
    m := make([]int, 0, len(l)+len(r))
    for len(l) > 0 || len(r) > 0 {
        switch {
        case len(l) == 0:
            m = append(m, r[0])
            r = r[1:]
        case len(r) == 0:
            m = append(m, l[0])
            l = l[1:]
        case l[0] <= r[0]:
            m = append(m, l[0])
            l = l[1:]
        case l[0] > r[0]:
            m = append(m, r[0])
            r = r[1:]
        }
    }
    return m
}

And in Python:

def merge(l, r):
    m = []
    while len(l) > 0 or len(r) > 0:
        if len(l) == 0:
            m.append(r[0])
            r = r[1:]
        elif len(r) == 0:
            m.append(l[0])
            l = l[1:]
        elif l[0] <= r[0]:
            m.append(l[0])
            l = l[1:]
        else:
            m.append(r[0])
            r = r[1:]
    return m

Using that function we can implement our concurrent sort function. First in Go:

func ConcurrentMergeSort(xs []int) []int {
    switch len(xs) {
    case 0:
        return nil
    case 1, 2:
        return merge(xs[:1], xs[1:])
    default:
        lc, rc := make(chan []int), make(chan []int)
        go func() {
            lc <- ConcurrentMergeSort(xs[:len(xs)/2])
        }()
        go func() {
            rc <- ConcurrentMergeSort(xs[len(xs)/2:])
        }()
        return merge(<-lc, <-rc)
    }
}

And the equivalent Python:

from concurrency import go, make, recv, run, send

def concurrent_merge_sort(xs, callback):
    if len(xs) <= 1:
        callback(xs)
    else:
        lc, rc = make(), make()
        go(lambda: concurrent_merge_sort(xs[:len(xs)//2], lambda l:
                                         send(lc, l, lambda: None)))
        go(lambda: concurrent_merge_sort(xs[len(xs)//2:], lambda r:
                                         send(rc, r, lambda: None)))
        recv(lc, lambda l, ok:
             recv(rc, lambda r, ok:
                  callback(merge(l, r))))

# example usage:
def test_concurrent_merge_sort():
    def callback(result):
        assert result == [1, 2, 3, 4, 5]
    concurrent_merge_sort([2, 3, 1, 5, 4], callback)
    run()

It's definitely not as clean as the Go version, but if you squint you can see the original.

Part 3: Channel Buffering

Go channels can also be buffered:

A new, initialized channel value can be made using the built-in function make, which takes the channel type and an optional capacity as arguments:

make(chan int, 100)

The capacity, in number of elements, sets the size of the buffer in the channel. If the capacity is zero or absent, the channel is unbuffered and communication succeeds only when both a sender and receiver are ready. Otherwise, the channel is buffered and communication succeeds without blocking if the buffer is not full (sends) or not empty (receives).

Buffered channels are frequently used to improve performance and they can also prevent deadlocks and orphaned goroutines. For example, consider this program, which uses an unbuffered channel:

func main() {
    ch := make(chan int)
    ch <- 5
    // will never get here
}

The main function gets stuck on ch <- 5 because nothing is receiving on the other side. With a buffered channel the function will complete:

func main() {
    ch := make(chan int, 1)
    ch <- 5
    // will get here just fine
}

To support buffering we will make several changes.

make

make will now take an optional capacity:

def make(capacity=0):
    return Channel(capacity)

And Channel will be modified to include the capacity and a buffer:

class Channel:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.closed = False
        self.waiting_to_send = WaitingQueue()
        self.waiting_to_recv = WaitingQueue()

len

len now returns the length of the buffer:

def len(channel):
    return builtins.len(channel.buffer)

cap

cap now returns the channel capacity:

def cap(channel):
    return channel.capacity

send

send has an additional case before blocking:

def send(channel, value, callback):
    # "A send on a nil channel blocks forever."
    if channel is None:
        WaitingQueue.total += 1
        return

    # "A send on a closed channel proceeds by causing a run-time panic."
    if channel.closed:
        raise Exception("send on closed channel")

    # "A send on an unbuffered channel can proceed if a receiver is ready."
    if channel.waiting_to_recv:
        receiver = channel.waiting_to_recv.dequeue()
        go(callback)
        go(lambda: receiver(value, True))
        return

    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    # NEW
    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    # "A send on a buffered channel can proceed if there is room in the buffer."
    if len(channel) < cap(channel):
        channel.buffer.append(value)
        go(callback)
        return
    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

    channel.waiting_to_send.enqueue((value, callback))
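
For example, with a capacity of 2 the first two sends complete immediately and the third blocks. A quick sketch:

ch = make(2)
send(ch, 1, lambda: print("sent 1"))  # fits in the buffer
send(ch, 2, lambda: print("sent 2"))  # fits in the buffer
send(ch, 3, lambda: print("sent 3"))  # buffer full, so this send blocks
run()
# prints "sent 1" and "sent 2", then raises the deadlock exception
# because the third send is still waiting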

recv

recv also has an additional case:

def recv(channel, callback):
    # "Receiving from a nil channel blocks forever."
    if channel is None:
        WaitingQueue.total += 1
        return

    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    # NEW
    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    # if there is a value in the buffer, receive it
    if len(channel) > 0:
        # pop the first element, because:
        # "Channels act as first-in-first-out queues.
        # For example, if one goroutine sends values on a channel and
        # a second goroutine receives them,
        # the values are received in the order sent. "
        value = channel.buffer.pop(0)
        go(lambda: callback(value, True))
        return
    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

    # "if anything is currently blocked on sending for this channel, receive it"
    if channel.waiting_to_send:
        value, sender = channel.waiting_to_send.dequeue()
        go(lambda: callback(value, True))
        go(sender)
        return

    # "A receive operation on a closed channel can always proceed immediately,
    # yielding the element type's zero value after any previously sent values have been received."
    if channel.closed:
        go(lambda: callback(None, False))
        return

    channel.waiting_to_recv.enqueue(callback)

Checking the buffer first reproduces the behavior of this program:

func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch)

    // Even though `ch` is closed, this still prints:
    //   1
    //   2
    //   3
    for i := range ch {
        fmt.Println(i)
    }
}
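
The same behavior expressed through our module might look like this (a sketch):

ch = make(3)
send(ch, 1, lambda: None)
send(ch, 2, lambda: None)
send(ch, 3, lambda: None)
close(ch)
recv(ch, lambda value, ok: print(value, ok))  # 1 True
recv(ch, lambda value, ok: print(value, ok))  # 2 True
recv(ch, lambda value, ok: print(value, ok))  # 3 True
recv(ch, lambda value, ok: print(value, ok))  # None False
run()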

select

Finally we have to update the is_ready check in the select function to account for the buffer:

def select(cases, callback=None):
    def is_ready(case):
        if case[0] == send:
            return case[1].closed or len(case[1]) < cap(case[1]) or case[1].waiting_to_recv
        elif case[0] == recv:
            return case[1].closed or len(case[1]) > 0 or case[1].waiting_to_send
        elif case[0] == default:
            return False

The rest of the select function stays the same.

Part 4: Async/Await

As we saw with the concurrent_merge_sort function, working with callbacks can be pretty confusing. Like many programming languages, Python 3 offers special syntax for working with coroutines, supported by the asyncio library.

Coroutines are created with the async keyword and can wait on the execution of other coroutines via the await keyword:

import asyncio

async def g():
    return 5

async def f():
    x = await g()
    print(x)

asyncio.run(f())

To make our API async-friendly we will make several changes.

run

We get rid of the run function and use asyncio.run instead. Typically you'd make an async main function:

import asyncio

async def main():
    ch = make(1)
    await send(ch, 1)
    value, ok = await recv(ch)
    print(value, ok)

asyncio.run(main())

go

go just calls asyncio.create_task:

def go(task):
    if task:
        asyncio.create_task(task)

asyncio manages its own execution queue for us.

send

send is now an async function and no longer takes a callback. The second major change is that when blocked we now enqueue a future instead of a callback:

async def send(channel, value):
    # "A send on a nil channel blocks forever."
    if channel is None:
        await asyncio.Future()

    # "A send on a closed channel proceeds by causing a run-time panic."
    if channel.closed:
        raise Exception("send on closed channel")

    # "A send on an unbuffered channel can proceed if a receiver is ready."
    if channel.waiting_to_recv:
        future = channel.waiting_to_recv.dequeue()
        future.set_result((value, True))
        return

    # "A send on a buffered channel can proceed if there is room in the buffer."
    if len(channel) < cap(channel):
        channel.buffer.append(value)
        return

    future = asyncio.Future()
    channel.waiting_to_send.enqueue((value, future))
    await future

recv

Like with send above we make similar changes to recv:

async def recv(channel):
    # "Receiving from a nil channel blocks forever."
    if channel is None:
        await asyncio.Future()

    # if there is a value in the buffer, receive it
    if len(channel) > 0:
        # pop the first element, because:
        # "Channels act as first-in-first-out queues.
        # For example, if one goroutine sends values on a channel and
        # a second goroutine receives them,
        # the values are received in the order sent. "
        value = channel.buffer.pop(0)
        return value, True

    # "if anything is currently blocked on sending for this channel, receive it"
    if channel.waiting_to_send:
        value, future = channel.waiting_to_send.dequeue()
        future.set_result(None)
        return value, True

    # "A receive operation on a closed channel can always proceed immediately,
    # yielding the element type's zero value after any previously sent values have been received."
    if channel.closed:
        return None, False

    future = asyncio.Future()
    channel.waiting_to_recv.enqueue(future)
    value, ok = await future
    return value, ok
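
Putting the async send and recv together, a rendezvous now reads much more like the Go original. A sketch:

import asyncio

async def main():
    ch = make()

    async def sender():
        await send(ch, 5)
        print("sent")
    go(sender())

    value, ok = await recv(ch)
    print("received", value, ok)

asyncio.run(main())
# prints "sent" and then "received 5 True"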

select

select has the most changes. The first two cases are mostly the same, except we use await to wait for callbacks to complete:

async def select(cases):
    # block forever
    if builtins.len(cases) == 0:
        await asyncio.Future()

    def is_ready(case):
        if case[0] == send:
            return case[1].closed or len(case[1]) < cap(case[1]) or case[1].waiting_to_recv
        elif case[0] == recv:
            return case[1].closed or len(case[1]) > 0 or case[1].waiting_to_send
        elif case[0] == default:
            return False

    # first see if any of the cases are ready to proceed
    ready = [case for case in cases if is_ready(case)]
    if ready:
        # pick a random one
        case = ready[randint(0, builtins.len(ready)-1)]
        if case[0] == send:
            await send(case[1], case[2])
            await case[3]()
        elif case[0] == recv:
            value, ok = await recv(case[1])
            await case[2](value, ok)
        return

    # next see if there's a default case
    defaults = [case for case in cases if case[0] == default]
    if defaults:
        await defaults[0][1]()  # the case's callback is the second element of the tuple
        return

However the last case is significantly different. We enqueue a future for each case into the corresponding waiting queue and use asyncio.wait with asyncio.FIRST_COMPLETED to wait for the first one to finish. When it finishes we dequeue all the other futures and call the corresponding send/recv functionality:

    futures = []
    for case in cases:
        future = asyncio.Future()
        futures.append(future)
        if case[0] == send:
            case[1].waiting_to_send.enqueue((case[2], future))
        elif case[0] == recv:
            case[1].waiting_to_recv.enqueue(future)

    # wait for one to complete
    done, _ = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)

    # remove the others
    for i, case in enumerate(cases):
        future = futures[i]
        if case[0] == send:
            case[1].waiting_to_send.dequeue((case[2], future))
        elif case[0] == recv:
            case[1].waiting_to_recv.dequeue(future)

    for i, future in enumerate(futures):
        if future in done:
            if cases[i][0] == send:
                await future
                await cases[i][3]()
            elif cases[i][0] == recv:
                value, ok = await future
                await cases[i][2](value, ok)
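
Using the async select might look like this (a sketch mirroring the example from Part 1; note the case callbacks are now async functions):

async def main():
    ch1, ch2 = make(), make()

    async def a():
        await send(ch2, 5)
    go(a())

    async def on_ch1(value, ok):
        print("1:", value)

    async def on_ch2(value, ok):
        print("2:", value)

    await select([(recv, ch1, on_ch1), (recv, ch2, on_ch2)])

asyncio.run(main())
# prints: 2: 5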

Example Usage

Here's the concurrent merge sort function with an async-friendly API:

async def concurrent_merge_sort(xs):
    if len(xs) <= 1:
        return xs
    else:
        lc, rc = make(), make()

        async def left():
            value = await concurrent_merge_sort(xs[:len(xs)//2])
            await send(lc, value)
        go(left())

        async def right():
            value = await concurrent_merge_sort(xs[len(xs)//2:])
            await send(rc, value)
        go(right())

        l, _ = await recv(lc)
        r, _ = await recv(rc)
        return merge(l, r)

# example usage:
def test_concurrent_merge_sort():
    async def main():
        result = await concurrent_merge_sort([2, 3, 1, 5, 4])
        assert result == [1, 2, 3, 4, 5]
    asyncio.run(main())

The merge function is the same.

You can find the code for this version of the API here: github.com/golang-book/concurrency/python/v3.

Conclusion

The goal of this exercise was to implement Go's concurrency features in another programming language and thereby better understand how they work. I hope I achieved that goal.

Go has a much more sophisticated scheduler than the one we implemented: it supports preemption (meaning goroutines can be stopped at points other than callbacks or await expressions) and is multi-threaded, fully capable of utilizing all the cores of your processor. It also supports asynchronous network IO and time-based scheduling (sleep, tickers, etc.).

Despite its sophistication, many of the ideas we've seen in our toy implementation are part of the scheduler design. Instead of a single execution queue, Go has a queue for each thread, and when threads empty their queue and have nothing to do, they steal goroutines from other threads. But otherwise it's not terribly dissimilar. (Of course, once you introduce real multi-threading you have to be much more careful about thread safety.)

This code was surprisingly difficult to write. Although each version of the library is only a couple hundred lines of code, that code is quite hard to debug and full of nuance and edge cases. In fact I would be surprised if there weren't still some undiscovered bugs or things I missed.