Fix callbacks race in SelectorLoop.sock_connect. by 1st1 · Pull Request #366 · python/asyncio (original) (raw)

While testing uvloop on recent CPython 3.5.2 I found a regression in loop.sock_connect, introduced in ed17848.

The bug breaks loop.sock_* in a very serious way, making programs that use those methods prone to random hangs after socket is connected.

How to trigger

Let's imagine we have a server, that sends some data (let's say b'hello') to the client immediately after connect. And the client program is the following:

data = await self.recv_all(sock, 5) assert data == b'hello' await self.loop.sock_sendall(sock, PAYLOAD)

If the PAYLOAD is big enough, the client program will hang forever.

Explanation

The cause of the hang is a race between callbacks -- one related to loop.sock_connect and one to sock_sendall.

Here's the relevant piece of code from selector_events.py:

def sock_connect(self, sock, address):
    """Connect to a remote socket at address.

    This method is a coroutine.
    """
    if self._debug and sock.gettimeout() != 0:
        raise ValueError("the socket must be non-blocking")

    fut = self.create_future()
    if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX:
        self._sock_connect(fut, sock, address)
    else:
        resolved = base_events._ensure_resolved(
            address, family=sock.family, proto=sock.proto, loop=self)
        resolved.add_done_callback(
            lambda resolved: self._on_resolved(fut, sock, resolved))

    return fut

def _on_resolved(self, fut, sock, resolved):
    try:
        _, _, _, _, address = resolved.result()[0]
    except Exception as exc:
        fut.set_exception(exc)
    else:
        self._sock_connect(fut, sock, address)

def _sock_connect(self, fut, sock, address):
    fd = sock.fileno()
    try:
        sock.connect(address)
    except (BlockingIOError, InterruptedError):
        # Issue #23618: When the C function connect() fails with EINTR, the
        # connection runs in background. We have to wait until the socket
        # becomes writable to be notified when the connection succeed or
        # fails.
        fut.add_done_callback(functools.partial(self._sock_connect_done,
                                                fd))
        self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
    except Exception as exc:
        fut.set_exception(exc)
    else:
        fut.set_result(None)

def _sock_connect_done(self, fd, fut):
    self.remove_writer(fd)

def _sock_connect_cb(self, fut, sock, address):
    if fut.cancelled():
        return

    try:
        err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # Jump to any except clause below.
            raise OSError(err, 'Connect call failed %s' % (address,))
    except (BlockingIOError, InterruptedError):
        # socket is still registered, the callback will be retried later
        pass
    except Exception as exc:
        fut.set_exception(exc)
    else:
        fut.set_result(None)

Before ed17848, sock_connect called _sock_connect directly:

  1. sock_connect created a fut Future.
  2. If the address wasn't already resolved it raised an error.
  3. If the address was resolved, it called _sock_connect, which attached a callback to the fut -- _sock_connect_done.
  4. sock_connect then returned fut to the caller.
  5. If the caller is a coroutine, it's wrapped in asyncio.Task. Therefore, fut now have two callbacks attached to it: [_sock_connect_done, Task._wakeup]

After that commit:

  1. sock_connect creates a fut Future.
  2. Then calls _ensure_resolved (linked fut to the result of that call's Future).
  3. sock_connect returns fut to the caller.
  4. If the caller is a coroutine, its Task will add a callback to the fut, eventually resulting in this: [Task._wakeup, _sock_connect_done]

Therefore, after ed17848, _sock_connect_done can be called after await loop.sock_connect() line. If the program calls loop.sock_sendall after sock_connect, _sock_connect_done will remove writer callback that sock_sendall set up.

/cc @gvanrossum @ajdavis @Haypo