cpython: de229dde486b (original) (raw)
--- a/Lib/telnetlib.py +++ b/Lib/telnetlib.py @@ -34,6 +34,7 @@ To do:
Imported modules
+import errno import sys import socket import select @@ -205,6 +206,7 @@ class Telnet: self.sb = 0 # flag for SB and SE sequence. self.sbdataq = b'' self.option_callback = None
self._has_poll = hasattr(select, 'poll')[](#l1.15) if host is not None:[](#l1.16) self.open(host, port, timeout)[](#l1.17)
@@ -287,6 +289,61 @@ class Telnet: is closed and no cooked data is available. """
if self._has_poll:[](#l1.23)
return self._read_until_with_poll(match, timeout)[](#l1.24)
else:[](#l1.25)
return self._read_until_with_select(match, timeout)[](#l1.26)
- def _read_until_with_poll(self, match, timeout):
"""Read until a given string is encountered or until timeout.[](#l1.29)
This method uses select.poll() to implement the timeout.[](#l1.31)
"""[](#l1.32)
n = len(match)[](#l1.33)
call_timeout = timeout[](#l1.34)
if timeout is not None:[](#l1.35)
from time import time[](#l1.36)
time_start = time()[](#l1.37)
self.process_rawq()[](#l1.38)
i = self.cookedq.find(match)[](#l1.39)
if i < 0:[](#l1.40)
poller = select.poll()[](#l1.41)
poll_in_or_priority_flags = select.POLLIN | select.POLLPRI[](#l1.42)
poller.register(self, poll_in_or_priority_flags)[](#l1.43)
while i < 0 and not self.eof:[](#l1.44)
try:[](#l1.45)
ready = poller.poll(call_timeout)[](#l1.46)
except select.error as e:[](#l1.47)
if e.errno == errno.EINTR:[](#l1.48)
if timeout is not None:[](#l1.49)
elapsed = time() - time_start[](#l1.50)
call_timeout = timeout-elapsed[](#l1.51)
continue[](#l1.52)
raise[](#l1.53)
for fd, mode in ready:[](#l1.54)
if mode & poll_in_or_priority_flags:[](#l1.55)
i = max(0, len(self.cookedq)-n)[](#l1.56)
self.fill_rawq()[](#l1.57)
self.process_rawq()[](#l1.58)
i = self.cookedq.find(match, i)[](#l1.59)
if timeout is not None:[](#l1.60)
elapsed = time() - time_start[](#l1.61)
if elapsed >= timeout:[](#l1.62)
break[](#l1.63)
call_timeout = timeout-elapsed[](#l1.64)
poller.unregister(self)[](#l1.65)
if i >= 0:[](#l1.66)
i = i + n[](#l1.67)
buf = self.cookedq[:i][](#l1.68)
self.cookedq = self.cookedq[i:][](#l1.69)
return buf[](#l1.70)
return self.read_very_lazy()[](#l1.71)
- def _read_until_with_select(self, match, timeout=None):
"""Read until a given string is encountered or until timeout.[](#l1.74)
The timeout is implemented using select.select().[](#l1.76)
"""[](#l1.77) n = len(match)[](#l1.78) self.process_rawq()[](#l1.79) i = self.cookedq.find(match)[](#l1.80)
@@ -589,6 +646,79 @@ class Telnet: results are undeterministic, and may depend on the I/O timing. """
if self._has_poll:[](#l1.85)
return self._expect_with_poll(list, timeout)[](#l1.86)
else:[](#l1.87)
return self._expect_with_select(list, timeout)[](#l1.88)
- def _expect_with_poll(self, expect_list, timeout=None):
"""Read until one from a list of a regular expressions matches.[](#l1.91)
This method uses select.poll() to implement the timeout.[](#l1.93)
"""[](#l1.94)
re = None[](#l1.95)
expect_list = expect_list[:][](#l1.96)
indices = range(len(expect_list))[](#l1.97)
for i in indices:[](#l1.98)
if not hasattr(expect_list[i], "search"):[](#l1.99)
if not re: import re[](#l1.100)
expect_list[i] = re.compile(expect_list[i])[](#l1.101)
call_timeout = timeout[](#l1.102)
if timeout is not None:[](#l1.103)
from time import time[](#l1.104)
time_start = time()[](#l1.105)
self.process_rawq()[](#l1.106)
m = None[](#l1.107)
for i in indices:[](#l1.108)
m = expect_list[i].search(self.cookedq)[](#l1.109)
if m:[](#l1.110)
e = m.end()[](#l1.111)
text = self.cookedq[:e][](#l1.112)
self.cookedq = self.cookedq[e:][](#l1.113)
break[](#l1.114)
if not m:[](#l1.115)
poller = select.poll()[](#l1.116)
poll_in_or_priority_flags = select.POLLIN | select.POLLPRI[](#l1.117)
poller.register(self, poll_in_or_priority_flags)[](#l1.118)
while not m and not self.eof:[](#l1.119)
try:[](#l1.120)
ready = poller.poll(call_timeout)[](#l1.121)
except select.error as e:[](#l1.122)
if e.errno == errno.EINTR:[](#l1.123)
if timeout is not None:[](#l1.124)
elapsed = time() - time_start[](#l1.125)
call_timeout = timeout-elapsed[](#l1.126)
continue[](#l1.127)
raise[](#l1.128)
for fd, mode in ready:[](#l1.129)
if mode & poll_in_or_priority_flags:[](#l1.130)
self.fill_rawq()[](#l1.131)
self.process_rawq()[](#l1.132)
for i in indices:[](#l1.133)
m = expect_list[i].search(self.cookedq)[](#l1.134)
if m:[](#l1.135)
e = m.end()[](#l1.136)
text = self.cookedq[:e][](#l1.137)
self.cookedq = self.cookedq[e:][](#l1.138)
break[](#l1.139)
if timeout is not None:[](#l1.140)
elapsed = time() - time_start[](#l1.141)
if elapsed >= timeout:[](#l1.142)
break[](#l1.143)
call_timeout = timeout-elapsed[](#l1.144)
poller.unregister(self)[](#l1.145)
if m:[](#l1.146)
return (i, m, text)[](#l1.147)
text = self.read_very_lazy()[](#l1.148)
if not text and self.eof:[](#l1.149)
raise EOFError[](#l1.150)
return (-1, None, text)[](#l1.151)
- def _expect_with_select(self, list, timeout=None):
"""Read until one from a list of a regular expressions matches.[](#l1.154)
The timeout is implemented using select.select().[](#l1.156)
"""[](#l1.157) re = None[](#l1.158) list = list[:][](#l1.159) indices = range(len(list))[](#l1.160)
--- a/Lib/test/test_telnetlib.py +++ b/Lib/test/test_telnetlib.py @@ -75,8 +75,8 @@ class GeneralTests(TestCase): class SocketStub(object): ''' a socket proxy that re-defines sendall() '''
- def init(self, reads=()):
def sendall(self, data): @@ -102,7 +102,7 @@ class TelnetAlike(telnetlib.Telnet): self._messages += out.getvalue() returnself.reads = list(reads) # Intentionally make a copy.[](#l2.10) self.writes = [][](#l2.11) self.block = False[](#l2.12)
-def new_select(*s_args): +def mock_select(*s_args): block = False for l in s_args: for fob in l: @@ -113,6 +113,30 @@ def new_select(*s_args): else: return s_args +class MockPoller(object):
- def register(self, fd, eventmask):
self.test_case.assertTrue(hasattr(fd, 'fileno'), fd)[](#l2.34)
self.test_case.assertEqual(eventmask, select.POLLIN|select.POLLPRI)[](#l2.35)
self._file_objs.append(fd)[](#l2.36)
- def poll(self, timeout=None):
block = False[](#l2.39)
for fob in self._file_objs:[](#l2.40)
if isinstance(fob, TelnetAlike):[](#l2.41)
block = fob.sock.block[](#l2.42)
if block:[](#l2.43)
return [][](#l2.44)
else:[](#l2.45)
return zip(self._file_objs, [select.POLLIN]*len(self._file_objs))[](#l2.46)
+ @contextlib.contextmanager def test_socket(reads): def new_conn(*ignored): @@ -125,7 +149,7 @@ def test_socket(reads): socket.create_connection = old_conn return -def test_telnet(reads=[], cls=TelnetAlike): +def test_telnet(reads=(), cls=TelnetAlike, use_poll=None): ''' return a telnetlib.Telnet object that uses a SocketStub with reads queued up to be read ''' for x in reads: @@ -133,15 +157,28 @@ def test_telnet(reads=[], cls=TelnetAlik with test_socket(reads): telnet = cls('dummy', 0) telnet._messages = '' # debuglevel output
if use_poll is not None:[](#l2.67)
if use_poll and not telnet._has_poll:[](#l2.68)
raise unittest.SkipTest('select.poll() required.')[](#l2.69)
return telnet -class ReadTests(TestCase): + +class ExpectAndReadTestCase(TestCase): def setUp(self): self.old_select = select.selecttelnet._has_poll = use_poll[](#l2.70)
select.select = new_select[](#l2.78)
self.old_poll = select.poll[](#l2.79)
select.select = mock_select[](#l2.80)
select.poll = MockPoller[](#l2.81)
MockPoller.test_case = self[](#l2.82)
MockPoller.test_case = None[](#l2.85)
select.poll = self.old_poll[](#l2.86) select.select = self.old_select[](#l2.87)
+ +class ReadTests(ExpectAndReadTestCase): def test_read_until(self): """ read_until(expected, timeout=None) @@ -158,6 +195,21 @@ class ReadTests(TestCase): data = telnet.read_until(b'match') self.assertEqual(data, expect)
- def test_read_until_with_poll(self):
"""Use select.poll() to implement telnet.read_until()."""[](#l2.99)
want = [b'x' * 10, b'match', b'y' * 10][](#l2.100)
telnet = test_telnet(want, use_poll=True)[](#l2.101)
select.select = lambda *_: self.fail('unexpected select() call.')[](#l2.102)
data = telnet.read_until(b'match')[](#l2.103)
self.assertEqual(data, b''.join(want[:-1]))[](#l2.104)
- def test_read_until_with_select(self):
"""Use select.select() to implement telnet.read_until()."""[](#l2.107)
want = [b'x' * 10, b'match', b'y' * 10][](#l2.108)
telnet = test_telnet(want, use_poll=False)[](#l2.109)
select.poll = lambda *_: self.fail('unexpected poll() call.')[](#l2.110)
data = telnet.read_until(b'match')[](#l2.111)
self.assertEqual(data, b''.join(want[:-1]))[](#l2.112)
def test_read_all(self): """ @@ -349,8 +401,38 @@ class OptionTests(TestCase): self.assertRegex(telnet._messages, r'0.*test') +class ExpectTests(ExpectAndReadTestCase):
- def test_expect(self):
"""[](#l2.122)
expect(expected, [timeout])[](#l2.123)
Read until the expected string has been seen, or a timeout is[](#l2.124)
hit (default is no timeout); may block.[](#l2.125)
"""[](#l2.126)
want = [b'x' * 10, b'match', b'y' * 10][](#l2.127)
telnet = test_telnet(want)[](#l2.128)
(_,_,data) = telnet.expect([b'match'])[](#l2.129)
self.assertEqual(data, b''.join(want[:-1]))[](#l2.130)
- def test_expect_with_poll(self):
"""Use select.poll() to implement telnet.expect()."""[](#l2.133)
want = [b'x' * 10, b'match', b'y' * 10][](#l2.134)
telnet = test_telnet(want, use_poll=True)[](#l2.135)
select.select = lambda *_: self.fail('unexpected select() call.')[](#l2.136)
(_,_,data) = telnet.expect([b'match'])[](#l2.137)
self.assertEqual(data, b''.join(want[:-1]))[](#l2.138)
- def test_expect_with_select(self):
"""Use select.select() to implement telnet.expect()."""[](#l2.141)
want = [b'x' * 10, b'match', b'y' * 10][](#l2.142)
telnet = test_telnet(want, use_poll=False)[](#l2.143)
select.poll = lambda *_: self.fail('unexpected poll() call.')[](#l2.144)
(_,_,data) = telnet.expect([b'match'])[](#l2.145)
self.assertEqual(data, b''.join(want[:-1]))[](#l2.146)
+ + def test_main(verbose=None):
if name == 'main': test_main()
--- a/Misc/ACKS +++ b/Misc/ACKS @@ -410,6 +410,7 @@ Chris Hoffman Albert Hofkamp Tomas Hoger Jonathan Hogg +Akintayo Holder Gerrit Holl Shane Holloway Rune Holm
--- a/Misc/NEWS +++ b/Misc/NEWS @@ -87,6 +87,9 @@ Core and Builtins Library ------- +- Issue #14635: telnetlib will use poll() rather than select() when possible