cpython: b07b0b7517da (original) (raw)
--- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -87,6 +87,13 @@ def capture_server(evt, buf, serv): serv.close() evt.set() +def bind_af_aware(sock, addr):
- """Helper function to bind a socket according to its family."""
- if sock.family == socket.AF_UNIX:
# Make sure the path doesn't exist.[](#l1.10)
unlink(addr)[](#l1.11)
- sock.bind(addr)
+ class HelperFunctionTests(unittest.TestCase): def test_readwriteexc(self): @@ -467,22 +474,22 @@ class BaseTestHandler(asyncore.dispatche raise -class TCPServer(asyncore.dispatcher): +class BaseServer(asyncore.dispatcher): """A server which listens on an address and dispatches the connection to a handler. """
self.create_socket()[](#l1.30)
self.create_socket(family)[](#l1.31) self.set_reuse_addr()[](#l1.32)
self.bind((host, port))[](#l1.33)
bind_af_aware(self.socket, addr)[](#l1.34) self.listen(5)[](#l1.35) self.handler = handler[](#l1.36)
return self.socket.getsockname()[:2][](#l1.40)
return self.socket.getsockname()[](#l1.41)
def handle_accepted(self, sock, addr): self.handler(sock) @@ -493,9 +500,9 @@ class TCPServer(asyncore.dispatcher): class BaseClient(BaseTestHandler):
self.create_socket()[](#l1.52)
self.create_socket(family)[](#l1.53) self.connect(address)[](#l1.54)
def handle_connect(self): @@ -525,8 +532,8 @@ class BaseTestAPI(unittest.TestCase): def handle_connect(self): self.flag = True
server = TCPServer()[](#l1.61)
client = TestClient(server.address)[](#l1.62)
server = BaseServer(self.family, self.addr)[](#l1.63)
client = TestClient(self.family, server.address)[](#l1.64) self.loop_waiting_for_flag(client)[](#l1.65)
def test_handle_accept(self): @@ -534,18 +541,18 @@ class BaseTestAPI(unittest.TestCase): class TestListener(BaseTestHandler):
def __init__(self):[](#l1.72)
def __init__(self, family, addr):[](#l1.73) BaseTestHandler.__init__(self)[](#l1.74)
self.create_socket()[](#l1.75)
self.bind((HOST, 0))[](#l1.76)
self.create_socket(family)[](#l1.77)
bind_af_aware(self.socket, addr)[](#l1.78) self.listen(5)[](#l1.79)
self.address = self.socket.getsockname()[:2][](#l1.80)
self.address = self.socket.getsockname()[](#l1.81)
def handle_accept(self): self.flag = True
server = TestListener()[](#l1.86)
client = BaseClient(server.address)[](#l1.87)
server = TestListener(self.family, self.addr)[](#l1.88)
client = BaseClient(self.family, server.address)[](#l1.89) self.loop_waiting_for_flag(server)[](#l1.90)
def test_handle_accepted(self): @@ -553,12 +560,12 @@ class BaseTestAPI(unittest.TestCase): class TestListener(BaseTestHandler):
def __init__(self):[](#l1.97)
def __init__(self, family, addr):[](#l1.98) BaseTestHandler.__init__(self)[](#l1.99)
self.create_socket()[](#l1.100)
self.bind((HOST, 0))[](#l1.101)
self.create_socket(family)[](#l1.102)
bind_af_aware(self.socket, addr)[](#l1.103) self.listen(5)[](#l1.104)
self.address = self.socket.getsockname()[:2][](#l1.105)
self.address = self.socket.getsockname()[](#l1.106)
def handle_accept(self): asyncore.dispatcher.handle_accept(self) @@ -567,8 +574,8 @@ class BaseTestAPI(unittest.TestCase): sock.close() self.flag = True
server = TestListener()[](#l1.114)
client = BaseClient(server.address)[](#l1.115)
server = TestListener(self.family, self.addr)[](#l1.116)
client = BaseClient(self.family, server.address)[](#l1.117) self.loop_waiting_for_flag(server)[](#l1.118)
@@ -584,8 +591,8 @@ class BaseTestAPI(unittest.TestCase): BaseTestHandler.init(self, conn) self.send(b'x' * 1024)
server = TCPServer(TestHandler)[](#l1.125)
client = TestClient(server.address)[](#l1.126)
server = BaseServer(self.family, self.addr, TestHandler)[](#l1.127)
client = TestClient(self.family, server.address)[](#l1.128) self.loop_waiting_for_flag(client)[](#l1.129)
def test_handle_write(self): @@ -595,8 +602,8 @@ class BaseTestAPI(unittest.TestCase): def handle_write(self): self.flag = True
server = TCPServer()[](#l1.136)
client = TestClient(server.address)[](#l1.137)
server = BaseServer(self.family, self.addr)[](#l1.138)
client = TestClient(self.family, server.address)[](#l1.139) self.loop_waiting_for_flag(client)[](#l1.140)
def test_handle_close(self): @@ -619,8 +626,8 @@ class BaseTestAPI(unittest.TestCase): BaseTestHandler.init(self, conn) self.close()
server = TCPServer(TestHandler)[](#l1.147)
client = TestClient(server.address)[](#l1.148)
server = BaseServer(self.family, self.addr, TestHandler)[](#l1.149)
client = TestClient(self.family, server.address)[](#l1.150) self.loop_waiting_for_flag(client)[](#l1.151)
@unittest.skipIf(sys.platform.startswith("sunos"), @@ -629,6 +636,8 @@ class BaseTestAPI(unittest.TestCase): # Make sure handle_expt is called on OOB data received. # Note: this might fail on some platforms as OOB data is # tenuously supported and rarely used.
if self.family == socket.AF_UNIX:[](#l1.158)
self.skipTest("Not applicable to AF_UNIX sockets.")[](#l1.159)
class TestClient(BaseClient): def handle_expt(self): @@ -639,8 +648,8 @@ class BaseTestAPI(unittest.TestCase): BaseTestHandler.init(self, conn) self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)
server = TCPServer(TestHandler)[](#l1.167)
client = TestClient(server.address)[](#l1.168)
server = BaseServer(self.family, self.addr, TestHandler)[](#l1.169)
client = TestClient(self.family, server.address)[](#l1.170) self.loop_waiting_for_flag(client)[](#l1.171)
def test_handle_error(self): @@ -657,13 +666,13 @@ class BaseTestAPI(unittest.TestCase): else: raise Exception("exception not raised")
server = TCPServer()[](#l1.178)
client = TestClient(server.address)[](#l1.179)
server = BaseServer(self.family, self.addr)[](#l1.180)
client = TestClient(self.family, server.address)[](#l1.181) self.loop_waiting_for_flag(client)[](#l1.182)
def test_connection_attributes(self):
server = TCPServer()[](#l1.185)
client = BaseClient(server.address)[](#l1.186)
server = BaseServer(self.family, self.addr)[](#l1.187)
client = BaseClient(self.family, server.address)[](#l1.188)
# we start disconnected self.assertFalse(server.connected) @@ -693,25 +702,29 @@ class BaseTestAPI(unittest.TestCase): def test_create_socket(self): s = asyncore.dispatcher()
s.create_socket()[](#l1.196)
self.assertEqual(s.socket.family, socket.AF_INET)[](#l1.197)
s.create_socket(self.family)[](#l1.198)
self.assertEqual(s.socket.family, self.family)[](#l1.199) SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0)[](#l1.200) self.assertEqual(s.socket.type, socket.SOCK_STREAM | SOCK_NONBLOCK)[](#l1.201)
if self.family == socket.AF_UNIX:[](#l1.204)
self.skipTest("Not applicable to AF_UNIX sockets.")[](#l1.205) s1 = asyncore.dispatcher()[](#l1.206)
s1.create_socket()[](#l1.207)
s1.bind((HOST, 0))[](#l1.208)
s1.create_socket(self.family)[](#l1.209)
s1.bind(self.addr)[](#l1.210) s1.listen(5)[](#l1.211) port = s1.socket.getsockname()[1][](#l1.212)
s2.create_socket()[](#l1.215)
s2.create_socket(self.family)[](#l1.216) # EADDRINUSE indicates the socket was correctly bound[](#l1.217)
self.assertRaises(socket.error, s2.bind, (HOST, port))[](#l1.218)
self.assertRaises(socket.error, s2.bind, (self.addr[0], port))[](#l1.219)
def test_set_reuse_addr(self):
sock = socket.socket()[](#l1.222)
if self.family == socket.AF_UNIX:[](#l1.223)
self.skipTest("Not applicable to AF_UNIX sockets.")[](#l1.224)
sock = socket.socket(self.family)[](#l1.225) try:[](#l1.226) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)[](#l1.227) except socket.error:[](#l1.228)
@@ -719,11 +732,11 @@ class BaseTestAPI(unittest.TestCase): else: # if SO_REUSEADDR succeeded for sock we expect asyncore # to do the same
s = asyncore.dispatcher(socket.socket())[](#l1.233)
s = asyncore.dispatcher(socket.socket(self.family))[](#l1.234) self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,[](#l1.235) socket.SO_REUSEADDR))[](#l1.236) s.socket.close()[](#l1.237)
s.create_socket()[](#l1.238)
s.create_socket(self.family)[](#l1.239) s.set_reuse_addr()[](#l1.240) self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,[](#l1.241) socket.SO_REUSEADDR))[](#l1.242)
@@ -731,18 +744,44 @@ class BaseTestAPI(unittest.TestCase): sock.close() -class TestAPI_UseSelect(BaseTestAPI): +class TestAPI_UseIPv4Sockets(BaseTestAPI):
+ +@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required') +class TestAPI_UseIPv6Sockets(BaseTestAPI):
+ +@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'Unix sockets required') +class TestAPI_UseUnixSockets(BaseTestAPI):
+ +class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets): use_poll = False @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') -class TestAPI_UsePoll(BaseTestAPI): +class TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets): use_poll = True +class TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets):
+ +@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') +class TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets):
def test_main(): tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
DispatcherWithSendTests_UsePoll, TestAPI_UseSelect,[](#l1.284)
TestAPI_UsePoll, FileWrapperTest][](#l1.285)
DispatcherWithSendTests_UsePoll, FileWrapperTest,[](#l1.286)
TestAPI_UseIPv4Select, TestAPI_UseIPv4Poll, TestAPI_UseIPv6Select,[](#l1.287)
run_unittest(*tests) if name == "main":TestAPI_UseIPv6Poll, TestAPI_UseUnixSockets][](#l1.288)
--- a/Misc/NEWS +++ b/Misc/NEWS @@ -1221,6 +1221,8 @@ Extension Modules Tests ----- +- Issue #12656: Add tests for IPv6 and Unix sockets to test_asyncore. +