cpython: fb8a093db8b1 (original) (raw)
--- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -78,20 +78,23 @@ class _OverlappedFuture(futures.Future): self._ov = None -class _WaitHandleFuture(futures.Future): +class _BaseWaitHandleFuture(futures.Future): """Subclass of Future which represents a wait handle."""
- def init(self, ov, handle, wait_handle, *, loop=None): super().init(loop=loop) if self._source_traceback: del self._source_traceback[-1]
# iocp and ov are only used by cancel() to notify IocpProactor[](#l1.16)
# that the wait was cancelled[](#l1.17)
self._iocp = iocp[](#l1.18)
# Keep a reference to the Overlapped object to keep it alive until the[](#l1.19)
# wait is unregistered[](#l1.20) self._ov = ov[](#l1.21) self._handle = handle[](#l1.22) self._wait_handle = wait_handle[](#l1.23)
# Should we call UnregisterWaitEx() if the wait completes[](#l1.25)
# or is cancelled?[](#l1.26)
self._registered = True[](#l1.27)
+ def _poll(self): # non-blocking wait: use a timeout of 0 millisecond return (_winapi.WaitForSingleObject(self._handle, 0) == @@ -99,21 +102,32 @@ class _WaitHandleFuture(futures.Future): def _repr_info(self): info = super()._repr_info()
info.insert(1, 'handle=%#x' % self._handle)[](#l1.36)
if self._wait_handle:[](#l1.37)
info.append('handle=%#x' % self._handle)[](#l1.38)
if self._handle is not None:[](#l1.39) state = 'signaled' if self._poll() else 'waiting'[](#l1.40)
info.insert(1, 'wait_handle=<%s, %#x>'[](#l1.41)
% (state, self._wait_handle))[](#l1.42)
info.append(state)[](#l1.43)
if self._wait_handle is not None:[](#l1.44)
info.append('wait_handle=%#x' % self._wait_handle)[](#l1.45) return info[](#l1.46)
- def _unregister_wait_cb(self, fut):
# The wait was unregistered: it's not safe to destroy the Overlapped[](#l1.49)
# object[](#l1.50)
self._ov = None[](#l1.51)
if self._wait_handle is None:[](#l1.54)
if not self._registered:[](#l1.55) return[](#l1.56)
self._registered = False[](#l1.57)
+ try: _overlapped.UnregisterWait(self._wait_handle) except OSError as exc:
# ERROR_IO_PENDING is not an error, the wait was unregistered[](#l1.62)
if exc.winerror != _overlapped.ERROR_IO_PENDING:[](#l1.63)
self._wait_handle = None[](#l1.64)
if exc.winerror == _overlapped.ERROR_IO_PENDING:[](#l1.65)
# ERROR_IO_PENDING is not an error, the wait was unregistered[](#l1.66)
self._unregister_wait_cb(None)[](#l1.67)
elif exc.winerror != _overlapped.ERROR_IO_PENDING:[](#l1.68) context = {[](#l1.69) 'message': 'Failed to unregister the wait handle',[](#l1.70) 'exception': exc,[](#l1.71)
@@ -122,26 +136,91 @@ class _WaitHandleFuture(futures.Future): if self._source_traceback: context['source_traceback'] = self._source_traceback self._loop.call_exception_handler(context)
self._wait_handle = None[](#l1.76)
self._iocp = None[](#l1.77)
self._ov = None[](#l1.78)
else:[](#l1.79)
self._wait_handle = None[](#l1.80)
self._unregister_wait_cb(None)[](#l1.81)
result = super().cancel()[](#l1.84)
if self._ov is not None:[](#l1.85)
# signal the cancellation to the overlapped object[](#l1.86)
_overlapped.PostQueuedCompletionStatus(self._iocp, True,[](#l1.87)
0, self._ov.address)[](#l1.88) self._unregister_wait()[](#l1.89)
return result[](#l1.90)
return super().cancel()[](#l1.91)
def set_exception(self, exception):
self._unregister_wait()[](#l1.94) super().set_exception(exception)[](#l1.95)
self._unregister_wait()[](#l1.96)
self._unregister_wait()[](#l1.99) super().set_result(result)[](#l1.100)
self._unregister_wait()[](#l1.101)
+ + +class _WaitCancelFuture(_BaseWaitHandleFuture):
- """Subclass of Future which represents a wait for the cancellation of a
- _WaitHandleFuture using an event.
- """
- def init(self, ov, event, wait_handle, *, loop=None):
super().__init__(ov, event, wait_handle, loop=loop)[](#l1.110)
self._done_callback = None[](#l1.112)
- def _schedule_callbacks(self):
super(_WaitCancelFuture, self)._schedule_callbacks()[](#l1.115)
if self._done_callback is not None:[](#l1.116)
self._done_callback(self)[](#l1.117)
+ + +class _WaitHandleFuture(_BaseWaitHandleFuture):
- def init(self, ov, handle, wait_handle, proactor, *, loop=None):
super().__init__(ov, handle, wait_handle, loop=loop)[](#l1.122)
self._proactor = proactor[](#l1.123)
self._unregister_proactor = True[](#l1.124)
self._event = _overlapped.CreateEvent(None, True, False, None)[](#l1.125)
self._event_fut = None[](#l1.126)
- def _unregister_wait_cb(self, fut):
if self._event is not None:[](#l1.129)
_winapi.CloseHandle(self._event)[](#l1.130)
self._event = None[](#l1.131)
self._event_fut = None[](#l1.132)
# If the wait was cancelled, the wait may never be signalled, so[](#l1.134)
# it's required to unregister it. Otherwise, IocpProactor.close() will[](#l1.135)
# wait forever for an event which will never come.[](#l1.136)
#[](#l1.137)
# If the IocpProactor already received the event, it's safe to call[](#l1.138)
# _unregister() because we kept a reference to the Overlapped object[](#l1.139)
# which is used as an unique key.[](#l1.140)
self._proactor._unregister(self._ov)[](#l1.141)
self._proactor = None[](#l1.142)
super()._unregister_wait_cb(fut)[](#l1.144)
- def _unregister_wait(self):
if not self._registered:[](#l1.147)
return[](#l1.148)
self._registered = False[](#l1.149)
try:[](#l1.151)
_overlapped.UnregisterWaitEx(self._wait_handle, self._event)[](#l1.152)
except OSError as exc:[](#l1.153)
self._wait_handle = None[](#l1.154)
if exc.winerror == _overlapped.ERROR_IO_PENDING:[](#l1.155)
# ERROR_IO_PENDING is not an error, the wait was unregistered[](#l1.156)
self._unregister_wait_cb(None)[](#l1.157)
elif exc.winerror != _overlapped.ERROR_IO_PENDING:[](#l1.158)
context = {[](#l1.159)
'message': 'Failed to unregister the wait handle',[](#l1.160)
'exception': exc,[](#l1.161)
'future': self,[](#l1.162)
}[](#l1.163)
if self._source_traceback:[](#l1.164)
context['source_traceback'] = self._source_traceback[](#l1.165)
self._loop.call_exception_handler(context)[](#l1.166)
else:[](#l1.167)
self._wait_handle = None[](#l1.168)
self._event_fut = self._proactor._wait_cancel([](#l1.169)
self._event,[](#l1.170)
self._unregister_wait_cb)[](#l1.171)
class PipeServer(object): @@ -291,6 +370,7 @@ class IocpProactor: _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency) self._cache = {} self._registered = weakref.WeakSet()
self._unregistered = [][](#l1.179) self._stopped_serving = weakref.WeakSet()[](#l1.180)
def repr(self): @@ -438,6 +518,16 @@ class IocpProactor: Return a Future object. The result of the future is True if the wait completed, or False if the wait did not complete (on timeout). """
return self._wait_for_handle(handle, timeout, False)[](#l1.187)
- def _wait_cancel(self, event, done_callback):
fut = self._wait_for_handle(event, None, True)[](#l1.190)
# add_done_callback() cannot be used because the wait may only complete[](#l1.191)
# in IocpProactor.close(), while the event loop is not running.[](#l1.192)
fut._done_callback = done_callback[](#l1.193)
return fut[](#l1.194)
- def _wait_for_handle(self, handle, timeout, _is_cancel): if timeout is None: ms = _winapi.INFINITE else:
@@ -447,9 +537,13 @@ class IocpProactor: # We only create ov so we can use ov.address as a key for the cache. ov = _overlapped.Overlapped(NULL)
wh = _overlapped.RegisterWaitWithQueue([](#l1.204)
wait_handle = _overlapped.RegisterWaitWithQueue([](#l1.205) handle, self._iocp, ov.address, ms)[](#l1.206)
f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop)[](#l1.207)
if _is_cancel:[](#l1.208)
f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)[](#l1.209)
else:[](#l1.210)
f = _WaitHandleFuture(ov, handle, wait_handle, self,[](#l1.211)
loop=self._loop)[](#l1.212) if f._source_traceback:[](#l1.213) del f._source_traceback[-1][](#l1.214)
@@ -462,14 +556,6 @@ class IocpProactor: # False even though we have not timed out. return f._poll()
if f._poll():[](#l1.220)
try:[](#l1.221)
result = f._poll()[](#l1.222)
except OSError as exc:[](#l1.223)
f.set_exception(exc)[](#l1.224)
else:[](#l1.225)
f.set_result(result)[](#l1.226)
- self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle) return f @@ -521,6 +607,15 @@ class IocpProactor: self._cache[ov.address] = (f, ov, obj, callback) return f
Call this method when its future has been cancelled. The event can[](#l1.238)
already be signalled (pending in the proactor event queue). It is also[](#l1.239)
safe if the event is never signalled (because it was cancelled).[](#l1.240)
"""[](#l1.241)
self._unregistered.append(ov)[](#l1.242)
+ def _get_accept_socket(self, family): s = socket.socket(family) s.settimeout(0) @@ -541,7 +636,7 @@ class IocpProactor: while True: status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) if status is None:
return[](#l1.251)
break[](#l1.252) ms = 0[](#l1.253)
err, transferred, key, address = status @@ -576,6 +671,11 @@ class IocpProactor: f.set_result(value) self._results.append(f)
# Remove unregisted futures[](#l1.260)
for ov in self._unregistered:[](#l1.261)
self._cache.pop(ov.address, None)[](#l1.262)
self._unregistered.clear()[](#l1.263)
+ def _stop_serving(self, obj): # obj is a socket or pipe handle. It will be closed in # BaseProactorEventLoop._stop_serving() which will make any
--- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -309,6 +309,29 @@ overlapped_UnregisterWait(PyObject *self Py_RETURN_NONE; } +PyDoc_STRVAR(
- UnregisterWaitEx_doc,
- "UnregisterWaitEx(WaitHandle, Event) -> None\n\n"
- "Unregister wait handle.\n");
+ +static PyObject * +overlapped_UnregisterWaitEx(PyObject *self, PyObject *args) +{