A. Jesse Jiryu Davis

Tag: tornado

Slides From My Talk On Python Coroutines

Here are the slides from tonight's NYC Python Meetup talk on coroutines in Tornado and Tulip. The slides are a bit inscrutable on their own; it's my style to just show code, then talk a lot to explain the code. Still, if you were there tonight you may find these useful.

Python Coroutines, Present and Future from emptysquare

Toro Rewritten for Tornado 3.0

Speaking of my package Toro, I've just released version 0.5. Toro provides semaphores, queues, and so on, for advanced control flows with Tornado coroutines.

Version 0.5 is a rewrite, motivated by two recent events. First, the release of Tornado 3.0 has introduced a much more convenient coroutine API, and I wanted Toro to support the modern style. Second, I contributed a version of Toro's queues to Tulip, and the queues changed a bit in the process. As much as possible, I updated Toro to match the API of Tulip's locks and queues, for consistency's sake.

In previous versions, most Toro methods had to be wrapped in gen.Task, which made for weird-looking code. But using Toro is now quite graceful. For example, a producer-consumer pair:

from tornado import gen
from tornado.ioloop import IOLoop
import toro

q = toro.Queue()

@gen.coroutine
def producer():
    for item in range(5):
        print 'Sending', item
        yield q.put(item)

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        print '\t\t', 'Got', item

consumer()
producer()
IOLoop.current().start()  # Runs forever: the consumer never exits
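Tulip later shipped in the standard library as asyncio, and its queues kept the same put/get shape. For comparison, here is a rough sketch of the same producer-consumer pair in Python 3 with asyncio.Queue instead of Toro. The None sentinel that stops the consumer is an assumption added here so the script exits instead of running forever.

```python
import asyncio

async def producer(q, n):
    for item in range(n):
        print('Sending', item)
        await q.put(item)
    await q.put(None)  # Assumed sentinel: tells the consumer to stop

async def consumer(q, received):
    while True:
        item = await q.get()
        if item is None:
            break
        print('\t\tGot', item)
        received.append(item)

async def main():
    q = asyncio.Queue()
    received = []
    # Run both coroutines concurrently, like calling consumer() then producer()
    await asyncio.gather(consumer(q, received), producer(q, 5))
    return received

received = asyncio.run(main())
```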

Another nice new feature: Semaphore.acquire and Lock.acquire can be used with the with statement:

from tornado import gen
import toro

lock = toro.Lock()

@gen.coroutine
def f():
    with (yield lock.acquire()):
        print "We're in the lock"

    print "Out of the lock"
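In Python 3.5 and later, asyncio.Lock supports `async with` directly, which is the standard-library counterpart of the `with (yield lock.acquire())` idiom. A minimal sketch (asyncio swapped in for Toro; the coroutine names are hypothetical) showing that a second coroutine waits until the first leaves the lock:

```python
import asyncio

async def f(lock, name, log):
    async with lock:  # Acquire; released automatically on exit, even on error
        log.append(name + ' in')
        await asyncio.sleep(0.01)  # Yield to the loop while still holding the lock
        log.append(name + ' out')

async def main():
    lock = asyncio.Lock()
    log = []
    await asyncio.gather(f(lock, 'a', log), f(lock, 'b', log))
    return log

log = asyncio.run(main())
print(log)
```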

More examples are in the docs. Enjoy!

Slides from my PyCon lightning talk on Toro

Here are the 8 slides for my 4½-minute talk on Toro this morning. Toro is a package I wrote last year that provides objects something like locks, events, conditions, semaphores, and queues for Tornado coroutines.

PyCon lightning talk on my Toro module for Tornado from emptysquare

Plop: Python Profiler With Call Graphs

Tornado's maintainer Ben Darnell released a Python Low-Overhead Profiler or "Plop" last year, and I'm just now playing with it. Unlike cProfile, which records every function call at great cost to the running process, Plop promises that "profile collection can be turned on and off in a live process with minimal performance impact."

A Plop Collector samples the process's call stack periodically (every 10 milliseconds by default) until you call Collector.stop(). Plop's profile viewer is a web application built on Tornado and d3.js, which uses a fun force-directed layout to display your process's call graph. You can use the demo scripts from Plop's repo to make an example profile:

Call graph

Functions are shown as circles, sized according to the number of times they were executed and colored according to filename. Edges connect callers to callees. The visualization nearly freezes Firefox but runs well in Chrome.
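To make the sampling idea concrete, here is a minimal sketch. It is not Plop's code: it assumes a background thread and CPython's sys._current_frames() rather than whatever mechanism Plop uses, and busy_parent and busy_leaf are hypothetical workloads. Every 10 milliseconds it walks the target thread's stack and counts caller-to-callee edges, the same edges the call-graph view draws.

```python
import collections
import sys
import threading
import time

def sample_call_graph(target_thread_id, stop_event, edges, interval=0.01):
    """Every `interval` seconds, count caller->callee pairs on the target thread's stack."""
    while not stop_event.is_set():
        frame = sys._current_frames().get(target_thread_id)
        while frame is not None and frame.f_back is not None:
            edges[(frame.f_back.f_code.co_name, frame.f_code.co_name)] += 1
            frame = frame.f_back
        time.sleep(interval)

def busy_leaf():
    return sum(i * i for i in range(1000))

def busy_parent(seconds):
    deadline = time.time() + seconds
    while time.time() < deadline:
        busy_leaf()

edges = collections.Counter()
stop = threading.Event()
sampler = threading.Thread(
    target=sample_call_graph,
    args=(threading.get_ident(), stop, edges),
    daemon=True)
sampler.start()
busy_parent(0.3)  # The "live process" being profiled
stop.set()
sampler.join()

for (caller, callee), count in edges.most_common(5):
    print(caller, '->', callee, count)
```

Sampling trades completeness for low overhead: functions that run between samples are simply never seen, which is the design choice that separates this approach from cProfile's record-every-call model.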

Plop isn't going to replace cProfile and RunSnakeRun, but that's not its intention. Better to think of it as a lightweight complement to the heavier machinery: Plop is nice for visualizing call graphs (which RunSnakeRun does badly) and for sampling a live process in a performance-critical environment.

YieldPoints: simple extensions to tornado.gen

I affectionately introduce YieldPoints, my littlest project yet. It's just some simple extensions to Tornado's gen module.

The cutest example of what you can do with YieldPoints is the WaitAny class, which lets you begin multiple asynchronous tasks and handle their results in the order they complete:

from datetime import timedelta
from functools import partial

from tornado import gen
from tornado.ioloop import IOLoop
import yieldpoints

@gen.engine
def f():
    callback0 = yield gen.Callback(0)
    callback1 = yield gen.Callback(1)

    # Fire callback1 soon, callback0 later
    IOLoop.instance().add_timeout(
        timedelta(seconds=0.1), partial(callback1, 'foo'))

    IOLoop.instance().add_timeout(
        timedelta(seconds=0.2), partial(callback0, 'bar'))

    keys = set([0, 1])
    while keys:
        # Resume as soon as any one of the remaining callbacks fires
        key, result = yield yieldpoints.WaitAny(keys)
        print 'key:', key, ', result:', result
        keys.remove(key)
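asyncio has no WaitAny, but asyncio.wait with return_when=FIRST_COMPLETED supports the same handle-in-completion-order loop. A rough sketch, assuming asyncio timers stand in for the IOLoop timeouts and a dict maps each task back to its key (this is not YieldPoints code):

```python
import asyncio

async def fire_later(delay, value):
    await asyncio.sleep(delay)
    return value

async def f():
    # Map each pending task to a key, like gen.Callback(0) and gen.Callback(1)
    tasks = {
        asyncio.ensure_future(fire_later(0.2, 'bar')): 0,  # Fires later
        asyncio.ensure_future(fire_later(0.1, 'foo')): 1,  # Fires sooner
    }
    order = []
    pending = set(tasks)
    while pending:
        # Wake as soon as any one task finishes, like WaitAny(keys)
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            key, result = tasks[task], task.result()
            print('key:', key, ', result:', result)
            order.append((key, result))
    return order

order = asyncio.run(f())
```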

More examples are in the docs: you can use WithTimeout to wrap any callback in a timeout, and use Cancel or CancelAll to decline to wait for a callback you registered earlier. There's an adorable extended example that uses my library to start downloading multiple URLs at once, and process the results in the order received.
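For a sense of what wrapping an operation in a timeout looks like outside Tornado, here is a small sketch with asyncio.wait_for (swapped in for WithTimeout; slow_operation is hypothetical). One difference worth noting: wait_for also cancels the slow operation when the timeout expires, rather than only declining to wait for it.

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(1)
    return 'done'

async def main():
    try:
        # Give up after 0.05 seconds instead of waiting the full second
        return await asyncio.wait_for(slow_operation(), timeout=0.05)
    except asyncio.TimeoutError:
        return 'timed out'

outcome = asyncio.run(main())
print(outcome)
```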

Further reading:

YieldPoints on Read the Docs

YieldPoints on Github

YieldPoints on PyPI

Toro: synchronization primitives for Tornado coroutines

I took a break from Motor to make a new package "Toro": queues, semaphores, locks, and so on for Tornado coroutines. (The name "Toro" is from "Tornado" and "Coro".)

Why would you need something like this, especially since Tornado apps are usually single-threaded? Well, with Tornado's gen module you can turn Python generators into full-featured coroutines, but coordination among these coroutines is difficult. If one coroutine wants exclusive access to a resource, how can it notify other coroutines to proceed once it's finished? How do you allow N coroutines, but no more than N, to access a resource at once? How do you start a set of coroutines and end your program when the last completes?

Each of these problems can be solved individually, but Toro's classes generalize the solutions. Toro provides to Tornado coroutines a set of locking primitives and queues analogous to those that Gevent provides to Greenlets, or that the standard library provides to threads.
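To make the "no more than N" question concrete, here is a sketch using asyncio.Semaphore from the standard library, swapped in for Toro's Semaphore; MAX_CONCURRENT and use_resource are hypothetical. asyncio.gather also answers the last question: it finishes when the last coroutine completes.

```python
import asyncio

MAX_CONCURRENT = 2  # Hypothetical limit N

async def use_resource(sem, stats):
    async with sem:  # At most MAX_CONCURRENT coroutines are inside this block at once
        stats['active'] += 1
        stats['peak'] = max(stats['peak'], stats['active'])
        await asyncio.sleep(0.01)  # Pretend to use the shared resource
        stats['active'] -= 1

async def main():
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    stats = {'active': 0, 'peak': 0}
    # Start ten coroutines; gather returns when the last one completes
    await asyncio.gather(*(use_resource(sem, stats) for _ in range(10)))
    return stats

stats = asyncio.run(main())
print(stats)
```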

Here's a producer-consumer example with a toro.Queue:

from tornado import ioloop, gen
import toro

q = toro.JoinableQueue(maxsize=3)

@gen.engine
def consumer():
    while True:
        item = yield gen.Task(q.get)
        try:
            print 'Doing work on', item
        finally:
            q.task_done()

@gen.engine
def producer():
    for item in range(10):
        yield gen.Task(q.put, item)

if __name__ == '__main__':
    producer()
    consumer()
    loop = ioloop.IOLoop.instance()
    q.join(callback=loop.stop) # block until all tasks are done
    loop.start()
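asyncio.Queue also has task_done and join, so the same bounded-queue shutdown pattern can be sketched in Python 3 without Toro. This is a translation under assumptions, not Toro's API; in particular, cancelling the endless consumer after join() is a choice made here so the program ends.

```python
import asyncio

async def consumer(q, done_items):
    while True:
        item = await q.get()
        try:
            print('Doing work on', item)
            done_items.append(item)
        finally:
            q.task_done()  # Mark one unit of work finished; join() counts these

async def producer(q):
    for item in range(10):
        await q.put(item)  # Waits while the queue already holds maxsize items

async def main():
    q = asyncio.Queue(maxsize=3)
    done_items = []
    worker = asyncio.ensure_future(consumer(q, done_items))
    await producer(q)
    await q.join()   # Block until every item put has been marked task_done
    worker.cancel()  # The consumer loops forever, so stop it explicitly
    return done_items

done_items = asyncio.run(main())
```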

More examples are in the docs: graceful shutdown using Toro's Lock, a caching proxy server with Event, and a web spider with Queue. Further reading:

Toro on Read the Docs

Toro on Github

Toro on PyPI

Toro logo by Musho Rodney Alan Greenblat

Refactoring Tornado Code With gen.engine

Sometimes writing callback-style asynchronous code with Tornado is a pain. But the real hurt comes when you want to refactor your async code into reusable subroutines. Tornado's gen module makes refactoring easy, but you need to learn a few tricks first.

For Example

I'll use this blog to illustrate. I built it with Motor-Blog, a trivial blog platform on top of Motor, my new driver for Tornado and MongoDB.

When you came here, Motor-Blog did three or four MongoDB queries to render this page.

1: Find the blog post at this URL and show you this content.

2 and 3: Find the next and previous posts to render the navigation links at the bottom.

Maybe 4: If the list of categories on the left has changed since it was last cached, fetch the list.

Let's go through each query and see how the tornado.gen module makes life easier.

Fetching One Post

In Tornado, fetching one post takes a little more work than with blocking-style code:

import motor
import tornado.web

db = motor.MotorConnection().open_sync().my_blog_db

class PostHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self, slug):
        db.posts.find_one({'slug': slug}, callback=self._found_post)

    def _found_post(self, post, error):
        if error:
            raise tornado.web.HTTPError(500, str(error))
        elif not post:
            raise tornado.web.HTTPError(404)
        else:
            self.render('post.html', post=post)

Not so bad. But is it better with gen?

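from tornado import gen

class PostHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @gen.engine
    def get(self, slug):
        # Motor calls back with two arguments, (result, error), so gen.Task
        # hands back an (args, kwargs) pair rather than a single value
        (post, error), kwargs = yield gen.Task(
            db.posts.find_one, {'slug': slug})

        if error:
            raise tornado.web.HTTPError(500, str(error))
        elif not post:
            raise tornado.web.HTTPError(404)
        else:
            self.render('post.html', post=post)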

A little better. The yield statement makes this function a generator. gen.engine is a brilliant hack that runs the generator until it's complete: each time the generator yields a Task, gen.engine schedules the generator to be resumed when the task is complete. Read the source code of the Runner class for details; it's exhilarating. Or just enjoy the glow of putting all your logic in a single function again, without defining any callbacks.
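To make "resumed when the task is complete" concrete, here is a toy version of the idea in plain Python 3. It is not Tornado's Runner: the Task class, the engine decorator, the ready queue standing in for the IOLoop, and fake_find_one are all hypothetical, and there is no error handling.

```python
import collections
import functools

ready = collections.deque()  # Toy event loop: a FIFO of pending callbacks

def run_loop():
    while ready:
        ready.popleft()()

class Task:
    """Toy stand-in for gen.Task: a function that takes a callback, plus its args."""
    def __init__(self, func, *args):
        self.func = func
        self.args = args

def engine(genfunc):
    @functools.wraps(genfunc)
    def wrapper(*args, **kwargs):
        gen = genfunc(*args, **kwargs)

        def step(value=None):
            try:
                task = gen.send(value)  # Run the generator up to its next yield
            except StopIteration:
                return  # Generator finished
            # Start the operation; its callback resumes the generator with the result
            task.func(*task.args, callback=step)

        step()
    return wrapper

def fake_find_one(query, callback):
    # Pretend to do I/O: deliver the result on a later turn of the loop
    ready.append(lambda: callback({'slug': query['slug'], 'title': 'Hello'}))

results = []

@engine
def get(slug):
    post = yield Task(fake_find_one, {'slug': slug})
    results.append(post['title'])

get('my-post')
run_loop()
print(results)  # ['Hello']
```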

Motor includes a subclass of gen.Task called motor.Op. It handles checking and raising the exception for you, so the above can be simplified further:

@tornado.web.asynchronous
@gen.engine
def get(self, slug):
    post = yield motor.Op(
        db.posts.find_one, {'slug': slug})
    if not post:
        raise tornado.web.HTTPError(404)
    else:
        self.render('post.html', post=post)

Still, no huge gains. gen starts to shine when you need to parallelize some tasks.

Fetching Next And Previous

Once Motor-Blog finds the current post, it gets the next and previous posts. Since the two queries are independent we can save a few milliseconds by doing them in parallel. How does this look with callbacks?

@tornado.web.asynchronous
def get(self, slug):
    db.posts.find_one({'slug': slug}, callback=self._found_post)

def _found_post(self, post, error):
    if error:
        raise tornado.web.HTTPError(500, str(error))
    elif not post:
        raise tornado.web.HTTPError(404)
    else:
        _id = post['_id']
        self.post = post
        self.prev = self.next = None
        self.pending = 2  # Queries still outstanding

        # Two queries in parallel
        db.posts.find_one({'_id': {'$lt': _id}},
            callback=self._found_prev)
        db.posts.find_one({'_id': {'$gt': _id}},
            callback=self._found_next)

def _found_prev(self, prev, error):
    if error:
        raise tornado.web.HTTPError(500, str(error))
    else:
        self.prev = prev
        self._query_done()

def _found_next(self, next, error):
    if error:
        raise tornado.web.HTTPError(500, str(error))
    else:
        self.next = next
        self._query_done()

def _query_done(self):
    # Render only after both queries have called back; prev or next
    # may legitimately be None, so count callbacks instead of testing them
    self.pending -= 1
    if self.pending == 0:
        self._render()

def _render(self):
    self.render('post.html',
        post=self.post, prev=self.prev, next=self.next)

This is completely disgusting and it makes me want to give up on Tornado. All that boilerplate can't be factored out. Will gen help?

@tornado.web.asynchronous
@gen.engine
def get(self, slug):
    post = yield motor.Op(
        db.posts.find_one, {'slug': slug})
    if not post:
        raise tornado.web.HTTPError(404)
    else:
        _id = post['_id']
        # Yielding a list runs both queries in parallel
        prev, next = yield [
            motor.Op(db.posts.find_one, {'_id': {'$lt': _id}}),
            motor.Op(db.posts.find_one, {'_id': {'$gt': _id}})]

        self.render('post.html', post=post, prev=prev, next=next)

Now our single get function is just as nice as it would be with blocking code. In fact, the parallel fetch is far easier than if you were multithreading instead of using Tornado. But what about factoring out a common subroutine that request handlers can share?
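The same parallel fetch in Python 3's asyncio is a single asyncio.gather call. A small sketch, assuming a hypothetical find_one coroutine that takes about 0.2 seconds, shows that two queries finish in roughly the time of one:

```python
import asyncio
import time

async def find_one(query):
    # Hypothetical stand-in for a database query that takes ~0.2 s
    await asyncio.sleep(0.2)
    return {'query': query}

async def get(_id):
    # Both queries start at once and run concurrently, like yielding a list
    prev, next_ = await asyncio.gather(
        find_one({'_id': {'$lt': _id}}),
        find_one({'_id': {'$gt': _id}}))
    return prev, next_

start = time.monotonic()
prev, next_ = asyncio.run(get(5))
elapsed = time.monotonic() - start
print('elapsed: %.2f s' % elapsed)  # Roughly 0.2 s, not 0.4 s
```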

Fetching Categories

Every page on my blog needs to show the category list on the left side. Each request handler could just include this in its get method:

categories = yield motor.Op(
    db.categories.find().sort('name').to_list)

But that's terrible engineering. Here's how to factor it into a subroutine with gen:

@gen.engine
def get_categories(db, callback):
    try:
        categories = yield motor.Op(
            db.categories.find().sort('name').to_list)
    except Exception as e:
        callback(None, e)
        return

    callback(categories, None)

This function does not have to be part of a request handler—it stands on its own at the module scope. To call it from a request handler, do:

class PostHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @gen.engine
    def get(self, slug):
        # motor.Op passes db through and supplies the callback argument
        categories = yield motor.Op(get_categories, db)
        # ... get the current, previous, and next posts as usual, then ...
        self.render('post.html',
            post=post, prev=prev, next=next, categories=categories)

gen.engine runs get until it yields get_categories, then a separate engine runs get_categories until it calls the callback, which resumes get. It's almost like a regular function call!

This is particularly nice because I want to cache the categories between page views. get_categories can be updated very simply to use a cache:

categories = None
@gen.engine
def get_categories(db, callback):
    global categories
    if not categories:
        try:
            categories = yield motor.Op(
                db.categories.find().sort('name').to_list)
        except Exception as e:
            callback(None, e)
            return

    callback(categories, None)

(Note for nerds: I invalidate the cache whenever a post with a never-before-seen category is added. The "new category" signal is saved to a capped collection in MongoDB, which all the Tornado servers are always tailing. That'll be the subject of a future post.)
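Here is a minimal sketch of the cache-plus-invalidation shape in Python 3 asyncio. The database call and the invalidate_categories hook are hypothetical stand-ins; the capped-collection signal is not modeled. It checks `is None` rather than truthiness so that an empty category list is cached too.

```python
import asyncio

fetch_count = 0
_categories_cache = None  # Module-level cache, like the global in get_categories

async def fetch_categories_from_db():
    # Hypothetical stand-in for db.categories.find().sort('name').to_list
    global fetch_count
    fetch_count += 1
    await asyncio.sleep(0)
    return sorted(['python', 'tornado', 'mongodb'])

async def get_categories():
    global _categories_cache
    if _categories_cache is None:
        _categories_cache = await fetch_categories_from_db()
    return _categories_cache

def invalidate_categories():
    # Hypothetical hook: call when a post with a never-before-seen category arrives
    global _categories_cache
    _categories_cache = None

async def main():
    await get_categories()
    await get_categories()          # Served from the cache: no second fetch
    invalidate_categories()
    return await get_categories()   # Fetches again after invalidation

categories = asyncio.run(main())
print(categories, fetch_count)
```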

Conclusion

The gen module's excellent documentation shows briefly how a method that makes a few async calls can be simplified using gen.engine, but the power really comes when you need to factor out a common subroutine. It's not obvious how to do that at first, but there are only three steps:

1. Decorate the subroutine with @gen.engine.

2. Make the subroutine take a callback argument (it must be called callback), to which the subroutine will pass its results when finished.

3. Call the subroutine within an engine-decorated function like:

result = yield gen.Task(subroutine)

result contains the value or values that subroutine passed to the callback.

If you follow Motor's convention where every callback takes arguments (result, error), then you can use motor.Op to deal with the exception:

result = yield motor.Op(subroutine)
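For readers on Python 3, here is a rough asyncio analogue of what the (result, error) convention plus motor.Op buys you. It is not Motor's implementation; op, get_answer, and get_failure are hypothetical. op wraps any callback-style subroutine in a Future that either returns the result or raises the error.

```python
import asyncio

def op(func, *args):
    """Call func(*args, callback=cb) and return a Future for its outcome.

    Follows the (result, error) convention: a non-None error is raised
    when the Future is awaited; otherwise the result is returned.
    """
    future = asyncio.get_running_loop().create_future()

    def callback(result, error):
        if error is not None:
            future.set_exception(error)
        else:
            future.set_result(result)

    func(*args, callback=callback)
    return future

def get_answer(x, callback):
    # Hypothetical subroutine that succeeds on a later turn of the loop
    asyncio.get_running_loop().call_soon(callback, x * 2, None)

def get_failure(callback):
    # Hypothetical subroutine that reports an error through its callback
    asyncio.get_running_loop().call_soon(callback, None, ValueError('boom'))

async def main():
    result = await op(get_answer, 21)
    try:
        await op(get_failure)
    except ValueError as e:
        return result, str(e)

result, message = asyncio.run(main())
print(result, message)  # 42 boom
```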

Pausing with Tornado

Throwing this in my blog so I don't forget again. The way to sleep for a certain period of time using tornado.gen is:

import time

import tornado.web
from tornado.ioloop import IOLoop
from tornado import gen

class MyHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @gen.engine
    def get(self):
        self.write("sleeping .... ")
        # Do nothing for 5 sec; the IOLoop keeps serving other requests meanwhile
        loop = IOLoop.instance()
        yield gen.Task(loop.add_timeout, time.time() + 5)
        self.write("I'm awake!")
        self.finish()
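In Python 3's asyncio the same pause is a single `await asyncio.sleep(...)`. A small sketch, with a second hypothetical coroutine to show that the event loop keeps running other work while the handler sleeps:

```python
import asyncio
import time

async def handler(out):
    out.append('sleeping .... ')
    await asyncio.sleep(0.1)  # Pause this coroutine only; the loop stays free
    out.append("I'm awake!")

async def other(out):
    out.append('other work')  # Runs while handler is asleep

async def main(out):
    await asyncio.gather(handler(out), other(out))

out = []
start = time.monotonic()
asyncio.run(main(out))
elapsed = time.monotonic() - start
print(out)
```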

Simple once you see it, but for some reason this is the idiom I've had the hardest time remembering.

Video, Slides, and Code About Async Python and MongoDB

Video is now online from my webinar last week about Tornado and MongoDB. Alas, I didn't make the text on my screen big enough to be easily readable in the low-res video we recorded, so it'll be a little fuzzy for you. (Live and learn.) No worries, the slides are here in full-res glory and the example code is on GitHub. It's a trivial Twitter clone called "chirp" which demonstrates using a MongoDB capped collection as a sort of queue. The demo uses Tornado, a MongoDB tailable cursor, and socket.io to stream new "chirps" from the capped collection to clients. I've implemented the same demo app three times: