[Python-Dev] Minimal async event loop and async utilities (Was: PEP 492: async/await in Python; version 4) (original) (raw)
PJ Eby pje at telecommunity.com
Fri May 15 19:03:41 CEST 2015
- Previous message (by thread): [Python-Dev] Minimal async event loop and async utilities (Was: PEP 492: async/await in Python; version 4)
- Next message (by thread): [Python-Dev] Ancient use of generators
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
On Mon, May 11, 2015 at 6:05 PM, Guido van Rossum <guido at python.org> wrote:
OTOH you may look at micropython's uasyncio -- IIRC it doesn't have Futures and it definitely has I/O waiting.
Here's a sketch of an extremely minimal main loop that can do I/O without Futures, and might be suitable as a PEP example. (Certainly, it would be hard to write a simpler example than this, since it doesn't even use any classes or require any specially named methods, works with present-day generators, and is (I think) both 2.x/3.x compatible.)
coroutines = []  # round-robin of currently "running" coroutines

def schedule(coroutine, val=None, err=None):
    coroutines.insert(0, (coroutine, val, err))

def runLoop():
    while coroutines:
        (coroutine, val, err) = coroutines.pop()
        try:
            if err is not None:
                suspend = coroutine.throw(err)
            else:
                suspend = coroutine.send(val)
        except StopIteration:
            # coroutine is finished, so don't reschedule it
            continue
        except Exception:
            # framework-specific detail (i.e., log it, send it
            # to an error-handling coroutine, or just stop the program).
            # Here, we just ignore it and stop the coroutine.
            continue
        else:
            if hasattr(suspend, '__call__') and suspend(coroutine):
                continue
            else:
                # put it back on the round-robin list
                schedule(coroutine)
To use it, schedule() one or more coroutines, then call runLoop(), which will run as long as there are things to do. Each scheduled coroutine must yield thunks: callable objects that take a coroutine as a parameter, and return True if the coroutine should be suspended, or False if it should continue to run. If the thunk returns true, that means the thunk has taken responsibility for arranging to schedule() the coroutine with a value or error when it's time to send it the result of the suspension.
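To make the thunk protocol concrete, here is a minimal self-contained sketch that drives the loop above. The names giveValue, worker, and results are illustrative only, not part of the post's API:

```python
coroutines = []  # same round-robin structure as the sketch above

def schedule(coroutine, val=None, err=None):
    coroutines.insert(0, (coroutine, val, err))

def runLoop():
    while coroutines:
        coroutine, val, err = coroutines.pop()
        try:
            if err is not None:
                suspend = coroutine.throw(err)
            else:
                suspend = coroutine.send(val)
        except StopIteration:
            continue  # coroutine finished; don't reschedule
        else:
            if callable(suspend) and suspend(coroutine):
                continue  # thunk took responsibility for rescheduling
            schedule(coroutine)  # otherwise, back on the round-robin

def giveValue(value):
    """Trivial thunk factory: suspend the coroutine, then immediately
    reschedule it with `value` as the result of the yield."""
    def suspend(coroutine):
        schedule(coroutine, value)
        return True  # we've arranged for rescheduling
    return suspend

results = []

def worker():
    got = yield giveValue(42)   # suspends; resumes with 42
    results.append(got)
    yield                       # yields None: not a thunk, stays round-robin
    results.append("done")

schedule(worker())
runLoop()
# results is now [42, "done"]
```

Note how the coroutine never sees the loop's internals: it only yields thunks and receives values back, which is the whole protocol.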
You might be asking, "wait, but where's the I/O?" Why, in a coroutine, of course...
import time
from heapq import heappush, heappop
from select import select

readers = {}
writers = {}
timers = []

def readable(fileno):
    """yield readable(fileno) resumes when fileno is readable"""
    def suspend(coroutine):
        readers[fileno] = coroutine
        return True
    return suspend

def writable(fileno):
    """yield writable(fileno) resumes when fileno is writable"""
    def suspend(coroutine):
        writers[fileno] = coroutine
        return True
    return suspend

def sleepFor(seconds):
    """yield sleepFor(seconds) resumes after that much time"""
    return suspendUntil(time.time() + seconds)

def suspendUntil(timestamp):
    """yield suspendUntil(timestamp) resumes when that time is reached"""
    def suspend(coroutine):
        heappush(timers, (timestamp, coroutine))
        return True
    return suspend
def doIO():
    while coroutines or readers or writers or timers:
        # Resume tasks whose scheduled time has arrived
        while timers and timers[0][0] <= time.time():
            ts, coroutine = heappop(timers)
            schedule(coroutine)
        if readers or writers:
            if coroutines:
                # Other tasks are running; use minimal timeout
                timeout = 0.001
            elif timers:
                timeout = max(timers[0][0] - time.time(), 0.001)
            else:
                timeout = None  # block for as long as necessary
            r, w, e = select(readers, writers, [], timeout)
            for rr in r: schedule(readers.pop(rr))
            for ww in w: schedule(writers.pop(ww))
        yield  # allow other coroutines to run
schedule(doIO()) # run the I/O loop as a coroutine
(This is painfully incomplete for a real framework, but it's a rough sketch of how one of peak.events' first drafts worked, circa early 2004.)
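Here is a self-contained, slightly trimmed variant of that sketch (timers omitted, socketpair used for portability) showing a coroutine actually waiting on I/O; consumer and results are illustrative names, not part of the original post:

```python
import socket
from select import select

coroutines = []
readers = {}   # fileno -> coroutine suspended until readable
writers = {}

def schedule(coroutine, val=None, err=None):
    coroutines.insert(0, (coroutine, val, err))

def runLoop():
    while coroutines:
        coroutine, val, err = coroutines.pop()
        try:
            if err is not None:
                suspend = coroutine.throw(err)
            else:
                suspend = coroutine.send(val)
        except StopIteration:
            continue
        else:
            if callable(suspend) and suspend(coroutine):
                continue
            schedule(coroutine)

def readable(fileno):
    def suspend(coroutine):
        readers[fileno] = coroutine
        return True
    return suspend

def doIO():
    while coroutines or readers or writers:
        if readers or writers:
            # Block if nothing else is runnable; otherwise poll briefly
            timeout = 0.001 if coroutines else None
            r, w, e = select(list(readers), list(writers), [], timeout)
            for rr in r:
                schedule(readers.pop(rr))
            for ww in w:
                schedule(writers.pop(ww))
        yield  # let other coroutines run

results = []

def consumer(sock):
    yield readable(sock.fileno())   # suspend until data arrives
    results.append(sock.recv(1024))

a, b = socket.socketpair()
b.sendall(b"hello")
schedule(consumer(a))
schedule(doIO())    # run the I/O loop as just another coroutine
runLoop()
a.close(); b.close()
# results is now [b"hello"]
```

The runLoop exits on its own once consumer has finished and doIO sees nothing left to wait for.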
Basically, you just need a coroutine whose job is to resume coroutines whose scheduled time has arrived, or whose I/O is ready. And of course, some data structures to keep track of such things, and an API to update the data structures and suspend the coroutines. The I/O loop exits once there are no more running tasks and nothing waiting on I/O... which will also exit the runLoop. (A bit like a miniature version of NodeJS for Python.)
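A time-only version of such a coroutine makes the pattern especially clear. This is a self-contained sketch (timerLoop, task, and log are illustrative names), with the select() machinery stripped out so only the "resume coroutines whose time has arrived" job remains:

```python
import time
from heapq import heappush, heappop

coroutines = []
timers = []  # heap of (timestamp, coroutine)

def schedule(coroutine, val=None, err=None):
    coroutines.insert(0, (coroutine, val, err))

def runLoop():
    while coroutines:
        coroutine, val, err = coroutines.pop()
        try:
            if err is not None:
                suspend = coroutine.throw(err)
            else:
                suspend = coroutine.send(val)
        except StopIteration:
            continue
        else:
            if callable(suspend) and suspend(coroutine):
                continue
            schedule(coroutine)

def suspendUntil(timestamp):
    def suspend(coroutine):
        heappush(timers, (timestamp, coroutine))
        return True
    return suspend

def sleepFor(seconds):
    return suspendUntil(time.time() + seconds)

def timerLoop():
    while coroutines or timers:
        # Resume every coroutine whose scheduled time has arrived
        while timers and timers[0][0] <= time.time():
            ts, coroutine = heappop(timers)
            schedule(coroutine)
        if timers and not coroutines:
            # Nothing else runnable: sleep until the next deadline
            time.sleep(max(timers[0][0] - time.time(), 0))
        yield

log = []

def task(name, delay):
    yield sleepFor(delay)
    log.append(name)

schedule(task("slow", 0.02))
schedule(task("fast", 0.01))
schedule(timerLoop())
runLoop()
# log is now ["fast", "slow"]
```

Swapping timerLoop for the doIO coroutine above requires no change to the core runLoop, which is the point: the scheduler knows nothing about time or I/O.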
And, while you should preferably have only one such I/O coroutine (to prevent busy-waiting), the I/O coroutine is completely replaceable. All that's required to implement one is that the core runloop expose the count of active coroutines. (Notice that, apart from checking the length of coroutines, the I/O loop shown above uses only the public schedule() API and the exposed thunk-suspension protocol to do its thing.)
Also, note that you can indeed have multiple I/O coroutines running at the same time, as long as you don't mind busy-waiting. In fact, you can refactor this to move the time-based scheduling inside the runloop, and expose the "time until next task" and "number of running non-I/O coroutines" to allow multiple I/O waiters to co-ordinate and avoid busy-waiting. (A later version of peak.events did this, though it really wasn't to allow multiple I/O waiters, so much as to simplify I/O waiters by providing a core time-scheduler, and to support simulated time for running tests.)
So, there's definitely no requirement for I/O to be part of a "core" runloop system. The overall approach is extremely open to extension, hardcodes next to nothing, and is super-easy to write new yieldables for, since they need only have a method (or function) that returns a suspend function.
At the time I first implemented this approach in '03/'04, I hadn't thought of using plain functions as suspend targets; I used objects with a shouldSuspend() method. But in fairness, I was working with Python 2.2, and closures were still a pretty new feature back then. ;-)
Since then, though, I've seen this approach implemented elsewhere using closures in almost exactly this way. For example, the co library for Javascript implements almost exactly the above sketch's approach, in not much more code. It just uses the built-in Javascript event loop facilities, and supports yielding other things besides thunks. (Its thunks also don't return a value, and take a callback rather than a coroutine. But these are superficial differences.)
This approach is super-flexible in practice, as there are a ton of add-on libraries for co that implement their control flow using these thunks. You can indeed fully generalize control flow in such terms, without the need for futures or similar objects. For example, if you want to provide sugar for yielding to futures or other types of objects, you just write a thunk-returning function or method, e.g.:
def await_future(future):
    def suspend(coroutine):
        @future.add_done_callback
        def resume(future):
            err = future.exception()
            if err is not None:
                schedule(coroutine, None, err)
            else:
                schedule(coroutine, future.result())
        return True
    return suspend
So yield await_future(someFuture) will arrange for suspension until the future is ready. Libraries or frameworks can also be written that wrap a generator with one that provides automatic translation to thunks for a variety of types or protocols. Similarly, you can write functions that take multiple awaitables, or that provide cancellation, etc., on top of thunks.
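As a self-contained sketch of that integration, the following wires await_future up to concurrent.futures.Future (an assumed stand-in for whatever future type a framework uses; task and results are illustrative names), exercising both the result and the exception paths:

```python
from concurrent.futures import Future

coroutines = []

def schedule(coroutine, val=None, err=None):
    coroutines.insert(0, (coroutine, val, err))

def runLoop():
    while coroutines:
        coroutine, val, err = coroutines.pop()
        try:
            if err is not None:
                suspend = coroutine.throw(err)
            else:
                suspend = coroutine.send(val)
        except StopIteration:
            continue
        else:
            if callable(suspend) and suspend(coroutine):
                continue
            schedule(coroutine)

def await_future(future):
    def suspend(coroutine):
        @future.add_done_callback
        def resume(future):
            err = future.exception()
            if err is not None:
                schedule(coroutine, None, err)   # resume by throwing
            else:
                schedule(coroutine, future.result())  # resume with value
        return True
    return suspend

results = []

def task(fut):
    try:
        results.append((yield await_future(fut)))
    except ValueError as e:
        results.append(str(e))

ok = Future(); ok.set_result("hello")
bad = Future(); bad.set_exception(ValueError("boom"))
schedule(task(ok))
schedule(task(bad))
runLoop()
# results is now ["hello", "boom"]
```

Because these futures are already resolved, add_done_callback fires synchronously inside the thunk; with a pending future it would fire later from whatever completes it, and schedule() would re-enqueue the coroutine then.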