Acting up weird: Threads in Python

Recently, I ran across the Go programming language, which is focused on concurrent programming. Basically, Go embraces the Actor model and makes it extremely easy to write your code in that style. It does that by combining a couple of features:

  • Creating threads is a piece of cake: If you want to call a function like you always do, you write do_something(), and it gets done. If you want that function to run in a separate thread, you just write go do_something(). That's it.

  • Joining threads isn't necessary; Go takes care of that for you.

  • Synchronizing threads is done using channels, which are pretty much the same thing as Unix pipes: You write stuff into one end, and someone else can read that stuff out of the other.

  • If you're waiting for data from multiple threads, data can arrive either on a single channel for all threads, or one channel for each thread — it's up to you.

  • In order to do stuff based on which channel delivers data first, you use the select statement, which pretty much works like the good ole select() function does.

  • Function arguments and return values become mostly obsolete. Instead, you pass your input data on a channel, including the channel on which you want to receive your answer, and then block until you can read that answer from there.

  • Each goroutine uses one input channel for every action it can do, and uses select to switch between those.

Go Example: Synchronized stdout

As a result, your programs get structured in a way that allows for concurrency to happen while still nicely integrating access to resources that can't be shared between threads. One such resource is the standard output: If multiple threads write to it at the same time, you'll get messed-up output. The only way to prevent that is using locks, unless you use a separate thread that exists solely for the purpose of writing to the standard output and reads its data from a channel. If every thread that wants to write to stdout writes to that channel instead, the system inherently makes sure that no two lines get mixed up while being written.

package main
import (
     "fmt"
     "math/rand"
     "time"
)
func printer(outc chan string) {
     for {
         // Print, not Printf: the message might contain % characters
         fmt.Print(<-outc)
     }
}
func main() {
     rand.Seed(time.Now().UnixNano())
     // retc is a channel on which the goroutines will tell *us* they're complete
     retc := make(chan bool)
     // outc will be used to talk to the printer
     outc := make(chan string)
     go printer(outc)
     // spawn five goroutines that have something to say
     for i := 1; i <= 5; i++ {
         go func(i int){
             time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
             outc <- fmt.Sprintf("Hello world from %d!\n", i)
             retc <- true
         }(i)
     }
     // Wait for them to complete
     for i := 1; i <= 5; i++ {
         <-retc
     }
     close(outc)
     close(retc)
}

Note that Go already seems to keep stdout output from getting garbled on its own, so you don't actually have to do this. This example is not meant to be used in real code; it just illustrates the concept.

Wait, I thought this was about Python?

When I saw this, I fell in love pretty much immediately. It's just a wonderful pattern that gets rid of all the fuckups that threads like to drag along. So I decided to re-implement my pretty straightforward, single-threaded version of RSS2IMAP.py in Go, in order to make use of all that fancy stuff.

The script follows a pretty simple list of steps (a rough code sketch follows the list):

  • For each IMAP directory that's supposed to contain feeds,

    • for each feed,

      • for each new entry in the feed,

        • build an email message,

        • parse the entry's content to find images,

        • add those images to the email,

        • rewrite the content to reference images in the mail instead of on the web,

        • add the rewritten content to the email,

        • upload it to the IMAP directory.
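In the original single-threaded version, that boils down to three nested loops, roughly like this (just a sketch: directories, is_known, build_email and upload_to_imap are stand-ins for the real names in RSS2IMAP.py):

import feedparser

# Rough sketch of the single-threaded structure: one blocking step
# after another, nothing ever happens in parallel.
for dirname, feeds in directories.items():
    for feedname, feedinfo in feeds.items():
        feed = feedparser.parse(feedinfo["url"])
        for entry in feed.entries:
            if is_known(dirname, entry):
                continue
            msg = build_email(entry)       # parse content, inline images, rewrite links
            upload_to_imap(dirname, msg)   # blocks on the IMAP server every single time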

Unfortunately, for some of those tasks, I didn't find any Go libraries I was satisfied with (I'm picky), so I was confronted with the choice of either writing those libraries myself, or looking for other ways. Out of laziness and curiosity, I chose to see whether I could refactor my Python version this way, shamelessly stealing some stuff from Go in the process.

Ripping off Go

So if the whole thing is going to be a ripoff of Go, we have to find adequate replacements for go, select and channels.

The go statement is pretty easy. We can already use the threading module to run functions in threads, so all we need to do is define go, like so:

import threading

def go(func, *args, **kwargs):
    # run func(*args, **kwargs) in a daemon thread, Go style
    thr = threading.Thread(target=func, args=args, kwargs=kwargs)
    thr.daemon = True   # don't keep the interpreter alive because of this thread
    thr.start()
    return thr

Now, in order to run a function in the background, we change the call shoop(woop) to go(shoop, woop). It's not exactly go shoop(woop), but it gets pretty darn close, so that's good enough for me.
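For instance, with a made-up worker function:

import time

def shoop(woop):
    time.sleep(1)               # pretend to do some actual work
    print "shooped %r" % woop

thr = go(shoop, "woop")         # returns immediately, shoop runs in the background
thr.join()                      # joining is still possible, just not required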

select is a tougher one, but thanks to Thomas's post on Stack Overflow, there's something that comes pretty close, too. Go's select looks like this:

  select {
  case p := <-channel1:
      do_something(p)
  case q := <-channel2:
      do_something_else(q)
  }

And here's Thomas's Python equivalent in action:

for which, msg in select(channel1, channel2):
     if which is channel1:
         do_something(msg)
     elif which is channel2:
         do_something_else(msg)

Again, that's close enough. Here's the implementation of select:

from Queue import Queue

def select(*queues):
    combined = Queue(maxsize=0)
    def listen_and_forward(queue):
        # forward everything from one input queue into the combined
        # queue, tagged with the queue it came from
        while True:
            combined.put((queue, queue.get()))
    for queue in queues:
        go(listen_and_forward, queue)
    while True:
        yield combined.get()

It's beautifully simple, and it's actually designed the way a Go coder would likely do it too, so this is definitely going in the right direction. Of course, Go's select also supports sending on channels and non-blocking operation via a default case, so this isn't an exact replacement, but it should do for the time being. (Imho, it's so awesome I almost feel bad for ripping this off and posting it here. Made by this guy, so all credit goes to him.)
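If you ever miss the non-blocking part, Queue can at least fake Go's default case on the reading side, roughly like this (a sketch, not part of the implementation above):

from Queue import Empty

try:
    msg = channel1.get_nowait()
    do_something(msg)
except Empty:
    pass    # nothing ready yet; this is what Go's "default" case would do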

That same thread also provides the answer as to what we want to use for our channels: Python's Queue class mimics them nicely. Just like a channel, a Queue can be passed around, written to and read from. Reading blocks until there actually is something to be read, so synchronization is dealt with as well.
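Putting go, select and Queue together, a minimal round trip looks like this (a made-up example, assuming the go and select definitions from above):

from Queue import Queue

evens = Queue()
odds  = Queue()

def produce(n, chan):
    chan.put(n)

for i in range(10):
    go(produce, i, evens if i % 2 == 0 else odds)

received = 0
for which, msg in select(evens, odds):
    print "%s: %s" % ("even" if which is evens else "odd", msg)
    received += 1
    if received == 10:
        break   # select() never stops on its own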

Basic Design

First of all, it's noteworthy that the basic design really isn't all that complicated. Python's imaplib isn't thread safe, so I decided to have one thread handling all the IMAP stuff, and one thread for each feed to be processed. Basically, that's it.

Now, while Go's stdout seems to be synchronized, Python's definitely isn't, so we're about to see our printer again — and since Python is made of awesome, we can use our go function as a decorator:

from datetime import datetime
from Queue import Queue

outq = Queue()

@go
def printer():
    # sole owner of stdout: everything to be printed goes through outq
    while True:
        msg = outq.get()
        print "[%s] %s" % (datetime.now(), msg)

The IMAP handler thread offers two features, namely checking whether or not an article already exists, and uploading one. Users choose the feature they need by appending requests to the appropriate queue. The thread runs select over these two queues, processes any data it receives, and returns the result on the queue passed in by the user:

checkq  = Queue()
uploadq = Queue()

@go
def imapmaster():
    # serv is the script's shared imaplib connection; only this
    # thread ever talks to it
    for whichq, command in select(checkq, uploadq):
        if whichq is checkq:
            returnq, mailbox, entid, entry = command
            serv.select(mailbox)
            typ, data = serv.search(None, '(HEADER X-RSS2IMAP-ID "%s")' % entid)
            returnq.put((entid, entry, bool(data[0])))
        elif whichq is uploadq:
            returnq, mailbox, mp = command
            serv.select(mailbox)
            serv.append(mailbox, "", imaplib.Time2Internaldate(time()), str(mp))
            returnq.put(True)

Now that we have a way to talk to the IMAP server in a synchronized fashion, we can change the feed processing threads like so:

def process_feed(dirname, feedname, feedinfo):
     feed = feedparser.parse(feedinfo["url"])
     pending = Queue()
     pendingupload = Queue()
     outstanding = 0
     outstandingupload = 0
     # Check which entries need to be uploaded.
     for entry in feed.entries:
         entid = hashlib.sha1(entry.id).hexdigest()
         checkq.put((pending, dirname, entid, entry))
         outstanding += 1
     # Process replies, queueing uploads for new articles.
     while outstanding:
         entid, entry, found = pending.get()
         outstanding -= 1
         if found:
             continue
         # build_email() does the MIME heavy lifting; it's defined
         # elsewhere in RSS2IMAP.py
         mp = build_email(entid, entry)
         uploadq.put((pendingupload, dirname, mp))
         outstandingupload += 1
     # Wait for all the uploads to complete.
     while outstandingupload:
         pendingupload.get()
         outstandingupload -= 1

Basically, we switch the code away from a "one step after another" approach towards generating work for someone else and then waiting for them to complete it, thereby minimizing the time spent waiting: While I'm busy parsing the feed and creating those emails, other threads can happily fire away at the IMAP server and get their work done without being blocked by me, and I can still deliver my uploads to the IMAP master thread without having to worry about when that thread will finally get around to processing them. It does that as soon as it can.

The result is this version of RSS2IMAP.py.

So, does it work?

Now that we've done all that pretty stuff, let's figure out if it worked. Running the old, single-threaded version inside time, I got the following measurement (I'm processing 25 feeds in my setup):

real    0m32.566s
user    0m15.929s
sys     0m0.092s

Huh, there's definitely room for some improvement there. The whole thing ran for 32 seconds, but has been doing real work for only about half of that time. The rest of the time it has been sleeping, waiting for IO to complete. So the single-threaded thing sucks, but how does the shiny new multithreaded version compete?

real    0m50.986s
user    0m37.234s
sys     0m43.219s

What the fuck? It takes about 20 seconds longer than the original, the work it's doing in user space has more than doubled, and sys time has skyrocketed! Running htop in another SSH window shows that the process eats an insane amount of CPU at the same time, too! What the hell is going on?

Well, the short answer is,

Python sucks

The most commonly used Python interpreter, cpython, has this tiny little thing called the Global Interpreter Lock. Since cpython's memory management (most notably, reference counting) isn't thread safe, it has to make sure that only one thread at a time is mutating Python objects. It does that by ensuring that only one thread can execute Python bytecode at any given point in time.

That alone wouldn't be too bad, at least as long as the code running inside those threads isn't doing much CPU-bound stuff. If you're doing IO operations all the time, the threads are mostly blocked anyway, so there's not much competition for the GIL. But if you're trying to do CPU-intense stuff, this changes completely, because then most threads need to hold the GIL to get any work done, and parsing feeds with the feedparser module is exactly that kind of CPU-intense work.
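You can watch this happen without any feeds involved. Here's a tiny made-up experiment, reusing the go function from above, that just burns CPU in two threads under cpython 2:

import time

def burn():
    # pure bytecode work, so the thread needs the GIL the whole time
    n = 0
    for _ in xrange(10 ** 7):
        n += 1

start = time.time()
threads = [go(burn) for _ in range(2)]
for thr in threads:
    thr.join()
print "two threads: %.2fs" % (time.time() - start)

On a multicore machine, this typically takes longer than simply calling burn() twice in a row, exactly because of the GIL battle described below.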

Unfortunately, the way the GIL is implemented works nicely on single-core machines, but is a royal fuckup once multi-core hardware is involved (in Nov 2013, that is: always). If you want the gory details, I'd recommend watching David Beazley's talk Inside the Python GIL, where he explains everything that's happening. The gist of it is that on a multicore machine, the thread that currently holds the GIL releases it every now and then so that others have a chance at getting it, but re-acquires it so quickly that that actually never happens. So this thread goes "neener neener go get it go get it neener neener" all the time, creating an immense overhead in the process which prevents any actual work from getting done.

This is exactly what happens here. Trying to parse 25 feeds at once, feedparser hogs the CPU in those threads, and the GIL turns a minor annoyance into a royal catastrophe.

How do we fix that?

Fixing that would require changing the GIL implementation, which (for numerous reasons I'm not at all familiar with) isn't going to happen anytime soon. But we can at least work around this problem by reducing the "neener neener" frequency. Normally, cpython considers releasing the GIL every 100 bytecode instructions. This number looks suspiciously like an educated guess to me, because otherwise it would be more like a power of two or something. So if this is a guess, let's guess again:

# make cpython check for pending thread switches far less often;
# the hasattr guard is there because not every interpreter has this
if hasattr(sys, "setcheckinterval"):
    sys.setcheckinterval(10000)

Here's the result:

real    0m20.595s
user    0m16.373s
sys     0m0.544s

This looks pretty much okay. The sys workload has been reduced to a sane level, even though it's still significantly higher than in the single-threaded version. The time spent doing actual work is roughly the same (which makes sense, because the amount of work is roughly the same too). However, the total time has improved: Instead of wasting about 16 seconds waiting, we now only waste about 4.

Conclusion

So this was my first encounter with the Actor model. I think it's a pretty fascinating way to organize a program, and it requires thinking about programming from a whole new perspective. I guess this is most useful when you're shoveling data from one place to another, because the concept of selecting a bunch'a channels fits into that nicely.

Sadly, doing that stuff in Python may not be the best choice. IronPython and Jython don't have a GIL, so they're not affected by this problem, but they're not as readily available either. From my point of view, Go really rocks at this part, but introduces a whole new set of problems which currently prevent it from becoming my new language of choice.

I'm also thinking about writing some wrapper class thing that simplifies handling those queues, and packaging it together with go and select as shown here into some concurrency library or something.

Guess we'll have to wait and see.