Awesome threading using nurseries

A while ago, I wrote about a nice threading pattern that uses Queues for synchronization. In the meantime, I've stumbled across Trio and loved its concept of nurseries: a scope that owns the concurrent tasks started inside it and makes sure they have all finished before the scope exits, so nothing runs out of control. Trio applies this to async tasks, but the idea maps nicely onto the threads-and-queues pattern.

Today I ran into a case where I had to query large chunks of data from two systems in parallel, then compare and merge them into a third data set. So I needed a way to start two threads, return their results to the main thread, and join them. A nursery came to mind, because it does just that. So that's what I used, and here's the code for it!

The basic pattern is outlined in the documentation for the Python multiprocessing module:

  • Create a Queue for the process to return its result through.
  • Spawn the Process, let it do its thing in the background.
  • Once you feel like it, join it.
  • When it's done, read its return value from the Queue.
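Spelled out with a thread instead of a process, those four steps might look like the sketch below. It only uses `threading.Thread` and `queue.Queue` from the standard library; `slow_square` is a made-up stand-in for real work.

```python
from queue import Queue
from threading import Thread


def slow_square(n):
    # Hypothetical stand-in for a long-running job
    return n * n


# 1. Create a Queue for the worker to return its result through
result_q = Queue()

# 2. Spawn the worker and let it do its thing in the background
worker = Thread(target=lambda: result_q.put(slow_square(7)))
worker.start()

# 3. Once you feel like it, join it
worker.join()

# 4. When it's done, read its return value from the Queue
result = result_q.get()
print(result)  # 49
```

With real processes and large results, it is safer to read from the Queue before joining, because a child process that still has data buffered for the queue may not exit until that data has been consumed.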

Trio nurseries are context managers. This fits nicely: the nursery can keep track of all the processes and queues, and because it is a context manager, it always gets a chance to clean up after itself, which amounts to joining the processes. I'm going to use threads, but it's pretty much the same thing.
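To illustrate the "always gets a chance to clean up" part, here is a small sketch in which the block raises an exception while a thread is still running. The helper `joined_threads` is made up for this example and built with `contextlib.contextmanager`; it is not part of Trio or of the nursery in this post.

```python
from contextlib import contextmanager
from threading import Thread
from time import sleep


@contextmanager
def joined_threads():
    # Hypothetical helper: collects threads and joins them on exit
    threads = []
    try:
        yield threads
    finally:
        # Runs whether the block finished normally or raised
        for thrd in threads:
            thrd.join()


finished = []


def worker():
    sleep(0.1)
    finished.append("worker")


try:
    with joined_threads() as threads:
        thrd = Thread(target=worker)
        threads.append(thrd)
        thrd.start()
        raise RuntimeError("something went wrong in the block")
except RuntimeError:
    pass

# The worker was still joined before the exception left the block
print(finished)  # ['worker']
```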

So here's the code for the nursery:

from collections import deque
from threading import Thread

class ThreadNursery(object):
    def __init__(self):
        self.threads = []
        self.queues = []

    def __enter__(self):
        return self

    def go(self, target_fn):
        # deque's append() and popleft() are thread-safe
        result_q = deque()
        def wrapper_fn():
            # call the target function and put
            # its return value into the queue
            result_q.append(target_fn())

        # Create a thread that runs the wrapper
        thrd = Thread(target=wrapper_fn)
        self.threads.append(thrd)
        self.queues.append(result_q)
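        thrd.start()
        # Hand the function back so that using go() as a decorator
        # does not rebind the decorated name to None
        return target_fn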

    def __exit__(self, exc_type, exc_value, traceback):
        # Wait for every thread, even if the `with` block raised
        for thrd in self.threads:
            thrd.join()

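    def get_results(self):
        # Call this after the `with` block, once all threads are joined.
        # Note: if a target function raised, its queue is empty and
        # popleft() raises IndexError.
        results = []
        for queue in self.queues:
            results.append(queue.popleft())
        return results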

And now I can use it like this:

import requests

with ThreadNursery() as nursery:
    @nursery.go
    def query_first_system():
        things = []
        while True:
            resp = requests.get("http://somewhere/api/get_stuff")
            # Parse the response once and reuse it
            data = resp.json()
            things.append(data)
            if not data["more"]:
                break
        return things

    @nursery.go
    def query_second_system():
        things = []
        while True:
            resp = requests.get("http://somewhere-else/api/get_stuff")
            # Parse the response once and reuse it
            data = resp.json()
            things.append(data)
            if not data["more"]:
                break
        return things

    @nursery.go
    def calculate_pi():
        from time import sleep
        import math
        sleep(10)
        return math.pi

# the `with` block is over, so the nursery now joins everything

first_system_things, second_system_things, pi = nursery.get_results()
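One thing the nursery above does not handle is a worker that raises: its queue stays empty, and get_results() then fails with an IndexError from popleft(). A minimal sketch of one possible way around that, assuming we want the worker's exception to resurface in the main thread, is to store either the result or the exception. The name `SafeThreadNursery` and the `(ok, payload)` tuple convention are made up for this sketch.

```python
from collections import deque
from threading import Thread


class SafeThreadNursery:
    def __init__(self):
        self.threads = []
        self.queues = []

    def __enter__(self):
        return self

    def go(self, target_fn):
        result_q = deque()

        def wrapper_fn():
            # Store (True, result) on success, (False, exception) on failure
            try:
                result_q.append((True, target_fn()))
            except Exception as err:
                result_q.append((False, err))

        thrd = Thread(target=wrapper_fn)
        self.threads.append(thrd)
        self.queues.append(result_q)
        thrd.start()
        return target_fn

    def __exit__(self, exc_type, exc_value, traceback):
        for thrd in self.threads:
            thrd.join()

    def get_results(self):
        results = []
        for queue in self.queues:
            ok, payload = queue.popleft()
            if not ok:
                # Re-raise the worker's exception in the calling thread
                raise payload
            results.append(payload)
        return results


with SafeThreadNursery() as nursery:
    @nursery.go
    def works():
        return 42

    @nursery.go
    def fails():
        raise ValueError("boom")

try:
    nursery.get_results()
except ValueError as err:
    print("worker failed:", err)
```

This version stops at the first failed worker; collecting all exceptions instead would be an equally reasonable choice.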

I like how all the threading and queue business is completely hidden away from the functions that do the actual work. They look just like any other function would, and even the rest of the code doesn't look all that special. All the plumbing is tucked away in our nursery.

Update: See also "Go statement considered harmful".