[Python-Dev] Minimal async event loop and async utilities (Was: PEP 492: async/await in Python; version 4)

PJ Eby pje at telecommunity.com
Fri May 15 19:03:41 CEST 2015


On Mon, May 11, 2015 at 6:05 PM, Guido van Rossum <guido at python.org> wrote:

OTOH you may look at micropython's uasyncio -- IIRC it doesn't have Futures and it definitely has I/O waiting.

Here's a sketch of an extremely minimal main loop that can do I/O without Futures, and might be suitable as a PEP example. (Certainly, it would be hard to write a simpler example than this, since it doesn't even use any classes or require any specially named methods, works with present-day generators, and is (I think) both 2.x/3.x compatible.)

coroutines = []     # round-robin of currently "running" coroutines

def schedule(coroutine, val=None, err=None):
    coroutines.insert(0, (coroutine, val, err))

def runLoop():
    while coroutines:
        (coroutine, val, err) = coroutines.pop()
        try:
            if err is not None:
                suspend = coroutine.throw(err)
            else:
                suspend = coroutine.send(val)
        except StopIteration:
            # coroutine is finished, so don't reschedule it
            continue

        except Exception:
            # Framework-specific detail (e.g., log it, send it to an
            # error-handling coroutine, or stop the program).
            # Here, we just ignore it and drop the coroutine.
            continue

        else:
            if hasattr(suspend, '__call__') and suspend(coroutine):
                continue
            else:
                # put it back on the round-robin list
                schedule(coroutine)

To use it, schedule() one or more coroutines, then call runLoop(), which will run as long as there are things to do. Each coroutine scheduled must yield thunks: callable objects that take a coroutine as a parameter and return True if the coroutine should be suspended, or False if it should continue to run. If the thunk returns True, the thunk has taken responsibility for arranging to schedule() the coroutine with a value or error when it's time to send it the result of the suspension.
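To make the protocol concrete, here's a trimmed, self-contained run of the sketch (the error-handling branches are omitted since nothing raises, and the `pause` thunk and `worker` coroutine are my own illustration, not part of the sketch):

```python
coroutines = []  # round-robin of currently "running" coroutines

def schedule(coroutine, val=None, err=None):
    coroutines.insert(0, (coroutine, val, err))

def runLoop():
    while coroutines:
        coroutine, val, err = coroutines.pop()
        try:
            suspend = coroutine.send(val)
        except StopIteration:
            continue  # coroutine finished; don't reschedule
        if callable(suspend) and suspend(coroutine):
            continue  # thunk took responsibility for rescheduling
        schedule(coroutine)  # put it back on the round-robin list

order = []

def pause(coroutine):
    """A thunk that returns False: don't suspend, just yield the floor."""
    return False

def worker(name, count):
    for i in range(count):
        order.append((name, i))
        yield pause  # give other coroutines a turn

schedule(worker('a', 2))
schedule(worker('b', 2))
runLoop()
# order now interleaves round-robin: [('a', 0), ('b', 0), ('a', 1), ('b', 1)]
```

Because `pause` returns False, the loop itself reschedules the coroutine, which is exactly the cooperative round-robin behavior described above.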

You might be asking, "wait, but where's the I/O?" Why, in a coroutine, of course...

import time
from heapq import heappush, heappop
from select import select

readers = {}
writers = {}
timers = []

def readable(fileno):
    """yield readable(fileno) resumes when fileno is readable"""
    def suspend(coroutine):
        readers[fileno] = coroutine
        return True
    return suspend

def writable(fileno):
    """yield writable(fileno) resumes when fileno is writable"""
    def suspend(coroutine):
        writers[fileno] = coroutine
        return True
    return suspend

def sleepFor(seconds):
    """yield sleepFor(seconds) resumes after that much time"""
    return suspendUntil(time.time() + seconds)

def suspendUntil(timestamp):
    """yield suspendUntil(timestamp) resumes when that time is reached"""
    def suspend(coroutine):
        heappush(timers, (timestamp, coroutine))
        return True
    return suspend

def doIO():
    while coroutines or readers or writers or timers:

        # Resume scheduled tasks
        while timers and timers[0][0] <= time.time():
            ts, coroutine = heappop(timers)
            schedule(coroutine)

        if readers or writers:
            if coroutines:
                # Other tasks are running; use minimal timeout
                timeout = 0.001
            elif timers:
                timeout = max(timers[0][0] - time.time(), 0.001)
            else:
                timeout = None  # block for as long as necessary
            r, w, e = select(readers, writers, [], timeout)
            for rr in r: schedule(readers.pop(rr))
            for ww in w: schedule(writers.pop(ww))

        yield   # allow other coroutines to run

schedule(doIO())  # run the I/O loop as a coroutine

(This is painfully incomplete for a real framework, but it's a rough sketch of how one of peak.events' first drafts worked, circa early 2004.)

Basically, you just need a coroutine whose job is to resume coroutines whose scheduled time has arrived, or whose I/O is ready. And of course, some data structures to keep track of such things, and an API to update the data structures and suspend the coroutines. The I/O loop exits once there are no more running tasks and nothing waiting on I/O... which will also exit the runLoop. (A bit like a miniature version of NodeJS for Python.)
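Here is a runnable, trimmed version of the whole arrangement, a writer coroutine that sleeps on a timer and then feeds a reader waiting on I/O. (It uses a socketpair so it's self-contained; the id() tie-breaker in the timer heap and the no-op "stay scheduled" thunk in doIO are my additions, since generators aren't comparable and the scheduler coroutine needs to keep itself runnable.)

```python
import select
import socket
import time
from heapq import heappush, heappop

coroutines, readers, writers, timers = [], {}, {}, []

def schedule(coroutine, val=None, err=None):
    coroutines.insert(0, (coroutine, val, err))

def runLoop():
    while coroutines:
        coroutine, val, err = coroutines.pop()
        try:
            suspend = coroutine.throw(err) if err is not None else coroutine.send(val)
        except StopIteration:
            continue
        if callable(suspend) and suspend(coroutine):
            continue
        schedule(coroutine)

def readable(sock):
    def suspend(coroutine):
        readers[sock] = coroutine
        return True
    return suspend

def suspendUntil(timestamp):
    def suspend(coroutine):
        # id() breaks ties so generators are never compared by the heap
        heappush(timers, (timestamp, id(coroutine), coroutine))
        return True
    return suspend

def doIO():
    while coroutines or readers or writers or timers:
        # Resume tasks whose scheduled time has arrived
        while timers and timers[0][0] <= time.time():
            _, _, coroutine = heappop(timers)
            schedule(coroutine)
        if readers or writers:
            # Poll briefly if other work is pending, else block
            timeout = 0.001 if (coroutines or timers) else None
            r, w, e = select.select(list(readers), list(writers), [], timeout)
            for rr in r:
                schedule(readers.pop(rr))
            for ww in w:
                schedule(writers.pop(ww))
        yield lambda c: False  # stay scheduled; let resumed tasks run

got = []
rsock, wsock = socket.socketpair()

def reader():
    yield readable(rsock)
    got.append(rsock.recv(5))

def writer():
    yield suspendUntil(time.time() + 0.01)
    wsock.send(b'hello')

schedule(reader())
schedule(writer())
schedule(doIO())
runLoop()
# got == [b'hello']: the reader resumed only once the writer's data arrived
```

Once the reader and writer finish, readers, writers, and timers are all empty, doIO's loop condition fails, and runLoop exits, just as described above.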

And, while you need to preferably have only one such I/O coroutine (to prevent busy-waiting), the I/O coroutine is completely replaceable. All that's required to implement one is that the core runloop expose the count of active coroutines. (Notice that, apart from checking the length of coroutines, the I/O loop shown above uses only the public schedule() API and the exposed thunk-suspension protocol to do its thing.)

Also, note that you can indeed have multiple I/O coroutines running at the same time, as long as you don't mind busy-waiting. In fact, you can refactor this to move the time-based scheduling inside the runloop, and expose the "time until next task" and "number of running non-I/O coroutines" to allow multiple I/O waiters to co-ordinate and avoid busy-waiting. (A later version of peak.events did this, though it really wasn't to allow multiple I/O waiters, so much as to simplify I/O waiters by providing a core time-scheduler, and to support simulated time for running tests.)

So, there's definitely no requirement for I/O to be part of a "core" runloop system. The overall approach is extremely open to extension, hardcodes next to nothing, and is super-easy to write new yieldables for, since they need only have a method (or function) that returns a suspend function.

At the time I first implemented this approach in '03/'04, I hadn't thought of using plain functions as suspend targets; I used objects with a shouldSuspend() method. But in fairness, I was working with Python 2.2, and closures were still a pretty new feature back then. ;-)

Since then, though, I've seen this approach implemented elsewhere using closures in almost exactly this way. For example, the co library for Javascript implements almost exactly the above sketch's approach, in not much more code. It just uses the built-in Javascript event loop facilities, and supports yielding other things besides thunks. (Its thunks also don't return a value, and take a callback rather than a coroutine. But these are superficial differences.)

This approach is super-flexible in practice, as there are a ton of add-on libraries for co that implement their control flow using these thunks. You can indeed fully generalize control flow in such terms, without the need for futures or similar objects. For example, if you want to provide sugar for yielding to futures or other types of objects, you just write a thunk-returning function or method, e.g.:

def await_future(future):
    def suspend(coroutine):
        @future.add_done_callback
        def resume(future):
            err = future.exception()
            if err is not None:
                schedule(coroutine, None, err)
            else:
                schedule(coroutine, future.result())
        return True
    return suspend

So yield await_future(someFuture) will arrange for suspension until the future is ready. Libraries or frameworks can also be written that wrap a generator with one that provides automatic translation to thunks for a variety of types or protocols. Similarly, you can write functions that take multiple awaitables, or that provide cancellation, etc. on top of thunks.
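Wired up against the minimal loop, the future bridge works end to end. (This uses concurrent.futures.Future, whose add_done_callback invokes the callback immediately if the future is already resolved; the consumer coroutine is my own illustration.)

```python
from concurrent.futures import Future

coroutines = []

def schedule(coroutine, val=None, err=None):
    coroutines.insert(0, (coroutine, val, err))

def runLoop():
    while coroutines:
        coroutine, val, err = coroutines.pop()
        try:
            suspend = coroutine.throw(err) if err is not None else coroutine.send(val)
        except StopIteration:
            continue
        if callable(suspend) and suspend(coroutine):
            continue
        schedule(coroutine)

def await_future(future):
    def suspend(coroutine):
        @future.add_done_callback
        def resume(future):
            err = future.exception()
            if err is not None:
                schedule(coroutine, None, err)
            else:
                schedule(coroutine, future.result())
        return True
    return suspend

results = []

def consumer(future):
    value = yield await_future(future)  # suspend until the future resolves
    results.append(value)

f = Future()
f.set_result(42)       # already resolved, so the callback fires at suspend time
schedule(consumer(f))
runLoop()
# results == [42]: the yielded value came back through schedule()
```

The value (or exception) flows back through the same schedule() API as everything else, which is the point: futures become just another thunk source, not a requirement of the core loop.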


