(original) (raw)
Index: Doc/lib/libasynchat.tex =================================================================== --- Doc/lib/libasynchat.tex (revision 53734) +++ Doc/lib/libasynchat.tex (working copy) @@ -57,7 +57,9 @@ \begin{methoddesc}{collect_incoming_data}{data} Called with \var{data} holding an arbitrary amount of received data. - The default method, which must be overridden, raises a \exception{NotImplementedError} exception. + The default method, which must be overridden, raises a + \exception{NotImplementedError} exception. There is a sample implementation + as \method{_collect_incoming_data}. \end{methoddesc} \begin{methoddesc}{discard_buffers}{} @@ -120,14 +122,6 @@ channels tested by the \cfunction{select()} loop for readability. \end{methoddesc} -\begin{methoddesc}{refill_buffer}{} - Refills the output buffer by calling the \method{more()} method of the - producer at the head of the fifo. If it is exhausted then the - producer is popped off the fifo and the next producer is activated. - If the current producer is, or becomes, \code{None} then the channel - is closed. -\end{methoddesc} - \begin{methoddesc}{set_terminator}{term} Sets the terminating condition to be recognised on the channel. \code{term} may be any of three types of value, corresponding to three different ways @@ -150,44 +144,16 @@ or the channel is connected and the channel's output buffer is non-empty. \end{methoddesc} -\subsection{asynchat - Auxiliary Classes and Functions} - -\begin{classdesc}{simple_producer}{data\optional{, buffer_size=512}} - A \class{simple_producer} takes a chunk of data and an optional buffer size. - Repeated calls to its \method{more()} method yield successive chunks of the - data no larger than \var{buffer_size}. -\end{classdesc} - -\begin{methoddesc}{more}{} - Produces the next chunk of information from the producer, or returns the empty string. 
+\begin{methoddesc}{_collect_incoming_data}{data} + Sample implementation of a data collection routine to be used in conjunction + with \method{_get_data} in a user-specified \method{found_terminator}. \end{methoddesc} -\begin{classdesc}{fifo}{\optional{list=None}} - Each channel maintains a \class{fifo} holding data which has been pushed by the - application but not yet popped for writing to the channel. - A \class{fifo} is a list used to hold data and/or producers until they are required. - If the \var{list} argument is provided then it should contain producers or - data items to be written to the channel. -\end{classdesc} - -\begin{methoddesc}{is_empty}{} - Returns \code{True} iff the fifo is empty. +\begin{methoddesc}{_get_data}{} + Will return and clear the data received with the sample \method{_collect_incoming_data} + implementation. \end{methoddesc} -\begin{methoddesc}{first}{} - Returns the least-recently \method{push()}ed item from the fifo. -\end{methoddesc} - -\begin{methoddesc}{push}{data} - Adds the given data (which may be a string or a producer object) to the - producer fifo. -\end{methoddesc} - -\begin{methoddesc}{pop}{} - If the fifo is not empty, returns \code{True, first()}, deleting the popped - item. Returns \code{False, None} for an empty fifo. -\end{methoddesc} - The \module{asynchat} module also defines one utility function, which may be of use in network and textual analysis operations. 
@@ -220,23 +186,17 @@ asynchat.async_chat.__init__(self, conn=conn) self.addr = addr self.sessions = sessions - self.ibuffer = [] - self.obuffer = "" self.set_terminator("\r\n\r\n") self.reading_headers = True self.handling = False self.cgi_data = None self.log = log + self.collect_incoming_data = self._collect_incoming_data - def collect_incoming_data(self, data): - """Buffer the data""" - self.ibuffer.append(data) - def found_terminator(self): if self.reading_headers: self.reading_headers = False - self.parse_headers("".join(self.ibuffer)) - self.ibuffer = [] + self.parse_headers(self._get_data()) if self.op.upper() == "POST": clen = self.headers.getheader("content-length") self.set_terminator(int(clen)) @@ -246,9 +206,8 @@ self.handle_request() elif not self.handling: self.set_terminator(None) # browsers sometimes over-send - self.cgi_data = parse(self.headers, "".join(self.ibuffer)) + self.cgi_data = parse(self.headers, self._get_data()) self.handling = True - self.ibuffer = [] self.handle_request() \end{verbatim} Index: Doc/lib/libasyncore.tex =================================================================== --- Doc/lib/libasyncore.tex (revision 53734) +++ Doc/lib/libasyncore.tex (working copy) @@ -221,7 +221,24 @@ when they are garbage-collected. \end{methoddesc} +\subsection{asyncore - Auxiliary Classes and Functions} +\begin{classdesc}{file_dispatcher}{fd\optional{, map=None}} + A \class{file_dispatcher} takes a file descriptor or file object along with + optional map argument and wraps it for use with the \function{poll()} or + \function{loop()} functions. If provided a file object or anything with a .fileno() + method, that method will be called and passed to the \class{file_wrapper} constructor. 
+ Availability: \UNIX +\end{classdesc} + +\begin{classdesc}{file_wrapper}{fd} + A \class{file_wrapper} takes an integer file descriptor and calls \refmodule{os}.\function{dup()} + to duplicate the handle so that the original handle and the one provided may be closed + independently. Implements sufficient methods to emulate a socket for use by the + \class{file_dispatcher} class. + Availability: \UNIX +\end{classdesc} + \subsection{asyncore Example basic HTTP client \label{asyncore-example}} Here is a very basic HTTP client that uses the \class{dispatcher} Index: Lib/asynchat.py =================================================================== --- Lib/asynchat.py (revision 53734) +++ Lib/asynchat.py (working copy) @@ -60,16 +60,35 @@ ac_out_buffer_size = 4096 def __init__ (self, conn=None): + # for string terminator matching self.ac_in_buffer = '' - self.ac_out_buffer = '' - self.producer_fifo = fifo() + + # we use a list here rather than cStringIO for a few reasons... + # del lst[:] is faster than sio.truncate(0) + # lst = [] is faster than sio.truncate(0) + # cStringIO will be gaining unicode support in py3k, which + # will negatively affect the performance of bytes compared to + # a ''.join() equivalent + self.incoming = [] + + # we toss the use of the "simple producer" and replace it with + # a pure deque, which the original fifo was a wrapping of + self.producer_fifo = deque() asyncore.dispatcher.__init__ (self, conn) def collect_incoming_data(self, data): - raise NotImplementedError, "must be implemented in subclass" + raise NotImplementedError("must be implemented in subclass") + + def _collect_incoming_data(self, data): + self.incoming.append(data) + + def _get_data(self): + d = ''.join(self.incoming) + del self.incoming[:] + return d def found_terminator(self): - raise NotImplementedError, "must be implemented in subclass" + raise NotImplementedError("must be implemented in subclass") def set_terminator (self, term): "Set the input delimiter. 
Can be a fixed string of any length, an integer, or None" @@ -96,7 +115,7 @@ # Continue to search for self.terminator in self.ac_in_buffer, # while calling self.collect_incoming_data. The while loop # is necessary because we might read several data+terminator - # combos with a single recv(1024). + # combos with a single recv(4096). while self.ac_in_buffer: lb = len(self.ac_in_buffer) @@ -150,129 +169,79 @@ self.ac_in_buffer = '' def handle_write (self): - self.initiate_send () + self.initiate_send() def handle_close (self): self.close() def push (self, data): - self.producer_fifo.push (simple_producer (data)) + sabs = self.ac_out_buffer_size + if len(data) > sabs: + for i in xrange(0, len(data), sabs): + self.producer_fifo.append(data[i:i+sabs]) + else: + self.producer_fifo.append(data) self.initiate_send() - + def push_with_producer (self, producer): - self.producer_fifo.push (producer) + self.producer_fifo.append(producer) self.initiate_send() - + def readable (self): "predicate for inclusion in the readable for select()" - return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) + # cannot use the old predicate, it violates the claim of the + # set_terminator method. + + # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) + return 1 def writable (self): "predicate for inclusion in the writable for select()" - # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected) - # this is about twice as fast, though not as clear. 
- return not ( - (self.ac_out_buffer == '') and - self.producer_fifo.is_empty() and - self.connected - ) + return self.producer_fifo or (not self.connected) def close_when_done (self): "automatically close this channel once the outgoing queue is empty" - self.producer_fifo.push (None) + self.producer_fifo.append(None) - # refill the outgoing buffer by calling the more() method - # of the first producer in the queue - def refill_buffer (self): - while 1: - if len(self.producer_fifo): - p = self.producer_fifo.first() - # a 'None' in the producer fifo is a sentinel, - # telling us to close the channel. - if p is None: - if not self.ac_out_buffer: - self.producer_fifo.pop() - self.close() - return - elif isinstance(p, str): - self.producer_fifo.pop() - self.ac_out_buffer = self.ac_out_buffer + p - return - data = p.more() - if data: - self.ac_out_buffer = self.ac_out_buffer + data - return - else: - self.producer_fifo.pop() - else: - return - def initiate_send (self): - obs = self.ac_out_buffer_size - # try to refill the buffer - if (len (self.ac_out_buffer) < obs): - self.refill_buffer() - - if self.ac_out_buffer and self.connected: - # try to send the buffer + while self.producer_fifo and self.connected: + first = self.producer_fifo[0] + # handle empty string/buffer or None entry + if not first: + del self.producer_fifo[0] + if first is None: + self.handle_close() + return + + # handle classic producer behavior try: - num_sent = self.send (self.ac_out_buffer[:obs]) - if num_sent: - self.ac_out_buffer = self.ac_out_buffer[num_sent:] - + buffer(first) + except TypeError: + self.producer_fifo.appendleft(first.more()) + continue + + # send the data + try: + num_sent = self.send(first) except socket.error, why: self.handle_error() return + + if num_sent: + if num_sent < len(first): + self.producer_fifo[0] = first[num_sent:] + else: + del self.producer_fifo[0] + + # we tried to send some actual data + return def discard_buffers (self): # Emergencies only! 
self.ac_in_buffer = '' - self.ac_out_buffer = '' - while self.producer_fifo: - self.producer_fifo.pop() + del self.incoming[:] + self.producer_fifo.clear() - -class simple_producer: - - def __init__ (self, data, buffer_size=512): - self.data = data - self.buffer_size = buffer_size - - def more (self): - if len (self.data) > self.buffer_size: - result = self.data[:self.buffer_size] - self.data = self.data[self.buffer_size:] - return result - else: - result = self.data - self.data = '' - return result - -class fifo: - def __init__ (self, list=None): - if not list: - self.list = deque() - else: - self.list = deque(list) - - def __len__ (self): - return len(self.list) - - def is_empty (self): - return not self.list - - def first (self): - return self.list[0] - - def push (self, data): - self.list.append(data) - - def pop (self): - if self.list: - return (1, self.list.popleft()) - else: - return (0, None) - # Given 'haystack', see if any prefix of 'needle' is at its end. This # assumes an exact match has already been checked. Return the number of # characters matched. 
Index: Lib/asyncore.py =================================================================== --- Lib/asyncore.py (revision 53734) +++ Lib/asyncore.py (working copy) @@ -53,20 +53,26 @@ import os from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \ - ENOTCONN, ESHUTDOWN, EINTR, EISCONN, errorcode + ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode try: socket_map except NameError: socket_map = {} +def _strerror(err): + res = os.strerror(err) + if res == 'Unknown error': + res = errorcode[err] + return res + class ExitNow(Exception): pass def read(obj): try: obj.handle_read_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() @@ -74,15 +80,15 @@ def write(obj): try: obj.handle_write_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() -def _exception (obj): +def _exception(obj): try: obj.handle_expt_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() @@ -95,7 +101,7 @@ obj.handle_write_event() if flags & (select.POLLERR | select.POLLHUP | select.POLLNVAL): obj.handle_expt_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() @@ -116,14 +122,15 @@ e.append(fd) if [] == r == w == e: time.sleep(timeout) - else: - try: - r, w, e = select.select(r, w, e, timeout) - except select.error, err: - if err[0] != EINTR: - raise - else: - return + return + + try: + r, w, e = select.select(r, w, e, timeout) + except select.error, err: + if err[0] != EINTR: + raise + else: + return for fd in r: obj = map.get(fd) @@ -208,19 +215,30 @@ self._map = socket_map else: self._map = map - + + self._fileno = None + if sock: + # Set to nonblocking just to make sure for cases where we + # get a socket from a blocking source. 
+ sock.setblocking(0) self.set_socket(sock, map) - # I think it should inherit this anyway - self.socket.setblocking(0) self.connected = True - # XXX Does the constructor require that the socket passed - # be connected? + # The constructor no longer requires that the socket + # passed be connected. try: self.addr = sock.getpeername() - except socket.error: + except socket.error, err: - # The addr isn't crucial - pass + if err[0] == ENOTCONN: + # To handle the case where we got an unconnected + # socket. + self.connected = False + else: + # The socket is broken in some unknown way, alert + # the user and remove it from the map (to prevent + # polling of broken sockets). + self.del_channel(map) + raise else: self.socket = None @@ -254,10 +272,9 @@ def create_socket(self, family, type): self.family_and_type = family, type - self.socket = socket.socket(family, type) - self.socket.setblocking(0) - self._fileno = self.socket.fileno() - self.add_channel() + sock = socket.socket(family, type) + sock.setblocking(0) + self.set_socket(sock) def set_socket(self, sock, map=None): self.socket = sock @@ -295,7 +312,7 @@ def listen(self, num): self.accepting = True if os.name == 'nt' and num > 5: - num = 1 + num = 5 return self.socket.listen(num) def bind(self, addr): @@ -310,10 +327,9 @@ return if err in (0, EISCONN): self.addr = address - self.connected = True - self.handle_connect() + self.handle_connect_event() else: - raise socket.error, (err, errorcode[err]) + raise socket.error(err, errorcode[err]) def accept(self): # XXX can return either an address pair or None @@ -333,9 +349,11 @@ except socket.error, why: if why[0] == EWOULDBLOCK: return 0 + elif why[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED): + self.handle_close() + return 0 else: raise - return 0 def recv(self, buffer_size): try: @@ -349,15 +367,21 @@ return data except socket.error, why: # winsock sometimes throws ENOTCONN - if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]: + if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]: 
self.handle_close() return '' else: raise def close(self): + self.connected = False + self.accepting = False self.del_channel() - self.socket.close() + try: + self.socket.close() + except socket.error, why: + if why[0] not in (ENOTCONN, EBADF): + raise # cheap inheritance, used to pass all other attribute # references to the underlying socket object. @@ -377,27 +401,53 @@ def handle_read_event(self): if self.accepting: - # for an accepting socket, getting a read implies - # that we are connected - if not self.connected: - self.connected = True + # accepting sockets are never connected, they "spawn" new + # sockets that are connected self.handle_accept() elif not self.connected: - self.handle_connect() - self.connected = True + self.handle_connect_event() self.handle_read() else: self.handle_read() + def handle_connect_event(self): + self.connected = True + self.handle_connect() + def handle_write_event(self): - # getting a write implies that we are connected + if self.accepting: + # Accepting sockets shouldn't get a write event. + # We will pretend it didn't happen. 
+ return + if not self.connected: - self.handle_connect() - self.connected = True + #check for errors + err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + if err != 0: + raise socket.error(err, _strerror(err)) + + self.handle_connect_event() self.handle_write() def handle_expt_event(self): - self.handle_expt() + # if the handle_expt is the same default worthless method, + # we'll not even bother calling it, we'll instead generate + # a useful error + x = True + try: + y1 = self.__class__.handle_expt.im_func + y2 = dispatcher.handle_expt.im_func + x = y1 is y2 + except AttributeError: + pass + + if x: + err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + msg = _strerror(err) + + raise socket.error(err, msg) + else: + self.handle_expt() def handle_error(self): nil, t, v, tbinfo = compact_traceback() @@ -473,7 +523,8 @@ def compact_traceback(): t, v, tb = sys.exc_info() tbinfo = [] - assert tb # Must have a traceback + if not tb: # Must have a traceback + raise AssertionError("traceback does not exist") while tb: tbinfo.append(( tb.tb_frame.f_code.co_filename, @@ -489,11 +540,22 @@ info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) return (file, function, line), t, v, info -def close_all(map=None): +def close_all(map=None, ignore_all=False): if map is None: map = socket_map for x in map.values(): - x.socket.close() + try: + x.close() + except OSError, x: + if x[0] == EBADF: + pass + elif not ignore_all: + raise + except (ExitNow, KeyboardInterrupt, SystemExit): + raise + except: + if not ignore_all: + raise map.clear() # Asynchronous File I/O: @@ -513,11 +575,12 @@ import fcntl class file_wrapper: - # here we override just enough to make a file + # Here we override just enough to make a file # look like a socket for the purposes of asyncore. 
+ # The passed fd is automatically os.dup()'d def __init__(self, fd): - self.fd = fd + self.fd = os.dup(fd) def recv(self, *args): return os.read(self.fd, *args) @@ -539,6 +602,10 @@ def __init__(self, fd, map=None): dispatcher.__init__(self, None, map) self.connected = True + try: + fd = fd.fileno() + except AttributeError: + pass self.set_file(fd) # set it to non-blocking mode flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) Index: Lib/test/test_asyncore.py =================================================================== --- Lib/test/test_asyncore.py (revision 0) +++ Lib/test/test_asyncore.py (revision 0) @@ -0,0 +1,219 @@ +import sys +import unittest +import time +import asyncore +import os +import socket +import select +import errno +from tempfile import TemporaryFile + +__all__ = ['asyncoreTestCase'] + +channel_errors = [] +server_errors = [] +client_errors = [] +errs = [] +test_port = 7001 + +_poll = asyncore.poll +_poll2 = asyncore.poll2 + +class EchoChannel(asyncore.dispatcher): + + def __init__ (self, server, conn, addr): + asyncore.dispatcher.__init__(self, conn) + self.buffer = '' + + def readable(self): + return True + + def writable(self): + return len(self.buffer) > 0 + + def handle_read(self): + self.buffer += self.recv(1024) + + def handle_write(self): + n = self.send(self.buffer) + self.buffer = self.buffer[n:] + + def handle_error(self): + channel_errors.append(sys.exc_info()[1]) + asyncore.dispatcher.handle_error(self) + +class EchoServer(asyncore.dispatcher): + + channel_class = EchoChannel + + def __init__(self, ip, port): + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind((ip, port)) + self.listen(2) + host, port = self.socket.getsockname() + if not ip: + self.log_info('Computing default hostname', 'warning') + ip = socket.gethostbyname(socket.gethostname()) + try: + self.server_name = socket.gethostbyaddr(ip)[0] + except socket.error: + self.log_info('Cannot do reverse lookup', 
'warning') + self.server_name = ip # use the IP address as the "hostname" + + def writable(self): + return False + + def readable(self): + return True + + def handle_accept(self): + conn, addr = self.accept() + self.channel_class(self, conn, addr) + + def handle_read(self): + pass + + def handle_error(self): + asyncore.dispatcher.handle_error(self) + server_errors.append(sys.exc_info()[1]) + +class MyClient(asyncore.dispatcher): + + def __init__(self, data): + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.data = data + self.obuffer = data + self.ibuffer = '' + + def readable(self): + return True + + def writable(self): + return len(self.obuffer) > 0 + + def handle_read(self): + self.ibuffer += self.recv(1024) + if self.ibuffer == self.data: + self.close() + + def handle_write(self): + n = self.send(self.obuffer) + self.obuffer = self.obuffer[n:] + + def handle_connect(self): + pass + + def handle_error(self): + asyncore.dispatcher.handle_error(self) + client_errors.append(sys.exc_info()[1]) + +class ErrorClient(MyClient): + def handle_error(self): + errs.append(sys.exc_info()) + +class asyncoreTestCase(unittest.TestCase): + + def tearDown(self): + asyncore.close_all() + + def test_dispatcher(self): + srv = EchoServer('localhost', test_port) + srv.channel_class = EchoChannel + + cl = MyClient('ping') + cl.connect(('localhost', test_port)) + + start = time.time() + while asyncore.socket_map and time.time() - start < 5: + if cl.ibuffer == 'ping': + break + _poll(0.1) + + self.assertEqual(cl.ibuffer, 'ping') + + asyncore.close_all() + self.assertEqual(asyncore.socket_map, {}) + self.assertEqual(channel_errors, []) + self.assertEqual(server_errors, []) + self.assertEqual(client_errors, []) + + def test_closed(self): + cl = ErrorClient('foo') + cl.close() + try: + cl.connect(('localhost', test_port)) + except: + cl.handle_error() + else: + raise Exception("Connecting a closed socket should be an error") + + 
self.assertEqual(errs[-1][0], socket.error) + self.assertEqual(errs[-1][1].args[0], errno.EBADF) + self.assertEqual(len(asyncore.socket_map), 0) + +if os.name == 'posix': + __all__.append('file_dispatcherTestCase') + + class AsynRW(asyncore.file_dispatcher): + def __init__(self, fd, data=''): + asyncore.file_dispatcher.__init__(self, fd) + self.obuffer = data + self.ibuffer = '' + self.can_read = True + def readable(self): + return self.can_read + def writable(self): + return len(self.obuffer) > 0 + def handle_read(self): + self.ibuffer += self.read(1024) + def handle_write(self): + n = self.write(self.obuffer) + self.obuffer = self.obuffer[n:] + + class file_dispatcherTestCase(unittest.TestCase): + + def setUp(self): + self.fp = TemporaryFile() + + def tearDown(self): + self.fp.close() + asyncore.close_all() + + def testClose(self): + asyn_fp = AsynRW(0) + self.assertNotEqual(asyncore.socket_map, {}) + asyn_fp.close() + self.assertEqual(asyncore.socket_map, {}) + + def testRead(self): + text = 'some data' + + self.fp.write(text) + self.fp.seek(0) + + asyn_fp = AsynRW(self.fp.fileno()) + + last = time.time() + while asyn_fp.ibuffer != text and time.time() - last < 2: + _poll(0.1) + + self.assertEqual(asyn_fp.ibuffer, text) + + def testWrite(self): + text = 'some data' + + asyn_fp = AsynRW(self.fp.fileno(), text) + asyn_fp.can_read = False + last = time.time() + while asyn_fp.obuffer != '' and time.time() - last < 2: + _poll(0.1) + + self.assertEqual(asyn_fp.obuffer, '') + self.fp.seek(0) + self.assertEqual(self.fp.read(1024), text) + +if __name__ == '__main__': + unittest.main()