cpython: 7aa2d3e1c885 (original) (raw)
Mercurial > cpython
changeset 97277:7aa2d3e1c885 3.4
Issue #23812: Fix asyncio.Queue.get() to avoid loosing items on cancellation. Patch by Gustavo J. A. M. Carneiro. [#23812]
Yury Selivanov yselivanov@sprymix.com | |
---|---|
date | Wed, 05 Aug 2015 13:52:33 -0400 |
parents | 94e215a5e24b |
children | d5644d7e222d ee8e9edb8aae |
files | Lib/asyncio/queues.py Lib/test/test_asyncio/test_queues.py Misc/NEWS |
diffstat | 3 files changed, 101 insertions(+), 10 deletions(-)[+] [-] Lib/asyncio/queues.py 47 Lib/test/test_asyncio/test_queues.py 61 Misc/NEWS 3 |
line wrap: on
line diff
--- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -47,7 +47,7 @@ class Queue: # Futures. self._getters = collections.deque()
# Pairs of (item, Future).[](#l1.7)
# Futures[](#l1.8) self._putters = collections.deque()[](#l1.9) self._unfinished_tasks = 0[](#l1.10) self._finished = locks.Event(loop=self._loop)[](#l1.11)
@@ -98,7 +98,7 @@ class Queue: def _consume_done_putters(self): # Delete waiters at the head of the put() queue who've timed out.
while self._putters and self._putters[0][1].done():[](#l1.16)
while self._putters and self._putters[0].done():[](#l1.17) self._putters.popleft()[](#l1.18)
def qsize(self): @@ -148,8 +148,9 @@ class Queue: elif self._maxsize > 0 and self._maxsize <= self.qsize(): waiter = futures.Future(loop=self._loop)
self._putters.append((item, waiter))[](#l1.25)
self._putters.append(waiter)[](#l1.26) yield from waiter[](#l1.27)
self._put(item)[](#l1.28)
else: self.__put_internal(item) @@ -186,8 +187,7 @@ class Queue: self._consume_done_putters() if self._putters: assert self.full(), 'queue not full, why are putters waiting?'
item, putter = self._putters.popleft()[](#l1.36)
self.__put_internal(item)[](#l1.37)
putter = self._putters.popleft()[](#l1.38)
# When a getter runs and frees up a slot so this putter can # run, we need to defer the put for a tick to ensure that @@ -201,9 +201,39 @@ class Queue: return self._get() else: waiter = futures.Future(loop=self._loop)
self._getters.append(waiter)[](#l1.46)
try:[](#l1.47)
return (yield from waiter)[](#l1.48)
except futures.CancelledError:[](#l1.49)
# if we get CancelledError, it means someone cancelled this[](#l1.50)
# get() coroutine. But there is a chance that the waiter[](#l1.51)
# already is ready and contains an item that has just been[](#l1.52)
# removed from the queue. In this case, we need to put the item[](#l1.53)
# back into the front of the queue. This get() must either[](#l1.54)
# succeed without fault or, if it gets cancelled, it must be as[](#l1.55)
# if it never happened.[](#l1.56)
if waiter.done():[](#l1.57)
self._put_it_back(waiter.result())[](#l1.58)
raise[](#l1.59)
self._getters.append(waiter)[](#l1.61)
return (yield from waiter)[](#l1.62)
- def _put_it_back(self, item):
"""[](#l1.64)
This is called when we have a waiter to get() an item and this waiter[](#l1.65)
gets cancelled. In this case, we put the item back: wake up another[](#l1.66)
waiter or put it in the _queue.[](#l1.67)
"""[](#l1.68)
self._consume_done_getters()[](#l1.69)
if self._getters:[](#l1.70)
assert not self._queue, ([](#l1.71)
'queue non-empty, why are getters waiting?')[](#l1.72)
getter = self._getters.popleft()[](#l1.74)
self._put_internal(item)[](#l1.75)
# getter cannot be cancelled, we just removed done getters[](#l1.77)
getter.set_result(item)[](#l1.78)
else:[](#l1.79)
self._queue.appendleft(item)[](#l1.80)
def get_nowait(self): """Remove and return an item from the queue. @@ -213,8 +243,7 @@ class Queue: self._consume_done_putters() if self._putters: assert self.full(), 'queue not full, why are putters waiting?'
item, putter = self._putters.popleft()[](#l1.88)
self.__put_internal(item)[](#l1.89)
putter = self._putters.popleft()[](#l1.90) # Wake putter on next tick.[](#l1.91)
# getter cannot be cancelled, we just removed done putters
--- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -171,7 +171,7 @@ class QueueGetTests(_QueueTestBase): q.put_nowait(1) waiter = asyncio.Future(loop=self.loop)
q._putters.append((2, waiter))[](#l2.7)
q._putters.append(waiter)[](#l2.8)
res = self.loop.run_until_complete(q.get()) self.assertEqual(1, res) @@ -322,6 +322,64 @@ class QueuePutTests(_QueueTestBase): q.put_nowait(1) self.assertEqual(1, q.get_nowait())
loop = self.new_test_loop(gen)[](#l2.21)
q = asyncio.Queue(loop=loop)[](#l2.23)
reader = loop.create_task(q.get())[](#l2.25)
loop.run_until_complete(asyncio.sleep(0.01, loop=loop))[](#l2.27)
q.put_nowait(1)[](#l2.29)
q.put_nowait(2)[](#l2.30)
reader.cancel()[](#l2.31)
try:[](#l2.33)
loop.run_until_complete(reader)[](#l2.34)
except asyncio.CancelledError:[](#l2.35)
# try again[](#l2.36)
reader = loop.create_task(q.get())[](#l2.37)
loop.run_until_complete(reader)[](#l2.38)
result = reader.result()[](#l2.40)
# if we get 2, it means 1 got dropped
self.assertEqual(1, result)[](#l2.42)
def gen():[](#l2.46)
yield 0.01[](#l2.47)
yield 0.1[](#l2.48)
loop = self.new_test_loop(gen)[](#l2.50)
q = asyncio.Queue(1, loop=loop)[](#l2.51)
q.put_nowait(1)[](#l2.53)
# putting a second item in the queue has to block (qsize=1)[](#l2.55)
writer = loop.create_task(q.put(2))[](#l2.56)
loop.run_until_complete(asyncio.sleep(0.01, loop=loop))[](#l2.57)
value1 = q.get_nowait()[](#l2.59)
self.assertEqual(value1, 1)[](#l2.60)
writer.cancel()[](#l2.62)
try:[](#l2.63)
loop.run_until_complete(writer)[](#l2.64)
except asyncio.CancelledError:[](#l2.65)
# try again[](#l2.66)
writer = loop.create_task(q.put(2))[](#l2.67)
loop.run_until_complete(writer)[](#l2.68)
value2 = q.get_nowait()[](#l2.70)
self.assertEqual(value2, 2)[](#l2.71)
self.assertEqual(q.qsize(), 0)[](#l2.72)
+ def test_nonblocking_put_exception(self): q = asyncio.Queue(maxsize=1, loop=self.loop) q.put_nowait(1) @@ -374,6 +432,7 @@ class QueuePutTests(_QueueTestBase): test_utils.run_briefly(self.loop) self.assertTrue(put_c.done()) self.assertEqual(q.get_nowait(), 'a')
test_utils.run_briefly(self.loop)[](#l2.81) self.assertEqual(q.get_nowait(), 'b')[](#l2.82)
self.loop.run_until_complete(put_b)
--- a/Misc/NEWS +++ b/Misc/NEWS @@ -63,6 +63,9 @@ Core and Builtins