[Python-Dev] Async subprocesses on Windows with tulip (original) (raw)

Richard Oudkerk shibturn at gmail.com
Sun May 19 17:59:52 CEST 2013


Attached is a pretty trivial example of asynchronous interaction with a python subprocess using tulip on Windows. It does not use transports or protocols -- instead sock_recv() and sock_sendall() are used inside tasks.

I am not sure what the plan is for dealing with subprocesses currently. Shall I just add this to the examples folder for now?

-- Richard -------------- next part -------------- ''' Example of asynchronous interaction with a subprocess on Windows.

This requires use of overlapped pipe handles and (a modified) iocp proactor. '''

import itertools import logging import msvcrt import os import subprocess import sys import tempfile import _winapi

import tulip from tulip import _overlapped, windows_events, events

PIPE = subprocess.PIPE BUFSIZE = 8192 _mmap_counter=itertools.count()

def _pipe(duplex=True, overlapped=(True, True)): ''' Return handles for a pipe with one or both ends overlapped. ''' address = tempfile.mktemp(prefix=r'\.\pipe\python-pipe-%d-%d-' % (os.getpid(), next(_mmap_counter)))

if duplex:
    openmode = _winapi.PIPE_ACCESS_DUPLEX
    access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
    obsize, ibsize = BUFSIZE, BUFSIZE
else:
    openmode = _winapi.PIPE_ACCESS_INBOUND
    access = _winapi.GENERIC_WRITE
    obsize, ibsize = 0, BUFSIZE

openmode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE

if overlapped[0]:
    openmode |= _winapi.FILE_FLAG_OVERLAPPED

if overlapped[1]:
    flags_and_attribs = _winapi.FILE_FLAG_OVERLAPPED
else:
    flags_and_attribs = 0

h1 = h2 = None
try:
    h1 = _winapi.CreateNamedPipe(
        address, openmode, _winapi.PIPE_WAIT,
        1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)

    h2 = _winapi.CreateFile(
        address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
        flags_and_attribs, _winapi.NULL)

    ov = _winapi.ConnectNamedPipe(h1, overlapped=True)
    ov.GetOverlappedResult(True)
    return h1, h2
except:
    if h1 is not None:
        _winapi.CloseHandle(h1)
    if h2 is not None:
        _winapi.CloseHandle(h2)
    raise

class PipeHandle: ''' Wrapper for a pipe handle ''' def init(self, handle): self._handle = handle

@property
def handle(self):
    return self._handle

def fileno(self):
    return self._handle

def close(self, *, CloseHandle=_winapi.CloseHandle):
    if self._handle is not None:
        CloseHandle(self._handle)
        self._handle = None

__del__ = close

def __enter__(self):
    return self

def __exit__(self, t, v, tb):
    self.close()

class Popen(subprocess.Popen): ''' Subclass of Popen which uses overlapped pipe handles wrapped with PipeHandle instead of normal file objects for stdin, stdout, stderr. ''' _WriteWrapper = PipeHandle _ReadWrapper = PipeHandle

def __init__(self, args, *, executable=None, stdin=None, stdout=None,
             stderr=None, preexec_fn=None, close_fds=False,
             shell=False, cwd=None, env=None, startupinfo=None,
             creationflags=0, restore_signals=True,
             start_new_session=False, pass_fds=()):
    stdin_rfd = stdout_wfd = stderr_wfd = None
    stdin_wh = stdout_rh = stderr_rh = None
    if stdin == PIPE:
        stdin_rh, stdin_wh = _pipe(False, (False, True))
        stdin_rfd = msvcrt.open_osfhandle(stdin_rh, os.O_RDONLY)
    if stdout == PIPE:
        stdout_rh, stdout_wh = _pipe(False, (True, False))
        stdout_wfd = msvcrt.open_osfhandle(stdout_wh, 0)
    if stderr == PIPE:
        stderr_rh, stderr_wh = _pipe(False, (True, False))
        stderr_wfd = msvcrt.open_osfhandle(stderr_wh, 0)
    try:
        super().__init__(args, stdin=stdin_rfd, stdout=stdout_wfd,
                         stderr=stderr_wfd, executable=executable,
                         preexec_fn=preexec_fn, close_fds=close_fds,
                         shell=shell, cwd=cwd, env=env,
                         startupinfo=startupinfo,
                         creationflags=creationflags,
                         restore_signals=restore_signals,
                         start_new_session=start_new_session,
                         pass_fds=pass_fds)
    except:
        for h in (stdin_wh, stdout_rh, stderr_rh):
            _winapi.CloseHandle(h)
        raise
    else:
        if stdin_wh is not None:
            self.stdin = self._WriteWrapper(stdin_wh)
        if stdout_rh is not None:
            self.stdout = self._ReadWrapper(stdout_rh)
        if stderr_rh is not None:
            self.stderr = self._ReadWrapper(stderr_rh)
    finally:
        if stdin == PIPE:
            os.close(stdin_rfd)
        if stdout == PIPE:
            os.close(stdout_wfd)
        if stderr == PIPE:
            os.close(stderr_wfd)

class ProactorEventLoop(windows_events.ProactorEventLoop): ''' Eventloop which uses ReadFile() and WriteFile() instead of WSARecv() and WSASend() for PipeHandle objects. ''' def sock_recv(self, conn, n): self._proactor._register_with_iocp(conn) ov = _overlapped.Overlapped(_winapi.NULL) handle = getattr(conn, 'handle', None) if handle is None: ov.WSARecv(conn.fileno(), n, 0) else: ov.ReadFile(conn.fileno(), n) return self._proactor._register(ov, conn, ov.getresult)

def sock_sendall(self, conn, data):
    self._proactor._register_with_iocp(conn)
    ov = _overlapped.Overlapped(_winapi.NULL)
    handle = getattr(conn, 'handle', None)
    if handle is None:
        ov.WSASend(conn.fileno(), data, 0)
    else:
        ov.WriteFile(conn.fileno(), data)
    return self._proactor._register(ov, conn, ov.getresult)

if name == 'main': @tulip.task def read_and_close(loop, f): with f: collected = [] while True: s = yield from loop.sock_recv(f, 4096) if s == b'': return b''.join(collected) collected.append(s)

@tulip.task
def write_and_close(loop, f, buf):
    with f:
        return (yield from loop.sock_sendall(f, buf))

@tulip.task
def main(loop):
    # start process which upper cases its input
    code = r'''if 1:
                   import os
                   os.write(2, b"starting\n")
                   while True:
                       s = os.read(0, 1024)
                       if not s:
                           break
                       s = s.upper()
                       while s:
                           n = os.write(1, s)
                           s = s[n:]
                   os.write(2, b"exiting\n")
                   '''
    p = Popen([sys.executable, '-c', code],
              stdin=PIPE, stdout=PIPE, stderr=PIPE)

    # start tasks to write to and read from the process
    bytes_written = write_and_close(loop, p.stdin, b"hello world\n"*100000)
    stdout_data = read_and_close(loop, p.stdout)
    stderr_data = read_and_close(loop, p.stderr)

    # wait for tasks to finish and get the results
    bytes_written = yield from bytes_written
    stdout_data = yield from stdout_data
    stderr_data = yield from stderr_data

    # print results
    print('bytes_written:', bytes_written)
    print('stdout_data[:50]:', stdout_data[:50])
    print('len(stdout_data):', len(stdout_data))
    print('stderr_data:', stderr_data)


loop = ProactorEventLoop()
events.set_event_loop(loop)
try:
    loop.run_until_complete(main(loop))
finally:
    loop.close()


More information about the Python-Dev mailing list