cpython: f5aed0dba844 (original) (raw)
--- a/Doc/library/sched.rst +++ b/Doc/library/sched.rst @@ -27,6 +27,9 @@ scheduler: .. versionchanged:: 3.3 timefunc and delayfunc parameters are optional.
- .. versionchanged:: 3.3
:class:`scheduler` class can be safely used in multi-threaded[](#l1.8)
environments.[](#l1.9)
Example::
@@ -47,33 +50,6 @@ Example::
From print_time 930343700.273
930343700.276
-In multi-threaded environments, the :class:scheduler
class has limitations
-with respect to thread-safety, inability to insert a new task before
-the one currently pending in a running scheduler, and holding up the main
-thread until the event queue is empty. Instead, the preferred approach
-is to use the :class:threading.Timer
class instead.
-
-Example::
-
- ... print("From print_time", time.time())
- ...
- ... print(time.time())
- ... Timer(5, print_time, ()).start()
- ... Timer(10, print_time, ()).start()
- ... time.sleep(11) # sleep while time-delay events execute
- ... print(time.time())
- ...
- 930343690.257
- From print_time 930343695.274
- From print_time 930343700.273
- 930343701.301
- - .. _scheduler-objects: Scheduler Objects
--- a/Doc/whatsnew/3.3.rst
+++ b/Doc/whatsnew/3.3.rst
@@ -662,6 +662,10 @@ should be used. For example, this will
sched
-----
+* :class:~sched.scheduler
class can now be safely used in multi-threaded
- timefunc and delayfunct parameters of :class:
~sched.scheduler
class constructor are now optional and defaults to :func:time.time
and :func:time.sleep
respectively. (Contributed by Chris Clark in
--- a/Lib/sched.py +++ b/Lib/sched.py @@ -30,6 +30,7 @@ has another way to reference private dat import time import heapq +import threading from collections import namedtuple all = ["scheduler"] @@ -48,6 +49,7 @@ class scheduler: """Initialize a new instance, passing the time and delay functions""" self._queue = []
self._lock = threading.RLock()[](#l3.15) self.timefunc = timefunc[](#l3.16) self.delayfunc = delayfunc[](#l3.17)
@@ -58,9 +60,10 @@ class scheduler: if necessary. """
event = Event(time, priority, action, argument, kwargs)[](#l3.23)
heapq.heappush(self._queue, event)[](#l3.24)
return event # The ID[](#l3.25)
with self._lock:[](#l3.26)
event = Event(time, priority, action, argument, kwargs)[](#l3.27)
heapq.heappush(self._queue, event)[](#l3.28)
return event # The ID[](#l3.29)
def enter(self, delay, priority, action, argument=[], kwargs={}): """A variant that specifies the time as a relative time. @@ -68,8 +71,9 @@ class scheduler: This is actually the more commonly used interface. """
time = self.timefunc() + delay[](#l3.37)
return self.enterabs(time, priority, action, argument, kwargs)[](#l3.38)
with self._lock:[](#l3.39)
time = self.timefunc() + delay[](#l3.40)
return self.enterabs(time, priority, action, argument, kwargs)[](#l3.41)
def cancel(self, event): """Remove an event from the queue. @@ -78,12 +82,14 @@ class scheduler: If the event is not in the queue, this raises ValueError. """
self._queue.remove(event)[](#l3.49)
heapq.heapify(self._queue)[](#l3.50)
with self._lock:[](#l3.51)
self._queue.remove(event)[](#l3.52)
heapq.heapify(self._queue)[](#l3.53)
def empty(self): """Check whether the queue is empty."""
return not self._queue[](#l3.57)
with self._lock:[](#l3.58)
return not self._queue[](#l3.59)
def run(self): """Execute events until the queue is empty. @@ -108,24 +114,25 @@ class scheduler: """ # localize variable access to minimize overhead # and to improve thread safety
q = self._queue[](#l3.67)
delayfunc = self.delayfunc[](#l3.68)
timefunc = self.timefunc[](#l3.69)
pop = heapq.heappop[](#l3.70)
while q:[](#l3.71)
time, priority, action, argument, kwargs = checked_event = q[0][](#l3.72)
now = timefunc()[](#l3.73)
if now < time:[](#l3.74)
delayfunc(time - now)[](#l3.75)
else:[](#l3.76)
event = pop(q)[](#l3.77)
# Verify that the event was not removed or altered[](#l3.78)
# by another thread after we last looked at q[0].[](#l3.79)
if event is checked_event:[](#l3.80)
action(*argument, **kwargs)[](#l3.81)
delayfunc(0) # Let other threads run[](#l3.82)
with self._lock:[](#l3.83)
q = self._queue[](#l3.84)
delayfunc = self.delayfunc[](#l3.85)
timefunc = self.timefunc[](#l3.86)
pop = heapq.heappop[](#l3.87)
while q:[](#l3.88)
time, priority, action, argument, kwargs = checked_event = q[0][](#l3.89)
now = timefunc()[](#l3.90)
if now < time:[](#l3.91)
delayfunc(time - now)[](#l3.92) else:[](#l3.93)
heapq.heappush(q, event)[](#l3.94)
event = pop(q)[](#l3.95)
# Verify that the event was not removed or altered[](#l3.96)
# by another thread after we last looked at q[0].[](#l3.97)
if event is checked_event:[](#l3.98)
action(*argument, **kwargs)[](#l3.99)
delayfunc(0) # Let other threads run[](#l3.100)
else:[](#l3.101)
heapq.heappush(q, event)[](#l3.102)
@property def queue(self): @@ -138,5 +145,6 @@ class scheduler: # Use heapq to sort the queue rather than using 'sorted(self._queue)'. # With heapq, two events scheduled at the same time will show in # the actual order they would be retrieved.
events = self._queue[:][](#l3.110)
return map(heapq.heappop, [events]*len(events))[](#l3.111)
with self._lock:[](#l3.112)
events = self._queue[:][](#l3.113)
return map(heapq.heappop, [events]*len(events))[](#l3.114)
--- a/Misc/NEWS +++ b/Misc/NEWS @@ -409,6 +409,9 @@ Core and Builtins Library ------- +- Issue #8684 sched.scheduler class can be safely used in multi-threaded