Awesome threading using nurseries
I wrote about a nice threading pattern using Queues for synchronization a while ago. In the meantime, I’ve stumbled across Trio and I loved their concept of nurseries, which is basically a wrapper around the threads-and-queues concept that also makes sure that threads are properly joined and do not run out of control.
So today I ran into a case where I had to query large chunks of data from two systems in parallel, then compare and merge them into a third data set. I needed a way to start two threads, return their results to the main thread, and join them again. A nursery came to mind, because it does just that. So that’s what I used, and here’s the code for it!
The basic pattern is outlined in the documentation of Python’s multiprocessing module:
- Create a Queue for the process to return its result through.
- Spawn the Process, let it do its thing in the background.
- Once you feel like it, join it.
- When it’s done, read its return value from the Queue.
Trio nurseries are context managers. This fits nicely: the nursery can keep track of all the processes and queues, and because it is a context manager, it always gets a chance to clean up after itself, which amounts to joining the processes. I’m going to use threads, but it’s pretty much the same thing.
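For a single worker, the four steps above translate to threads roughly as follows. This is only a minimal sketch: `run_in_thread` and `slow_square` are made-up names for illustration, and it uses a `queue.Queue` where the nursery code uses a `deque`.

```python
import queue
import threading


def slow_square(n):
    # Hypothetical stand-in for a long-running query.
    return n * n


def run_in_thread(target_fn, *args):
    # 1. Create a queue for the worker to return its result through.
    result_q = queue.Queue()

    def wrapper_fn():
        result_q.put(target_fn(*args))

    # 2. Spawn the thread and let it do its thing in the background.
    worker = threading.Thread(target=wrapper_fn)
    worker.start()

    # 3. Join it once the main thread has nothing better to do.
    worker.join()

    # 4. The worker is done, so its return value is waiting in the queue.
    #    get_nowait() raises queue.Empty instead of blocking forever
    #    if target_fn failed and never put anything in.
    return result_q.get_nowait()


result = run_in_thread(slow_square, 7)
print(result)
```

Joining right after starting makes this sequential, of course. The pattern only pays off once several workers are started before the first join.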
So here’s the code for the nursery:
    from collections import deque
    from threading import Thread

    class ThreadNursery(object):
        def __init__(self):
            self.threads = []
            self.queues = []

        def __enter__(self):
            return self

        def go(self, target_fn):
            result_q = deque()

            def wrapper_fn():
                # call the target function and put
                # its return value into the queue
                result_q.append(target_fn())

            # Create a thread that runs the wrapper
            thrd = Thread(target=wrapper_fn)
            self.threads.append(thrd)
            self.queues.append(result_q)
            thrd.start()

        def __exit__(self, type, value, traceback):
            for thrd in self.threads:
                thrd.join()

        def get_results(self):
            results = []
            for queue in self.queues:
                results.append(queue.popleft())
            return results
And now I can use it like this:
    import requests

    with ThreadNursery() as nursery:
        @nursery.go
        def query_first_system():
            things = []
            while True:
                resp = requests.get("http://somewhere/api/get_stuff")
                page = resp.json()
                things.append(page)
                if not page["more"]:
                    break
            return things

        @nursery.go
        def query_second_system():
            things = []
            while True:
                resp = requests.get("http://somewhere-else/api/get_stuff")
                page = resp.json()
                things.append(page)
                if not page["more"]:
                    break
            return things

        @nursery.go
        def calculate_pi():
            from time import sleep
            import math
            sleep(10)
            return math.pi

    # the `with` block is over, so the nursery now joins everything
    first_system_things, second_system_things, pi = nursery.get_results()
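One caveat with the nursery above: if a target function raises, nothing is ever appended to its deque, so `get_results()` fails with an `IndexError` rather than the original error. Here is a sketch of one possible way around that. `SafeThreadNursery` and its `outcomes` list are names invented for this example, not part of the original code.

```python
from threading import Thread


class SafeThreadNursery:
    """Hypothetical variant of ThreadNursery that remembers exceptions."""

    def __init__(self):
        self.threads = []
        self.outcomes = []  # one [kind, payload] slot per started thread

    def __enter__(self):
        return self

    def go(self, target_fn):
        slot = [None, None]
        self.outcomes.append(slot)

        def wrapper_fn():
            # Store either the return value or the exception.
            try:
                slot[:] = ["ok", target_fn()]
            except Exception as exc:
                slot[:] = ["error", exc]

        thrd = Thread(target=wrapper_fn)
        self.threads.append(thrd)
        thrd.start()

    def __exit__(self, exc_type, exc_value, traceback):
        for thrd in self.threads:
            thrd.join()

    def get_results(self):
        results = []
        for kind, payload in self.outcomes:
            if kind == "error":
                raise payload  # re-raise the worker's exception in the caller
            results.append(payload)
        return results


with SafeThreadNursery() as nursery:
    @nursery.go
    def works():
        return 42

    @nursery.go
    def fails():
        raise ValueError("query failed")

try:
    nursery.get_results()
except ValueError as err:
    print("worker error:", err)
```

Re-raising in `get_results()` keeps `__exit__` as simple as before. Whether the first failure should hide the results of the other threads is a design choice this sketch does not try to settle.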
I like how all the threading and Queues business is completely hidden away from the functions that do actual work. They just look like any other function would, and even the rest of the code doesn’t look all that special. All the plumbing is nicely hidden in our nursery.
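For comparison, the standard library's `concurrent.futures.ThreadPoolExecutor` has a similar shape: it is a context manager that waits for submitted work when the `with` block ends. A small sketch, with `fetch_numbers` and `fetch_letters` as hypothetical stand-ins for the two queries:

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_numbers():
    # Hypothetical stand-in for querying the first system.
    return [1, 2, 3]


def fetch_letters():
    # Hypothetical stand-in for querying the second system.
    return ["a", "b", "c"]


# Leaving the `with` block waits for all submitted work to finish.
with ThreadPoolExecutor(max_workers=2) as pool:
    numbers_future = pool.submit(fetch_numbers)
    letters_future = pool.submit(fetch_letters)

# result() returns the function's return value, or re-raises its exception.
numbers = numbers_future.result()
letters = letters_future.result()
print(numbers, letters)
```

The decorator style of the nursery is lost here, but results and exceptions come back through the futures without any hand-written queue handling.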
Update: See also “Go statement considered harmful”.