[Python-Dev] Trial balloon: microthreads library in stdlib (original) (raw)

Phillip J. Eby pje at telecommunity.com
Wed Feb 14 01:27:38 CET 2007


At 08:41 PM 2/13/2007 +0000, glyph at divmod.com wrote:

and the microthreading features being discussed here are a trivial hack somewhere in its mainloop machinery, not an entirely new subsystem that it should be implemented in terms of.

It doesn't even require hacking the mainloop; it can be done entirely in userspace. Here's some code I wrote a few months ago but never got around to using. (Or testing, for that matter, so it may not work, or even compile!)

It requires Python 2.5 and the "simplegeneric" package from the Cheeseshop, and it allows you to "spawn" generators to create independently-running pseudothreads. These pseudothreads can yield to Deferreds, "returning" the value or failure thereof. (Failures cause the error to be raised such that it appears to happen at the yield point.)

There are also a few other yield targets like "Return" (to return a value to a calling co-routine), "Pause" (to delay resumption), and "with_timeout" (to wrap some other yieldable in a timeout that raises TimeoutError). Enjoy!

import sys
import types
from functools import partial

from simplegeneric import generic
from twisted.internet import reactor
from twisted.internet.defer import Deferred, TimeoutError
from twisted.python import failure

# Public names exported by ``from <module> import *``.  This must be bound to
# ``__all__``; a plain ``all`` would only shadow the builtin and export nothing.
__all__ = [
    # user APIs:
    'spawn', 'Pause', 'Return', 'with_timeout',

    # extension APIs:
    'schedule', 'resume', 'throw', 'current_task', 'yield_to', 'yieldable',
]

def spawn(geniter, delay=0):
    """Create a new task and schedule it for execution

    Usage::

        spawn(someGeneratorFunc(args))

    A task is a list used as a call stack of generator-iterators; the new
    task starts out holding only `geniter`.  It is handed to ``schedule()``
    so that it is first resumed after `delay` seconds (0 by default).

    Returns the task object, which can be passed to the ``schedule()``,
    ``throw()``, and ``yield_to()`` APIs.
    """
    new_task = [geniter]
    schedule(new_task, delay)
    return new_task

def schedule(task, delay=0, value=None):
    """Schedule `task` to resume after `delay` seconds (w/optional `value`)

    Returns whatever ``reactor.callLater()`` returns for the scheduled
    ``resume(task, value)`` call, or ``None`` when `task` is empty (in which
    case nothing is scheduled).
    """
    if not task:
        # XXX warn if non-None value sent to empty task?
        return None
    return reactor.callLater(delay, resume, task, value)

def resume(task, value=None):
    """Resume `task` immediately, returning `value` for the current yield

    `value` is sent into the generator on top of the task's stack.  An empty
    task is silently ignored.
    """
    if not task:
        # XXX warn if non-None value sent to empty task?
        return
    send = task[-1].send
    _invoke(task, partial(send, value))

def throw(task, exc): """Raise exc tuple in task and immediately resume its execution""" if not task: # Propagate exception out of a failed task raise exc[0], exc[1], exc[2] _invoke(task, partial(task[-1].throw, *exc))

def _invoke(task, method):
    """Invoke `method()` in context of `task`, yielding to the return value

    `method` is a zero-argument callable (a bound ``send`` or ``throw`` of
    the generator on top of the task's stack).  Its outcome is handled as
    follows:

    * ``StopIteration``: the generator exited without returning a value; it
      is popped and ``None`` is sent to the generator below it.
    * any other exception: the generator is popped and the exception is
      thrown into the generator below it (or re-raised by ``throw()`` once
      the stack is empty).
    * a yielded value: dispatched through ``yield_to()``.

    Requires the module-level ``sys`` import for the error path.
    """
    try:
        value = method()
    except StopIteration:
        task.pop()      # it's an exit with no return value
        resume(task)    # so send None up the stack
    except:
        # Deliberately bare: every exception is forwarded up the task's call
        # stack rather than handled here.
        task.pop()
        # sys.exc_info() is passed straight through instead of being stored
        # in a local, so this frame keeps no reference to its own traceback.
        throw(task, sys.exc_info())
    else:
        # Handle a yielded value
        yield_to(value, task)

@generic
def yield_to(value, task):
    """Handle a yielded value

    This is the fallback used when no more specific handler is registered
    for `value`; it always raises ``TypeError``.

    To register special handlers, use ``@yield_to.when_type()`` or
    ``@yield_to.when_object()``.  (See the ``simplegeneric`` docs for
    details.)
    """
    template = "Unrecognized yield value (maybe missing Return()?): %r"
    raise TypeError(template % (value,))

@yield_to.when_type(Deferred)
def _yield_to_deferred(value, task):
    """Return a deferred value immediately or pause until fired

    A single callback is registered as both callback and errback on the
    yielded Deferred.  On success the result is sent into `task`; on failure
    the exception wrapped by the ``Failure`` is thrown into `task` so that it
    appears to be raised at the yield point.
    """

    def _resume(result):
        if isinstance(result, failure.Failure):
            # Throw the wrapped exception, not the Failure wrapper itself.
            throw(task, (result.type, result.value, result.tb))
        else:
            resume(task, result)
        return result    # don't alter the deferred's value

    # This will either fire immediately, or delay until the appropriate time
    value.addCallbacks(_resume, _resume)

@yield_to.when_type(types.GeneratorType)
def _yield_to_subgenerator(value, task):
    """Call a sub-generator, putting it on the task's call stack

    The yielded generator-iterator becomes the new top of `task`, and the
    task is rescheduled with no delay so the sub-generator runs next time
    the task is resumed.
    """
    task.append(value)
    schedule(task, delay=0)

def yieldable(f):
    """Declare that a function may be yielded to

    Usage::

        @yieldable
        def do_something(task):
            # code that will be invoked upon (yield do_something)

        (yield do_something)

    The function is tagged by setting its ``__yieldable__`` attribute to
    ``True`` and is then returned unchanged.

    The function's return value will be ignored, and errors in it are NOT
    caught!  It must instead use the ``resume()``, ``schedule()``,
    ``throw()``, or ``yield_to()`` APIs to communicate with the yielding task
    (which is provided as the sole argument.)
    """
    setattr(f, '__yieldable__', True)
    return f

@yield_to.when_type(types.FunctionType)
def _yield_to_function(value, task):
    """Invoke task-management function

    A yielded function is called with `task` as its only argument, but only
    if it carries the ``__yieldable__`` marker set by ``@yieldable``.

    Raises ``TypeError`` for a function that is not marked yieldable.
    """
    # The marker must be spelled exactly as ``yieldable()`` sets it.
    if not getattr(value, '__yieldable__', None):
        raise TypeError(
            "Function is not marked yieldable (maybe missing Return()?): %r"
            % (value,)
        )
    return value(task)      # function is marked as yieldable

@yieldable
def current_task(task):
    """Yield this function (don't call it) to obtain the current task

    Usage: ``task = (yield current_task)``

    The task is sent straight back into itself via ``resume()``, so the
    yielding coroutine is immediately resumed; no task switching will occur.
    """
    resume(task, value=task)

class Pause(object):
    """Object that can be yielded to temporarily pause execution

    Usage::
        yield Pause     # allow other tasks to run, then resume ASAP
        yield Pause(5)  # don't run for at least five seconds
    """

    # Class-level default, so the bare class also has a ``seconds`` value.
    seconds = 0

    def __init__(self, seconds=0):
        self.seconds = seconds

    def __repr__(self):
        delay = self.seconds
        return 'Pause(%s)' % delay

@yield_to.when_type(Pause)
@yield_to.when_object(Pause)
def _yield_to_pause(value, task):
    """Allow other tasks to run, by moving this task to the end of the queue

    Registered for both ``Pause`` instances and the ``Pause`` class itself;
    the task is rescheduled after ``value.seconds`` seconds.
    """
    delay = value.seconds
    schedule(task, delay)

class Return(object):
    """Return a value to your caller

    Usage::

        yield Return(42)      # returns 42 to calling coroutine
        yield Return          # returns None to calling coroutine
        yield Return()        # likewise returns None
    """

    # Class-level default, so the bare class also has a ``value`` of None.
    value = None

    def __init__(self, value=None):
        self.value = value

    def __repr__(self):
        # Wrap in a 1-tuple so that tuple values are formatted as a single
        # item instead of being unpacked as multiple format arguments.
        return 'Return(%s)' % (self.value,)

@yield_to.when_type(Return)
@yield_to.when_object(Return)
def _yield_to_return(value, task):
    """Return a value to the caller

    Registered for both ``Return`` instances and the ``Return`` class itself.
    The generator on top of the task's stack is removed and closed, then
    ``value.value`` is sent to the generator beneath it.
    """
    finished = task.pop()
    finished.close()    # ensure any finally clauses are run
    resume(task, value.value)

def with_timeout(seconds, value):
    """Raise TimeoutError if `value` doesn't resume before `seconds` elapse

    Example::

        result = yield with_timeout(30, something(whatever))

    This is equivalent to ``result = yield something(whatever)``, except that
    if the current task isn't resumed in 30 seconds or less, a
    ``defer.TimeoutError`` will be raised in the blocked task.

    The `value` passed to this routine may be any yieldable value, although it
    makes no sense to yield a value that will just be returned to a parent
    coroutine.

    Note that the ``something(whatever)`` subtask may trap the
    ``TimeoutError`` and misinterpret its significance, so generally speaking
    you should use this only for relatively low-level operations, and expose a
    timeout parameter to your callers, to allow them to simply specify the
    timeout rather than using this wrapper.

    In fact, this is what most Twisted APIs do, providing a ``timeout``
    keyword argument that you should use instead of this wrapper, as they will
    then raise ``TimeoutError`` automatically after the specified time
    expires.  So, if you're calling such an API, don't use this wrapper.
    """
    # raise a timeout error in this task after seconds
    task = (yield current_task)
    timeout_call = reactor.callLater(
        seconds, throw, task, (TimeoutError, TimeoutError(), None)
    )

    try:
        # call the subtask and return its result
        result = (yield value)
        yield Return(result)
    finally:
        # Ensure timeout call doesn't fire after we've exited
        if timeout_call.active():
            timeout_call.cancel()


More information about the Python-Dev mailing list