Sometimes the best way to learn something is to build it. This guide will walk you through how to reproduce Go's concurrency features in another programming language.
We will use Python 3 as the language for implementation. This guide is broken into 4 parts: an overview of the concurrency features we want to reproduce, a callback-based implementation, support for buffered channels, and finally a conversion of the API to async/await.
Throughout this guide I will reference the Go Specification.
Consider this program:
```go
func main() {
    go a()
    go b()
    select {} // prevent the program from terminating, ignore for now
}

func a() {
    go aa()
    go ab()
}

func aa() { println("aa") }
func ab() { println("ab") }

func b() {
    go ba()
    go bb()
}

func ba() { println("ba") }
func bb() { println("bb") }
```
This program uses the `go` keyword to start several concurrently executing functions (goroutines). `main` starts `a` and `b`, `a` starts `aa` and `ab`, and `b` starts `ba` and `bb`.
There are different ways we could implement this program. For example, we might start each goroutine on a new thread and run them in parallel, yielding an interleaved execution.
However, another approach to concurrency allows us to execute all the goroutines on a single thread. To do this we utilize an execution queue: any time we call `go` we enqueue the goroutine, and any time a goroutine completes we dequeue the next one and execute it. It might look something like this:
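Here is a minimal sketch of that idea in Python (not our final API); the scheduling behavior is the point, and the function names simply mirror the Go program above:

```python
# A minimal sketch of queue-based scheduling: "go" enqueues a function,
# and the loop at the bottom runs them one at a time in FIFO order.
queue = []

def go(f):
    queue.append(f)

def a():
    go(aa)
    go(ab)

def aa(): print("aa")
def ab(): print("ab")

def b():
    go(ba)
    go(bb)

def ba(): print("ba")
def bb(): print("bb")

go(a)
go(b)
while queue:
    queue.pop(0)()
# prints: aa, ab, ba, bb
```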
Both of these approaches are examples of scheduling: the method by which work is assigned to resources that complete the work. The sequential approach is straightforward to implement in nearly any programming language, so we will use it as the basis for our design.
Go also has channels, which allow goroutines to communicate and synchronize their execution. Consider this program:
```go
func main() {
    ch := make(chan int)
    go a(ch)
    go b(ch)
    select {} // prevent the program from terminating, ignore for now
}

func a(ch chan int) {
    println("a before")
    ch <- 5
    println("a after")
}

func b(ch chan int) {
    println("b before")
    println(<-ch)
    println("b after")
}
```
Like before, `main` starts `a` and `b`, but this time `a` sends the value `5` to `ch`, and `b` receives a value from `ch` and prints it.
Channel operations in Go are synchronous. The send of `5` in `a` can't proceed until `b` gets to `<-ch`. And vice versa: `b` can't proceed until `a` reaches `ch <- 5`. For this reason channels are sometimes called rendezvous points, because they represent points in the code where two goroutines must meet.
With our current design, execution would invariably deadlock in the face of channels. `a` would get to `ch <- 5` and get stuck, because the `<-ch` in `b` would never run. So we need to introduce an additional mechanism for blocked goroutines.
Suppose we had a way of pausing and resuming a function. If we could do that, we could move the blocked goroutine out of the execution queue and store it on the channel along with the value we'd like to send (`5`). Then, when `<-ch` is encountered in `b`, we could re-enqueue `a` to call `println("a after")`, and `b` would receive the value `5` and call `println(5); println("b after")`.
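Python generators already give a taste of this pause/resume capability; a tiny illustrative sketch:

```python
# A generator pauses at "yield" and resumes when a value is sent to it,
# which is the essence of the coroutine behavior we need.
def worker():
    print("worker before")
    value = yield  # pause here until someone resumes us with a value
    print("worker received", value)

w = worker()
next(w)        # run until the yield; prints "worker before"
try:
    w.send(5)  # resume with 5; prints "worker received 5"
except StopIteration:
    pass       # the generator finished
```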
A function which has this pause/resume capability is often called a coroutine (from which goroutines get their name). Python 3 has native support for coroutines and we will see later how to use them, but what about languages which don't have coroutines?
Instead of pausing/resuming goroutines we can break up the functions which define them and use callbacks for blocking operations. For example:
```go
func main() {
    ch := make(chan int)
    go a(ch)
    go b(ch)
    select {} // prevent the program from terminating, ignore for now
}

func a(ch chan int) {
    println("a before")
    send(ch, 5, func() {
        println("a after")
    })
}

func b(ch chan int) {
    println("b before")
    recv(ch, func(value) {
        println(value)
        println("b after")
    })
}
```
`send(ch, 5, ...)` is like `ch <- 5`, but it has an additional parameter: a function which represents all the code that occurred after the original `ch <- 5`. In the same way `recv(ch, ...)` replaces `<-ch`, except this time the callback function has an additional argument representing the value received from the channel.
Often forgotten in discussions about concurrency in Go is the `select` statement. It allows us to wait on multiple channel operations and run the first one that is able to proceed:
```go
func main() {
    ch1, ch2 := make(chan int), make(chan int)
    go a(ch2)
    select {
    case value := <-ch1:
        println("1:", value)
    case value := <-ch2:
        println("2:", value)
    }
}

func a(ch chan int) {
    ch <- 5
}
```
In this example the second case will be chosen, since `a` is sending `5` to `ch2`. Since nothing is sending to `ch1`, that case would block forever.
`select` has nuanced behavior, but a couple more things are worth mentioning. You can use a `default` case, which runs if nothing else is able to proceed:
```go
select {
case value := <-ch1:
    println("1:", value)
case value := <-ch2:
    println("2:", value)
default:
    println("everything was blocked")
}
```
And finally, an empty `select` with no cases blocks forever.
We will implement our scheduler as a Python 3 module. Our API will consist of 9 functions:
```python
def go(callback):
    pass

def run():
    pass

def make():
    pass

def len(channel):
    pass

def cap(channel):
    pass

def send(channel, value, callback):
    pass

def recv(channel, callback):
    pass

def close(channel):
    pass

def select(cases, callback):
    pass

default = object()  # used in select
```
These functions correspond to concurrent functionality in Go. Here's a table of their usage:
| | Example in Python | Example in Go |
| --- | --- | --- |
| `go` | `go(lambda: print("in a goroutine"))` | `go println("in a goroutine")` |
| `make` | `ch = make()` | `ch := make(chan int)` |
| `len` | `len(ch)` | `len(ch)` |
| `cap` | `cap(ch)` | `cap(ch)` |
| `send` | `send(ch, 5, lambda: print("sent!"))` | `ch <- 5`<br>`println("sent!")` |
| `recv` | `recv(ch, lambda value, ok: print("received!", value, ok))` | `value, ok := <-ch`<br>`println("received!", value, ok)` |
| `close` | `close(ch)` | `close(ch)` |
| `select` | `select([`<br>`(recv, ch1, lambda v1, ok: print("received!", v1, ok)),`<br>`(send, ch2, v2, lambda: print("sent!")),`<br>`(default, lambda: print("default!"))],`<br>`lambda: print("after select"))` | `select {`<br>`case v1, ok := <-ch1: println("received!", v1, ok)`<br>`case ch2 <- v2: println("sent!")`<br>`default: println("default!")`<br>`}`<br>`println("after select")` |
| `run` | `run()` | implicit as the main function |
This API uses callbacks, meaning the `go`, `send`, `recv` and `select` functions all take in another function to call when the operation completes. For example:
```python
def f():
    print("in a goroutine")

go(f)

# or:
go(lambda: print("in a goroutine"))
```
Using callbacks can be pretty confusing, especially when you use loops, but an API using callbacks is easier to implement at first and translates well into other programming languages. In part 4 we will see how to convert this API into one that uses async/await.
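For instance, something as simple as sending a sequence of values has to be written as a recursive chain of callbacks rather than a for loop. A sketch using the API above, assuming a channel `ch` created with `make()`:

```python
# Sending 0, 1, 2 on a channel with callbacks: each send's callback
# kicks off the next send, replacing what would be a simple for loop.
def send_all(ch, values):
    if values:
        send(ch, values[0], lambda: send_all(ch, values[1:]))

go(lambda: send_all(ch, [0, 1, 2]))
```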
In addition to these module-level functions, we will need a `Channel` class:
```python
class Channel:
    def __init__(self):
        self.closed = False
        self.waiting_to_send = WaitingQueue()
        self.waiting_to_recv = WaitingQueue()
```
This `Channel` class uses a helper `WaitingQueue` class:
```python
class WaitingQueue(list):
    total = 0

    def enqueue(self, x):
        WaitingQueue.total += 1
        self.append(x)

    def dequeue(self, x=None):
        if x is None:
            x = self.pop(0)
            WaitingQueue.total -= 1
        else:
            # attempt to remove the passed-in item from the queue
            # (list.index raises ValueError if the item isn't present)
            try:
                self.pop(self.index(x))
                WaitingQueue.total -= 1
            except ValueError:
                pass
        return x
```
This `WaitingQueue` class supports enqueuing and dequeuing elements. It also supports dequeuing a specific element if one is passed in. We track `total` as a class-level variable so we can know if anything is still waiting when our application completes.
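A quick usage sketch of `WaitingQueue`, just to pin down the semantics:

```python
q = WaitingQueue()
q.enqueue("a")
q.enqueue("b")
assert q.dequeue() == "a"       # plain dequeue is FIFO
assert q.dequeue("b") == "b"    # dequeue a specific element
assert WaitingQueue.total == 0  # nothing is left waiting
```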
Goroutines will be modeled as functions in an execution queue. Calling `go` enqueues a function for execution, and calling `run` will continually dequeue and run functions until there are none left. For example:
```python
# queue: []
go(lambda: print("hello"))
# queue: [lambda: print("hello")]
go(lambda: print("world"))
# queue: [lambda: print("hello"), lambda: print("world")]
run()
# queue: []
# results in:
# hello
# world
```
`go` will just enqueue the callback into a list:

```python
execution_queue = []

def go(callback):
    if callback:
        execution_queue.append(callback)
```
`run` will continuously dequeue functions and execute them:

```python
def run():
    while execution_queue:
        f = execution_queue.pop(0)
        f()
    if WaitingQueue.total > 0:
        raise Exception("fatal error: all goroutines are asleep - deadlock")
```
We raise an exception when there are goroutines blocked on channel operations and no functions left in the execution queue because this mirrors the behavior in Go.
`make` just calls the `Channel` constructor:

```python
def make():
    return Channel()
```
`len` is always 0 for an unbuffered channel:

```python
def len(channel):
    return 0
```
`cap` is always 0 for an unbuffered channel:

```python
def cap(channel):
    return 0
```
For `send` the specification states:
Communication blocks until the send can proceed. A send on an unbuffered channel can proceed if a receiver is ready. [...] A send on a closed channel proceeds by causing a run-time panic. A send on a nil channel blocks forever.
To implement this behavior, we will do the following:

- If the channel is `None` we immediately return, which will result in us blocking forever.
- If the channel is closed we raise an exception.
- If a receiver is waiting, we dequeue it, hand it the value, and schedule both the receiver and our own callback.
- Otherwise we enqueue the value and callback onto the `waiting_to_send` queue.

```python
def send(channel, value, callback):
    # "A send on a nil channel blocks forever."
    if channel is None:
        WaitingQueue.total += 1
        return

    # "A send on a closed channel proceeds by causing a run-time panic."
    if channel.closed:
        raise Exception("send on closed channel")

    # "A send on an unbuffered channel can proceed if a receiver is ready."
    if channel.waiting_to_recv:
        receiver = channel.waiting_to_recv.dequeue()
        go(callback)
        go(lambda: receiver(value, True))
        return

    channel.waiting_to_send.enqueue((value, callback))
```
If there are multiple goroutines receiving values on a channel, does it matter which one we pick? Unlike `select` (which we will see later), the specification is silent on this question, so it's up to us to decide. The standard Go runtime keeps waiting goroutines in a linked list and wakes them in FIFO order, so that's why we remove the first one.
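With `send` in place we can already see the deadlock detection in `run` fire; a small sketch assuming the module functions defined so far:

```python
# A send with no receiver blocks forever, so once the execution queue
# drains, run() notices the waiting goroutine and raises.
ch = make()
go(lambda: send(ch, 1, lambda: print("never runs")))
run()  # raises: fatal error: all goroutines are asleep - deadlock
```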
For `recv` the specification states:
The expression blocks until a value is available. Receiving from a nil channel blocks forever. A receive operation on a closed channel can always proceed immediately, yielding the element type's zero value after any previously sent values have been received.
`recv` is the mirror image of `send`. It takes in a channel and a callback and first tries to find a matching `send` operation. If none is found we push the callback function onto the `waiting_to_recv` queue:
```python
def recv(channel, callback):
    # "Receiving from a nil channel blocks forever."
    if channel is None:
        WaitingQueue.total += 1
        return

    # if anything is currently blocked on sending to this channel, receive from it
    if channel.waiting_to_send:
        value, sender = channel.waiting_to_send.dequeue()
        go(lambda: callback(value, True))
        go(sender)
        return

    # "A receive operation on a closed channel can always proceed immediately,
    # yielding the element type's zero value after any previously sent values
    # have been received."
    if channel.closed:
        go(lambda: callback(None, False))
        return

    channel.waiting_to_recv.enqueue(callback)
```
A slight subtlety here: we check `channel.closed` after we attempt to receive a value, because the spec says "A receive operation on a closed channel can always proceed immediately [...] after any previously sent values have been received". This will matter for buffered channels.
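Putting `send` and `recv` together, here's a callback version of the earlier rendezvous program, as a sketch using the module so far:

```python
# a blocks on its send until b's recv arrives; then both callbacks
# are scheduled and the program completes.
ch = make()

def a():
    print("a before")
    send(ch, 5, lambda: print("a after"))

def b():
    print("b before")
    recv(ch, lambda value, ok: (print(value), print("b after")))

go(a)
go(b)
run()
# prints: a before, b before, 5, b after, a after
```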
`close` sets `.closed` on the channel, but also completes any waiting receivers and senders:
```python
def close(channel):
    # if the channel is already closed, we panic
    if channel.closed:
        raise Exception("close of closed channel")

    channel.closed = True

    # complete any senders
    while channel.waiting_to_send:
        value, callback = channel.waiting_to_send.dequeue()
        send(channel, value, callback)

    # complete any receivers
    while channel.waiting_to_recv:
        callback = channel.waiting_to_recv.dequeue()
        recv(channel, callback)
```
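A quick sketch of the close behavior for a blocked receiver, assuming the module above:

```python
# Closing a channel wakes any blocked receiver with the zero value:
# the callback gets (None, False).
ch = make()
go(lambda: recv(ch, lambda value, ok: print(value, ok)))
go(lambda: close(ch))
run()  # prints: None False
```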
Selection is more complicated than the previous functions. When used it looks like this:
```python
select(
    [
        (recv, ch1, lambda v1, ok: print("received!", v1, ok)),
        (send, ch2, v2, lambda: print("sent!")),
        (default, lambda: print("default!"))
    ],
    lambda: print("after select")
)
```
We pass in a list of tuples. Each of those tuples is either a `send`, a `recv` or a `default` case.
The specification states that the select statement should work like this:
- For all the cases in the statement, the channel operands of receive operations and the channel and right-hand-side expressions of send statements are evaluated exactly once, in source order, upon entering the "select" statement. The result is a set of channels to receive from or send to, and the corresponding values to send. Any side effects in that evaluation will occur irrespective of which (if any) communication operation is selected to proceed. Expressions on the left-hand side of a RecvStmt with a short variable declaration or assignment are not yet evaluated.
- If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.
- Unless the selected case is the default case, the respective communication operation is executed.
- If the selected case is a RecvStmt with a short variable declaration or an assignment, the left-hand side expressions are evaluated and the received value (or values) are assigned.
- The statement list of the selected case is executed.
So there are basically 3 possibilities:

1. One or more cases can proceed: we pick one at random and execute it.
2. No case can proceed but there is a `default` case: we execute it.
3. No case can proceed and there is no `default`: we block until one of the cases becomes ready.
Here's the implementation:
```python
from random import randint
import builtins

# used to indicate the default case in a select
default = object()

def select(cases, callback=None):
    def is_ready(case):
        if case[0] == send:
            return case[1].closed or case[1].waiting_to_recv
        elif case[0] == recv:
            return case[1].closed or case[1].waiting_to_send
        elif case[0] == default:
            return False

    # first see if any of the cases are ready to proceed
    ready = [case for case in cases if is_ready(case)]
    if ready:
        # pick a random one
        case = ready[randint(0, builtins.len(ready)-1)]
        if case[0] == send:
            send(case[1], case[2], case[3])
        elif case[0] == recv:
            recv(case[1], case[2])
        go(callback)
        return

    # next see if there's a default case
    defaults = [case for case in cases if case[0] == default]
    if defaults:
        # a default case is the tuple (default, callback)
        defaults[0][1]()
        go(callback)
        return

    # finally we will enqueue each case into the waiting queues
    # we also wrap each callback so it cleans up all the
    # other cases, ensuring only one is fired
    wrapped = []

    def cleanup():
        for case in wrapped:
            if case[0] == send:
                case[1].waiting_to_send.dequeue((case[2], case[3]))
            elif case[0] == recv:
                case[1].waiting_to_recv.dequeue(case[2])
        go(callback)

    # overwrite all the callbacks and enqueue into the waiting queues
    # (case is bound as a default argument so each lambda captures
    # its own case rather than the shared loop variable)
    for case in cases:
        if case[0] == send:
            new_case = (case[0], case[1], case[2],
                        lambda case=case: (cleanup(), case[3]()))
            case[1].waiting_to_send.enqueue((new_case[2], new_case[3]))
            wrapped.append(new_case)
        elif case[0] == recv:
            new_case = (case[0], case[1],
                        lambda value, ok, case=case: (cleanup(), case[2](value, ok)))
            case[1].waiting_to_recv.enqueue(new_case[2])
            wrapped.append(new_case)
```
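A quick demonstration of `select` with a `default` case, as a sketch assuming the module above:

```python
# Neither channel is ready, so the default case fires,
# followed by the completion callback.
ch1, ch2 = make(), make()
go(lambda: select([
    (recv, ch1, lambda v, ok: print("1:", v)),
    (recv, ch2, lambda v, ok: print("2:", v)),
    (default, lambda: print("everything was blocked")),
], lambda: print("after select")))
run()
# prints: everything was blocked, after select
```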
That's all of our methods. You can see the code in its entirety here: github.com/golang-book/concurrency/python/v1.
Let's see an example of how we might use this module. One of my favorite goroutine-heavy examples is concurrent merge sort. It's made up of two functions. The first is a merge function, which takes two sorted lists and creates a new sorted and combined list. In Go it looks like this:
```go
func merge(l, r []int) []int {
    m := make([]int, 0, len(l)+len(r))
    for len(l) > 0 || len(r) > 0 {
        switch {
        case len(l) == 0:
            m = append(m, r[0])
            r = r[1:]
        case len(r) == 0:
            m = append(m, l[0])
            l = l[1:]
        case l[0] <= r[0]:
            m = append(m, l[0])
            l = l[1:]
        case l[0] > r[0]:
            m = append(m, r[0])
            r = r[1:]
        }
    }
    return m
}
```
And in Python:
```python
def merge(l, r):
    m = []
    while len(l) > 0 or len(r) > 0:
        if len(l) == 0:
            m.append(r[0])
            r = r[1:]
        elif len(r) == 0:
            m.append(l[0])
            l = l[1:]
        elif l[0] <= r[0]:
            m.append(l[0])
            l = l[1:]
        else:
            m.append(r[0])
            r = r[1:]
    return m
```
Using that function we can implement our concurrent sort function. First in Go:
```go
func ConcurrentMergeSort(xs []int) []int {
    switch len(xs) {
    case 0:
        return nil
    case 1, 2:
        return merge(xs[:1], xs[1:])
    default:
        lc, rc := make(chan []int), make(chan []int)
        go func() {
            lc <- ConcurrentMergeSort(xs[:len(xs)/2])
        }()
        go func() {
            rc <- ConcurrentMergeSort(xs[len(xs)/2:])
        }()
        return merge(<-lc, <-rc)
    }
}
```
And the equivalent Python:
```python
from concurrency import go, make, recv, run, send

def concurrent_merge_sort(xs, callback):
    if len(xs) <= 1:
        callback(xs)
    else:
        lc, rc = make(), make()
        go(lambda: concurrent_merge_sort(xs[:len(xs)//2], lambda l:
            send(lc, l, lambda: None)))
        go(lambda: concurrent_merge_sort(xs[len(xs)//2:], lambda r:
            send(rc, r, lambda: None)))
        recv(lc, lambda l, ok:
            recv(rc, lambda r, ok:
                callback(merge(l, r))))

# example usage:
def test_concurrent_merge_sort():
    def callback(result):
        assert result == [1, 2, 3, 4, 5]
    concurrent_merge_sort([2, 3, 1, 5, 4], callback)
    run()
```
It's definitely not as clean as the Go version but if you squint you can see the original.
Go channels can also be buffered. The specification states:

> A new, initialized channel value can be made using the built-in function make, which takes the channel type and an optional capacity as arguments:
>
> `make(chan int, 100)`
>
> The capacity, in number of elements, sets the size of the buffer in the channel. If the capacity is zero or absent, the channel is unbuffered and communication succeeds only when both a sender and receiver are ready. Otherwise, the channel is buffered and communication succeeds without blocking if the buffer is not full (sends) or not empty (receives).
Buffered channels are frequently used to improve performance, and they can also prevent deadlocks and orphaned goroutines. For example, consider this program, which uses an unbuffered channel:
```go
func main() {
    ch := make(chan int)
    ch <- 5
    // will never get here
}
```
The `main` function gets stuck on `ch <- 5` because there's no receiver ready to take the value. With a buffered channel the function will complete:
```go
func main() {
    ch := make(chan int, 1)
    ch <- 5
    // will get here just fine
}
```
To support buffering we will make several changes. `make` will now take an optional capacity:

```python
def make(capacity=0):
    return Channel(capacity)
```
And `Channel` will be modified to include the `capacity` and a `buffer`:

```python
class Channel:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.closed = False
        self.waiting_to_send = WaitingQueue()
        self.waiting_to_recv = WaitingQueue()
```
`len` now returns the length of the buffer:

```python
def len(channel):
    return builtins.len(channel.buffer)
```
`cap` now returns the channel's capacity:

```python
def cap(channel):
    return channel.capacity
```
`send` has an additional case before blocking:

```python
def send(channel, value, callback):
    # "A send on a nil channel blocks forever."
    if channel is None:
        WaitingQueue.total += 1
        return

    # "A send on a closed channel proceeds by causing a run-time panic."
    if channel.closed:
        raise Exception("send on closed channel")

    # "A send on an unbuffered channel can proceed if a receiver is ready."
    if channel.waiting_to_recv:
        receiver = channel.waiting_to_recv.dequeue()
        go(callback)
        go(lambda: receiver(value, True))
        return

    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    # NEW
    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    # "A send on a buffered channel can proceed if there is room in the buffer."
    if len(channel) < cap(channel):
        channel.buffer.append(value)
        go(callback)
        return
    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

    channel.waiting_to_send.enqueue((value, callback))
```
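A small sketch of the new behavior, assuming the buffered module above: with a capacity of 1, a send completes even though no receiver ever shows up:

```python
# The value lands in the buffer, so the callback runs and
# run() finishes without raising a deadlock error.
ch = make(1)
go(lambda: send(ch, 5, lambda: print("sent without a receiver")))
run()  # prints: sent without a receiver
```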
`recv` also has an additional case:

```python
def recv(channel, callback):
    # "Receiving from a nil channel blocks forever."
    if channel is None:
        WaitingQueue.total += 1
        return

    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    # NEW
    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    # if there is a value in the buffer, receive it
    if len(channel) > 0:
        # pop the first element, because:
        # "Channels act as first-in-first-out queues. For example, if one
        # goroutine sends values on a channel and a second goroutine receives
        # them, the values are received in the order sent."
        value = channel.buffer.pop(0)
        go(lambda: callback(value, True))
        return
    # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

    # if anything is currently blocked on sending to this channel, receive from it
    if channel.waiting_to_send:
        value, sender = channel.waiting_to_send.dequeue()
        go(lambda: callback(value, True))
        go(sender)
        return

    # "A receive operation on a closed channel can always proceed immediately,
    # yielding the element type's zero value after any previously sent values
    # have been received."
    if channel.closed:
        go(lambda: callback(None, False))
        return

    channel.waiting_to_recv.enqueue(callback)
```
Checking the buffer first reproduces the behavior of this program:
```go
func main() {
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch)
    // Even though `ch` is closed, this still prints:
    // 1
    // 2
    // 3
    for i := range ch {
        fmt.Println(i)
    }
}
```
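The same behavior can be reproduced with our module; a sketch assuming the buffered functions above, with the receive loop expressed as a recursive callback:

```python
# Fill the buffer, close the channel, then drain it: each value is
# still delivered, and the final recv reports ok=False.
ch = make(3)
go(lambda: send(ch, 1, lambda: None))
go(lambda: send(ch, 2, lambda: None))
go(lambda: send(ch, 3, lambda: None))
go(lambda: close(ch))

def drain(value, ok):
    if ok:
        print(value)
        recv(ch, drain)

go(lambda: recv(ch, drain))
run()  # prints: 1, 2, 3
```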
Finally, we have to update the `is_ready` check in the `select` function to account for the buffer:
```python
def select(cases, callback=None):
    def is_ready(case):
        if case[0] == send:
            return case[1].closed or len(case[1]) < cap(case[1]) or case[1].waiting_to_recv
        elif case[0] == recv:
            return case[1].closed or len(case[1]) > 0 or case[1].waiting_to_send
        elif case[0] == default:
            return False
```
The rest of the `select` function stays the same.
As we saw with the `concurrent_merge_sort` function, working with callbacks can be pretty confusing. Like many programming languages, Python 3 offers special syntax for working with coroutines, via async/await and the `asyncio` library.
Coroutines are created with the `async` keyword and can wait on the execution of other coroutines via the `await` keyword:
```python
import asyncio

async def g():
    return 5

async def f():
    x = await g()
    print(x)

asyncio.run(f())
```
To make our API async-friendly we will make several changes.
We get rid of the `run` function and use `asyncio.run` instead. Typically you'd make an async main function:
```python
import asyncio

async def main():
    ch = make(1)
    await send(ch, 1)
    value, ok = await recv(ch)
    print(value, ok)

asyncio.run(main())
```
`go` just calls `asyncio.create_task`:

```python
def go(task):
    if task:
        asyncio.create_task(task)
```
`asyncio` manages its own execution queue for us.
`send` is now an async function and no longer takes a callback. The second major change is that when blocked we now enqueue a future instead of a callback:
```python
async def send(channel, value):
    # "A send on a nil channel blocks forever."
    if channel is None:
        await asyncio.Future()

    # "A send on a closed channel proceeds by causing a run-time panic."
    if channel.closed:
        raise Exception("send on closed channel")

    # "A send on an unbuffered channel can proceed if a receiver is ready."
    if channel.waiting_to_recv:
        future = channel.waiting_to_recv.dequeue()
        future.set_result((value, True))
        return

    # "A send on a buffered channel can proceed if there is room in the buffer."
    if len(channel) < cap(channel):
        channel.buffer.append(value)
        return

    future = asyncio.Future()
    channel.waiting_to_send.enqueue((value, future))
    await future
```
As with `send` above, we make similar changes to `recv`:
```python
async def recv(channel):
    # "Receiving from a nil channel blocks forever."
    if channel is None:
        await asyncio.Future()

    # if there is a value in the buffer, receive it
    if len(channel) > 0:
        # pop the first element, because:
        # "Channels act as first-in-first-out queues. For example, if one
        # goroutine sends values on a channel and a second goroutine receives
        # them, the values are received in the order sent."
        value = channel.buffer.pop(0)
        return value, True

    # if anything is currently blocked on sending to this channel, receive from it
    if channel.waiting_to_send:
        value, future = channel.waiting_to_send.dequeue()
        future.set_result(None)
        return value, True

    # "A receive operation on a closed channel can always proceed immediately,
    # yielding the element type's zero value after any previously sent values
    # have been received."
    if channel.closed:
        return None, False

    future = asyncio.Future()
    channel.waiting_to_recv.enqueue(future)
    value, ok = await future
    return value, ok
```
`select` has the most changes. The first two cases are mostly the same, except we use `await` to wait for callbacks to complete:
```python
async def select(cases):
    # an empty select blocks forever
    if builtins.len(cases) == 0:
        await asyncio.Future()

    def is_ready(case):
        if case[0] == send:
            return case[1].closed or len(case[1]) < cap(case[1]) or case[1].waiting_to_recv
        elif case[0] == recv:
            return case[1].closed or len(case[1]) > 0 or case[1].waiting_to_send
        elif case[0] == default:
            return False

    # first see if any of the cases are ready to proceed
    ready = [case for case in cases if is_ready(case)]
    if ready:
        # pick a random one
        case = ready[randint(0, builtins.len(ready)-1)]
        if case[0] == send:
            await send(case[1], case[2])
            await case[3]()
        elif case[0] == recv:
            value, ok = await recv(case[1])
            await case[2](value, ok)
        return

    # next see if there's a default case
    defaults = [case for case in cases if case[0] == default]
    if defaults:
        # a default case is the tuple (default, callback)
        await defaults[0][1]()
        return
```
However, the last case is significantly different. We enqueue a future for each case into the corresponding waiting queue and use `asyncio.wait` with `asyncio.FIRST_COMPLETED` to wait for the first one to finish. When it finishes we dequeue all the other futures and call the corresponding `send`/`recv` functionality:
```python
    futures = []
    for case in cases:
        future = asyncio.Future()
        futures.append(future)
        if case[0] == send:
            case[1].waiting_to_send.enqueue((case[2], future))
        elif case[0] == recv:
            case[1].waiting_to_recv.enqueue(future)

    # wait for one to complete
    done, _ = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)

    # remove the others from the waiting queues
    for i, case in enumerate(cases):
        future = futures[i]
        if case[0] == send:
            case[1].waiting_to_send.dequeue((case[2], future))
        elif case[0] == recv:
            case[1].waiting_to_recv.dequeue(future)

    # run the completed case's callback
    for i, future in enumerate(futures):
        if future in done:
            if cases[i][0] == send:
                await future
                await cases[i][3]()
            elif cases[i][0] == recv:
                value, ok = await future
                await cases[i][2](value, ok)
```
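A usage sketch of the async `select`, mirroring the earlier Go example (note that the case callbacks are now async functions):

```python
async def main():
    ch1, ch2 = make(), make()

    # a sends 5 to ch2, so the second case is chosen
    async def a():
        await send(ch2, 5)
    go(a())

    async def on_ch1(value, ok):
        print("1:", value)

    async def on_ch2(value, ok):
        print("2:", value)

    await select([
        (recv, ch1, on_ch1),
        (recv, ch2, on_ch2),
    ])

asyncio.run(main())  # prints: 2: 5
```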
Here's the concurrent merge sort function with an async-friendly API:
```python
async def concurrent_merge_sort(xs):
    if len(xs) <= 1:
        return xs
    else:
        lc, rc = make(), make()

        async def left():
            value = await concurrent_merge_sort(xs[:len(xs)//2])
            await send(lc, value)
        go(left())

        async def right():
            value = await concurrent_merge_sort(xs[len(xs)//2:])
            await send(rc, value)
        go(right())

        l, _ = await recv(lc)
        r, _ = await recv(rc)
        return merge(l, r)

# example usage:
def test_concurrent_merge_sort():
    async def main():
        result = await concurrent_merge_sort([2, 3, 1, 5, 4])
        assert result == [1, 2, 3, 4, 5]
    asyncio.run(main())
```
The `merge` function is the same.
You can find the code for this version of the API here: github.com/golang-book/concurrency/python/v3.
The goal of this exercise was to implement Go's concurrency features in another programming language and thereby better understand how they work. I hope I achieved that goal.
Go has a much more sophisticated scheduler than the one we implemented: it supports preemption (meaning goroutines can be stopped at points other than callbacks or `await` keywords) and is multi-threaded, fully capable of utilizing all of the cores of your processor. It also supports asynchronous network IO and time-based scheduling (sleep, tickers, etc.).
Despite its sophistication, many of the ideas we've seen in our toy implementation are part of the scheduler design. Instead of a single execution queue, Go has a queue for each thread, and when threads empty their queues and have nothing to do, they steal goroutines from other threads. But otherwise it's not terribly dissimilar. (Of course, once you introduce real multi-threading you have to be much more careful about thread-safety.)
This code was surprisingly difficult to write. Although each version of the library is only a couple hundred lines of code, that code is quite hard to debug and full of nuance and edge cases. In fact I would be surprised if there weren't still some undiscovered bugs or things I missed.