cpython: f713d9b6393c (original) (raw)
Mercurial > cpython
changeset 86549:f713d9b6393c
Issue #19170: telnetlib: use selectors. [#19170]
Charles-François Natali cf.natali@gmail.com | |
---|---|
date | Mon, 21 Oct 2013 14:02:12 +0200 |
parents | 10dbe5591e62 |
children | dad1debba93c |
files | Lib/telnetlib.py Lib/test/test_telnetlib.py |
diffstat | 2 files changed, 100 insertions(+), 274 deletions(-)[+] [-] Lib/telnetlib.py 273 Lib/test/test_telnetlib.py 101 |
line wrap: on
line diff
--- a/Lib/telnetlib.py +++ b/Lib/telnetlib.py @@ -17,13 +17,12 @@ guido Guido van Rossum pts/2 Note that read_all() won't read until eof -- it just reads some data -- but it guarantees to read at least one byte unless EOF is hit. -It is possible to pass a Telnet object to select.select() in order to -wait until more data is available. Note that in this case, -read_eager() may return b'' even if there was data on the socket, -because the protocol negotiation may have eaten the data. This is why -EOFError is needed in some cases to distinguish between "no data" and -"connection closed" (since the socket also appears ready for reading -when it is closed). +It is possible to pass a Telnet object to a selector in order to wait until +more data is available. Note that in this case, read_eager() may return b'' +even if there was data on the socket, because the protocol negotiation may have +eaten the data. This is why EOFError is needed in some cases to distinguish +between "no data" and "connection closed" (since the socket also appears ready +for reading when it is closed). To do:
Imported modules
-import errno import sys import socket -import select +import selectors all = ["Telnet"] @@ -130,6 +128,15 @@ PRAGMA_HEARTBEAT = bytes([140]) # TELOPT EXOPL = bytes([255]) # Extended-Options-List NOOPT = bytes([0]) + +# poll/select have the advantage of not requiring any extra file descriptor, +# contrarily to epoll/kqueue (also, they require a single syscall). +if hasattr(selectors, 'PollSelector'):
+ + class Telnet: """Telnet interface class. @@ -206,7 +213,6 @@ class Telnet: self.sb = 0 # flag for SB and SE sequence. self.sbdataq = b'' self.option_callback = None
self._has_poll = hasattr(select, 'poll')[](#l1.55) if host is not None:[](#l1.56) self.open(host, port, timeout)[](#l1.57)
@@ -289,61 +295,6 @@ class Telnet: is closed and no cooked data is available. """
if self._has_poll:[](#l1.63)
return self._read_until_with_poll(match, timeout)[](#l1.64)
else:[](#l1.65)
return self._read_until_with_select(match, timeout)[](#l1.66)
- def _read_until_with_poll(self, match, timeout):
"""Read until a given string is encountered or until timeout.[](#l1.69)
This method uses select.poll() to implement the timeout.[](#l1.71)
"""[](#l1.72)
n = len(match)[](#l1.73)
call_timeout = timeout[](#l1.74)
if timeout is not None:[](#l1.75)
from time import time[](#l1.76)
time_start = time()[](#l1.77)
self.process_rawq()[](#l1.78)
i = self.cookedq.find(match)[](#l1.79)
if i < 0:[](#l1.80)
poller = select.poll()[](#l1.81)
poll_in_or_priority_flags = select.POLLIN | select.POLLPRI[](#l1.82)
poller.register(self, poll_in_or_priority_flags)[](#l1.83)
while i < 0 and not self.eof:[](#l1.84)
try:[](#l1.85)
ready = poller.poll(call_timeout)[](#l1.86)
except OSError as e:[](#l1.87)
if e.errno == errno.EINTR:[](#l1.88)
if timeout is not None:[](#l1.89)
elapsed = time() - time_start[](#l1.90)
call_timeout = timeout-elapsed[](#l1.91)
continue[](#l1.92)
raise[](#l1.93)
for fd, mode in ready:[](#l1.94)
if mode & poll_in_or_priority_flags:[](#l1.95)
i = max(0, len(self.cookedq)-n)[](#l1.96)
self.fill_rawq()[](#l1.97)
self.process_rawq()[](#l1.98)
i = self.cookedq.find(match, i)[](#l1.99)
if timeout is not None:[](#l1.100)
elapsed = time() - time_start[](#l1.101)
if elapsed >= timeout:[](#l1.102)
break[](#l1.103)
call_timeout = timeout-elapsed[](#l1.104)
poller.unregister(self)[](#l1.105)
if i >= 0:[](#l1.106)
i = i + n[](#l1.107)
buf = self.cookedq[:i][](#l1.108)
self.cookedq = self.cookedq[i:][](#l1.109)
return buf[](#l1.110)
return self.read_very_lazy()[](#l1.111)
- def _read_until_with_select(self, match, timeout=None):
"""Read until a given string is encountered or until timeout.[](#l1.114)
The timeout is implemented using select.select().[](#l1.116)
"""[](#l1.117) n = len(match)[](#l1.118) self.process_rawq()[](#l1.119) i = self.cookedq.find(match)[](#l1.120)
@@ -352,27 +303,26 @@ class Telnet: buf = self.cookedq[:i] self.cookedq = self.cookedq[i:] return buf
s_reply = ([self], [], [])[](#l1.125)
s_args = s_reply[](#l1.126) if timeout is not None:[](#l1.127)
s_args = s_args + (timeout,)[](#l1.128) from time import time[](#l1.129)
time_start = time()[](#l1.130)
while not self.eof and select.select(*s_args) == s_reply:[](#l1.131)
i = max(0, len(self.cookedq)-n)[](#l1.132)
self.fill_rawq()[](#l1.133)
self.process_rawq()[](#l1.134)
i = self.cookedq.find(match, i)[](#l1.135)
if i >= 0:[](#l1.136)
i = i+n[](#l1.137)
buf = self.cookedq[:i][](#l1.138)
self.cookedq = self.cookedq[i:][](#l1.139)
return buf[](#l1.140)
if timeout is not None:[](#l1.141)
elapsed = time() - time_start[](#l1.142)
if elapsed >= timeout:[](#l1.143)
break[](#l1.144)
s_args = s_reply + (timeout-elapsed,)[](#l1.145)
deadline = time() + timeout[](#l1.146)
with _TelnetSelector() as selector:[](#l1.147)
selector.register(self, selectors.EVENT_READ)[](#l1.148)
while not self.eof:[](#l1.149)
if selector.select(timeout):[](#l1.150)
i = max(0, len(self.cookedq)-n)[](#l1.151)
self.fill_rawq()[](#l1.152)
self.process_rawq()[](#l1.153)
i = self.cookedq.find(match, i)[](#l1.154)
if i >= 0:[](#l1.155)
i = i+n[](#l1.156)
buf = self.cookedq[:i][](#l1.157)
self.cookedq = self.cookedq[i:][](#l1.158)
return buf[](#l1.159)
if timeout is not None:[](#l1.160)
timeout = deadline - time()[](#l1.161)
if timeout < 0:[](#l1.162)
break[](#l1.163) return self.read_very_lazy()[](#l1.164)
def read_all(self): @@ -577,29 +527,35 @@ class Telnet: def sock_avail(self): """Test whether data is available on the socket."""
return select.select([self], [], [], 0) == ([self], [], [])[](#l1.171)
with _TelnetSelector() as selector:[](#l1.172)
selector.register(self, selectors.EVENT_READ)[](#l1.173)
return bool(selector.select(0))[](#l1.174)
def interact(self): """Interaction function, emulates a very dumb telnet client.""" if sys.platform == "win32": self.mt_interact() return
while 1:[](#l1.181)
rfd, wfd, xfd = select.select([self, sys.stdin], [], [])[](#l1.182)
if self in rfd:[](#l1.183)
try:[](#l1.184)
text = self.read_eager()[](#l1.185)
except EOFError:[](#l1.186)
print('*** Connection closed by remote host ***')[](#l1.187)
break[](#l1.188)
if text:[](#l1.189)
sys.stdout.write(text.decode('ascii'))[](#l1.190)
sys.stdout.flush()[](#l1.191)
if sys.stdin in rfd:[](#l1.192)
line = sys.stdin.readline().encode('ascii')[](#l1.193)
if not line:[](#l1.194)
break[](#l1.195)
self.write(line)[](#l1.196)
with _TelnetSelector() as selector:[](#l1.197)
selector.register(self, selectors.EVENT_READ)[](#l1.198)
selector.register(sys.stdin, selectors.EVENT_READ)[](#l1.199)
while True:[](#l1.201)
for key, events in selector.select():[](#l1.202)
if key.fileobj is self:[](#l1.203)
try:[](#l1.204)
text = self.read_eager()[](#l1.205)
except EOFError:[](#l1.206)
print('*** Connection closed by remote host ***')[](#l1.207)
return[](#l1.208)
if text:[](#l1.209)
sys.stdout.write(text.decode('ascii'))[](#l1.210)
sys.stdout.flush()[](#l1.211)
elif key.fileobj is sys.stdin:[](#l1.212)
line = sys.stdin.readline().encode('ascii')[](#l1.213)
if not line:[](#l1.214)
return[](#l1.215)
self.write(line)[](#l1.216)
def mt_interact(self): """Multithreaded version of interact().""" @@ -646,79 +602,6 @@ class Telnet: results are undeterministic, and may depend on the I/O timing. """
if self._has_poll:[](#l1.224)
return self._expect_with_poll(list, timeout)[](#l1.225)
else:[](#l1.226)
return self._expect_with_select(list, timeout)[](#l1.227)
- def _expect_with_poll(self, expect_list, timeout=None):
"""Read until one from a list of a regular expressions matches.[](#l1.230)
This method uses select.poll() to implement the timeout.[](#l1.232)
"""[](#l1.233)
re = None[](#l1.234)
expect_list = expect_list[:][](#l1.235)
indices = range(len(expect_list))[](#l1.236)
for i in indices:[](#l1.237)
if not hasattr(expect_list[i], "search"):[](#l1.238)
if not re: import re[](#l1.239)
expect_list[i] = re.compile(expect_list[i])[](#l1.240)
call_timeout = timeout[](#l1.241)
if timeout is not None:[](#l1.242)
from time import time[](#l1.243)
time_start = time()[](#l1.244)
self.process_rawq()[](#l1.245)
m = None[](#l1.246)
for i in indices:[](#l1.247)
m = expect_list[i].search(self.cookedq)[](#l1.248)
if m:[](#l1.249)
e = m.end()[](#l1.250)
text = self.cookedq[:e][](#l1.251)
self.cookedq = self.cookedq[e:][](#l1.252)
break[](#l1.253)
if not m:[](#l1.254)
poller = select.poll()[](#l1.255)
poll_in_or_priority_flags = select.POLLIN | select.POLLPRI[](#l1.256)
poller.register(self, poll_in_or_priority_flags)[](#l1.257)
while not m and not self.eof:[](#l1.258)
try:[](#l1.259)
ready = poller.poll(call_timeout)[](#l1.260)
except OSError as e:[](#l1.261)
if e.errno == errno.EINTR:[](#l1.262)
if timeout is not None:[](#l1.263)
elapsed = time() - time_start[](#l1.264)
call_timeout = timeout-elapsed[](#l1.265)
continue[](#l1.266)
raise[](#l1.267)
for fd, mode in ready:[](#l1.268)
if mode & poll_in_or_priority_flags:[](#l1.269)
self.fill_rawq()[](#l1.270)
self.process_rawq()[](#l1.271)
for i in indices:[](#l1.272)
m = expect_list[i].search(self.cookedq)[](#l1.273)
if m:[](#l1.274)
e = m.end()[](#l1.275)
text = self.cookedq[:e][](#l1.276)
self.cookedq = self.cookedq[e:][](#l1.277)
break[](#l1.278)
if timeout is not None:[](#l1.279)
elapsed = time() - time_start[](#l1.280)
if elapsed >= timeout:[](#l1.281)
break[](#l1.282)
call_timeout = timeout-elapsed[](#l1.283)
poller.unregister(self)[](#l1.284)
if m:[](#l1.285)
return (i, m, text)[](#l1.286)
text = self.read_very_lazy()[](#l1.287)
if not text and self.eof:[](#l1.288)
raise EOFError[](#l1.289)
return (-1, None, text)[](#l1.290)
- def _expect_with_select(self, list, timeout=None):
"""Read until one from a list of a regular expressions matches.[](#l1.293)
The timeout is implemented using select.select().[](#l1.295)
"""[](#l1.296) re = None[](#l1.297) list = list[:][](#l1.298) indices = range(len(list))[](#l1.299)
@@ -728,27 +611,27 @@ class Telnet: list[i] = re.compile(list[i]) if timeout is not None: from time import time
time_start = time()[](#l1.304)
while 1:[](#l1.305)
self.process_rawq()[](#l1.306)
for i in indices:[](#l1.307)
m = list[i].search(self.cookedq)[](#l1.308)
if m:[](#l1.309)
e = m.end()[](#l1.310)
text = self.cookedq[:e][](#l1.311)
self.cookedq = self.cookedq[e:][](#l1.312)
return (i, m, text)[](#l1.313)
if self.eof:[](#l1.314)
break[](#l1.315)
if timeout is not None:[](#l1.316)
elapsed = time() - time_start[](#l1.317)
if elapsed >= timeout:[](#l1.318)
break[](#l1.319)
s_args = ([self.fileno()], [], [], timeout-elapsed)[](#l1.320)
r, w, x = select.select(*s_args)[](#l1.321)
if not r:[](#l1.322)
break[](#l1.323)
self.fill_rawq()[](#l1.324)
deadline = time() + timeout[](#l1.325)
with _TelnetSelector() as selector:[](#l1.326)
selector.register(self, selectors.EVENT_READ)[](#l1.327)
while not self.eof:[](#l1.328)
self.process_rawq()[](#l1.329)
for i in indices:[](#l1.330)
m = list[i].search(self.cookedq)[](#l1.331)
if m:[](#l1.332)
e = m.end()[](#l1.333)
text = self.cookedq[:e][](#l1.334)
self.cookedq = self.cookedq[e:][](#l1.335)
return (i, m, text)[](#l1.336)
if timeout is not None:[](#l1.337)
ready = selector.select(timeout)[](#l1.338)
timeout = deadline - time()[](#l1.339)
if not ready:[](#l1.340)
if timeout < 0:[](#l1.341)
break[](#l1.342)
else:[](#l1.343)
continue[](#l1.344)
self.fill_rawq()[](#l1.345) text = self.read_very_lazy()[](#l1.346) if not text and self.eof:[](#l1.347) raise EOFError[](#l1.348)
--- a/Lib/test/test_telnetlib.py +++ b/Lib/test/test_telnetlib.py @@ -1,10 +1,9 @@ import socket -import select +import selectors import telnetlib import time import contextlib -import unittest from unittest import TestCase from test import support threading = support.import_module('threading') @@ -112,40 +111,32 @@ class TelnetAlike(telnetlib.Telnet): self._messages += out.getvalue() return -def mock_select(*s_args):
- block = False
- for l in s_args:
for fob in l:[](#l2.22)
if isinstance(fob, TelnetAlike):[](#l2.23)
block = fob.sock.block[](#l2.24)
- if block:
return [[], [], []][](#l2.26)
- else:
return s_args[](#l2.28)
+class MockSelector(selectors.BaseSelector): def init(self):
self._file_objs = [][](#l2.35)
super().__init__()[](#l2.36)
self.keys = {}[](#l2.37)
- def register(self, fileobj, events, data=None):
key = selectors.SelectorKey(fileobj, 0, events, data)[](#l2.40)
self.keys[fileobj] = key[](#l2.41)
return key[](#l2.42)
- def register(self, fd, eventmask):
self.test_case.assertTrue(hasattr(fd, 'fileno'), fd)[](#l2.45)
self.test_case.assertEqual(eventmask, select.POLLIN|select.POLLPRI)[](#l2.46)
self._file_objs.append(fd)[](#l2.47)
for fob in self._file_objs:[](#l2.55)
if isinstance(fob, TelnetAlike):[](#l2.56)
block = fob.sock.block[](#l2.57)
for fileobj in self.keys:[](#l2.58)
if isinstance(fileobj, TelnetAlike):[](#l2.59)
block = fileobj.sock.block[](#l2.60)
break[](#l2.61) if block:[](#l2.62) return [][](#l2.63) else:[](#l2.64)
return zip(self._file_objs, [select.POLLIN]*len(self._file_objs))[](#l2.65)
return [(key, key.events) for key in self.keys.values()][](#l2.66)
@contextlib.contextmanager def test_socket(reads): @@ -159,7 +150,7 @@ def test_socket(reads): socket.create_connection = old_conn return -def test_telnet(reads=(), cls=TelnetAlike, use_poll=None): +def test_telnet(reads=(), cls=TelnetAlike): ''' return a telnetlib.Telnet object that uses a SocketStub with reads queued up to be read ''' for x in reads: @@ -167,29 +158,14 @@ def test_telnet(reads=(), cls=TelnetAlik with test_socket(reads): telnet = cls('dummy', 0) telnet._messages = '' # debuglevel output
if use_poll is not None:[](#l2.86)
if use_poll and not telnet._has_poll:[](#l2.87)
raise unittest.SkipTest('select.poll() required.')[](#l2.88)
return telnet - class ExpectAndReadTestCase(TestCase): def setUp(self):telnet._has_poll = use_poll[](#l2.89)
self.old_select = select.select[](#l2.95)
select.select = mock_select[](#l2.96)
self.old_poll = False[](#l2.97)
if hasattr(select, 'poll'):[](#l2.98)
self.old_poll = select.poll[](#l2.99)
select.poll = MockPoller[](#l2.100)
MockPoller.test_case = self[](#l2.101)
self.old_selector = telnetlib._TelnetSelector[](#l2.103)
def tearDown(self):telnetlib._TelnetSelector = MockSelector[](#l2.104)
if self.old_poll:[](#l2.106)
MockPoller.test_case = None[](#l2.107)
select.poll = self.old_poll[](#l2.108)
select.select = self.old_select[](#l2.109)
telnetlib._TelnetSelector = self.old_selector[](#l2.111)
class ReadTests(ExpectAndReadTestCase): def test_read_until(self): @@ -208,22 +184,6 @@ class ReadTests(ExpectAndReadTestCase): data = telnet.read_until(b'match') self.assertEqual(data, expect)
- def test_read_until_with_poll(self):
"""Use select.poll() to implement telnet.read_until()."""[](#l2.120)
want = [b'x' * 10, b'match', b'y' * 10][](#l2.121)
telnet = test_telnet(want, use_poll=True)[](#l2.122)
select.select = lambda *_: self.fail('unexpected select() call.')[](#l2.123)
data = telnet.read_until(b'match')[](#l2.124)
self.assertEqual(data, b''.join(want[:-1]))[](#l2.125)
- def test_read_until_with_select(self):
"""Use select.select() to implement telnet.read_until()."""[](#l2.128)
want = [b'x' * 10, b'match', b'y' * 10][](#l2.129)
telnet = test_telnet(want, use_poll=False)[](#l2.130)
if self.old_poll:[](#l2.131)
select.poll = lambda *_: self.fail('unexpected poll() call.')[](#l2.132)
data = telnet.read_until(b'match')[](#l2.133)
self.assertEqual(data, b''.join(want[:-1]))[](#l2.134)
def test_read_all(self): """ @@ -427,23 +387,6 @@ class ExpectTests(ExpectAndReadTestCase) (,,data) = telnet.expect([b'match']) self.assertEqual(data, b''.join(want[:-1]))
- def test_expect_with_poll(self):
"""Use select.poll() to implement telnet.expect()."""[](#l2.143)
want = [b'x' * 10, b'match', b'y' * 10][](#l2.144)
telnet = test_telnet(want, use_poll=True)[](#l2.145)
select.select = lambda *_: self.fail('unexpected select() call.')[](#l2.146)
(_,_,data) = telnet.expect([b'match'])[](#l2.147)
self.assertEqual(data, b''.join(want[:-1]))[](#l2.148)
- def test_expect_with_select(self):
"""Use select.select() to implement telnet.expect()."""[](#l2.151)
want = [b'x' * 10, b'match', b'y' * 10][](#l2.152)
telnet = test_telnet(want, use_poll=False)[](#l2.153)
if self.old_poll:[](#l2.154)
select.poll = lambda *_: self.fail('unexpected poll() call.')[](#l2.155)
(_,_,data) = telnet.expect([b'match'])[](#l2.156)
self.assertEqual(data, b''.join(want[:-1]))[](#l2.157)
- def test_main(verbose=None): support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests,