[Python-Dev] Async subprocesses on Windows with tulip (original) (raw)
Richard Oudkerk shibturn at gmail.com
Sun May 19 17:59:52 CEST 2013
- Previous message: [Python-Dev] Ordering keyword dicts
- Next message: [Python-Dev] Async subprocesses on Windows with tulip
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Attached is a pretty trivial example of asynchronous interaction with a python subprocess using tulip on Windows. It does not use transports or protocols -- instead sock_recv() and sock_sendall() are used inside tasks.
I am not sure what the plan is for dealing with subprocesses currently. Shall I just add this to the examples folder for now?
-- Richard -------------- next part -------------- ''' Example of asynchronous interaction with a subprocess on Windows.
This requires use of overlapped pipe handles and (a modified) iocp proactor. '''
import itertools import logging import msvcrt import os import subprocess import sys import tempfile import _winapi
import tulip from tulip import _overlapped, windows_events, events
PIPE = subprocess.PIPE BUFSIZE = 8192 _mmap_counter=itertools.count()
def _pipe(duplex=True, overlapped=(True, True)): ''' Return handles for a pipe with one or both ends overlapped. ''' address = tempfile.mktemp(prefix=r'\.\pipe\python-pipe-%d-%d-' % (os.getpid(), next(_mmap_counter)))
if duplex:
openmode = _winapi.PIPE_ACCESS_DUPLEX
access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
obsize, ibsize = BUFSIZE, BUFSIZE
else:
openmode = _winapi.PIPE_ACCESS_INBOUND
access = _winapi.GENERIC_WRITE
obsize, ibsize = 0, BUFSIZE
openmode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
if overlapped[0]:
openmode |= _winapi.FILE_FLAG_OVERLAPPED
if overlapped[1]:
flags_and_attribs = _winapi.FILE_FLAG_OVERLAPPED
else:
flags_and_attribs = 0
h1 = h2 = None
try:
h1 = _winapi.CreateNamedPipe(
address, openmode, _winapi.PIPE_WAIT,
1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
h2 = _winapi.CreateFile(
address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
flags_and_attribs, _winapi.NULL)
ov = _winapi.ConnectNamedPipe(h1, overlapped=True)
ov.GetOverlappedResult(True)
return h1, h2
except:
if h1 is not None:
_winapi.CloseHandle(h1)
if h2 is not None:
_winapi.CloseHandle(h2)
raise
class PipeHandle: ''' Wrapper for a pipe handle ''' def init(self, handle): self._handle = handle
@property
def handle(self):
return self._handle
def fileno(self):
return self._handle
def close(self, *, CloseHandle=_winapi.CloseHandle):
if self._handle is not None:
CloseHandle(self._handle)
self._handle = None
__del__ = close
def __enter__(self):
return self
def __exit__(self, t, v, tb):
self.close()
class Popen(subprocess.Popen): ''' Subclass of Popen which uses overlapped pipe handles wrapped with PipeHandle instead of normal file objects for stdin, stdout, stderr. ''' _WriteWrapper = PipeHandle _ReadWrapper = PipeHandle
def __init__(self, args, *, executable=None, stdin=None, stdout=None,
stderr=None, preexec_fn=None, close_fds=False,
shell=False, cwd=None, env=None, startupinfo=None,
creationflags=0, restore_signals=True,
start_new_session=False, pass_fds=()):
stdin_rfd = stdout_wfd = stderr_wfd = None
stdin_wh = stdout_rh = stderr_rh = None
if stdin == PIPE:
stdin_rh, stdin_wh = _pipe(False, (False, True))
stdin_rfd = msvcrt.open_osfhandle(stdin_rh, os.O_RDONLY)
if stdout == PIPE:
stdout_rh, stdout_wh = _pipe(False, (True, False))
stdout_wfd = msvcrt.open_osfhandle(stdout_wh, 0)
if stderr == PIPE:
stderr_rh, stderr_wh = _pipe(False, (True, False))
stderr_wfd = msvcrt.open_osfhandle(stderr_wh, 0)
try:
super().__init__(args, stdin=stdin_rfd, stdout=stdout_wfd,
stderr=stderr_wfd, executable=executable,
preexec_fn=preexec_fn, close_fds=close_fds,
shell=shell, cwd=cwd, env=env,
startupinfo=startupinfo,
creationflags=creationflags,
restore_signals=restore_signals,
start_new_session=start_new_session,
pass_fds=pass_fds)
except:
for h in (stdin_wh, stdout_rh, stderr_rh):
_winapi.CloseHandle(h)
raise
else:
if stdin_wh is not None:
self.stdin = self._WriteWrapper(stdin_wh)
if stdout_rh is not None:
self.stdout = self._ReadWrapper(stdout_rh)
if stderr_rh is not None:
self.stderr = self._ReadWrapper(stderr_rh)
finally:
if stdin == PIPE:
os.close(stdin_rfd)
if stdout == PIPE:
os.close(stdout_wfd)
if stderr == PIPE:
os.close(stderr_wfd)
class ProactorEventLoop(windows_events.ProactorEventLoop): ''' Eventloop which uses ReadFile() and WriteFile() instead of WSARecv() and WSASend() for PipeHandle objects. ''' def sock_recv(self, conn, n): self._proactor._register_with_iocp(conn) ov = _overlapped.Overlapped(_winapi.NULL) handle = getattr(conn, 'handle', None) if handle is None: ov.WSARecv(conn.fileno(), n, 0) else: ov.ReadFile(conn.fileno(), n) return self._proactor._register(ov, conn, ov.getresult)
def sock_sendall(self, conn, data):
self._proactor._register_with_iocp(conn)
ov = _overlapped.Overlapped(_winapi.NULL)
handle = getattr(conn, 'handle', None)
if handle is None:
ov.WSASend(conn.fileno(), data, 0)
else:
ov.WriteFile(conn.fileno(), data)
return self._proactor._register(ov, conn, ov.getresult)
if name == 'main': @tulip.task def read_and_close(loop, f): with f: collected = [] while True: s = yield from loop.sock_recv(f, 4096) if s == b'': return b''.join(collected) collected.append(s)
@tulip.task
def write_and_close(loop, f, buf):
with f:
return (yield from loop.sock_sendall(f, buf))
@tulip.task
def main(loop):
# start process which upper cases its input
code = r'''if 1:
import os
os.write(2, b"starting\n")
while True:
s = os.read(0, 1024)
if not s:
break
s = s.upper()
while s:
n = os.write(1, s)
s = s[n:]
os.write(2, b"exiting\n")
'''
p = Popen([sys.executable, '-c', code],
stdin=PIPE, stdout=PIPE, stderr=PIPE)
# start tasks to write to and read from the process
bytes_written = write_and_close(loop, p.stdin, b"hello world\n"*100000)
stdout_data = read_and_close(loop, p.stdout)
stderr_data = read_and_close(loop, p.stderr)
# wait for tasks to finish and get the results
bytes_written = yield from bytes_written
stdout_data = yield from stdout_data
stderr_data = yield from stderr_data
# print results
print('bytes_written:', bytes_written)
print('stdout_data[:50]:', stdout_data[:50])
print('len(stdout_data):', len(stdout_data))
print('stderr_data:', stderr_data)
loop = ProactorEventLoop()
events.set_event_loop(loop)
try:
loop.run_until_complete(main(loop))
finally:
loop.close()
- Previous message: [Python-Dev] Ordering keyword dicts
- Next message: [Python-Dev] Async subprocesses on Windows with tulip
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]