cpython: 17d688dedfca (original) (raw)
Mercurial > cpython
changeset 102798:17d688dedfca
Issue #27614: Merge test_docxmlrpc from 3.5 [#27614]
Martin Panter vadmium+py@gmail.com | |
---|---|
date | Sat, 20 Aug 2016 07:39:15 +0000 |
parents | e5777c5d108c(current diff)397f05044172(diff) |
children | 1455851e7332 |
files | |
diffstat | 1 files changed, 11 insertions(+), 30 deletions(-)[+] [-] Lib/test/test_docxmlrpc.py 41 |
line wrap: on
line diff
--- a/Lib/test/test_docxmlrpc.py +++ b/Lib/test/test_docxmlrpc.py @@ -3,12 +3,8 @@ import http.client import sys from test import support threading = support.import_module('threading') -import time -import socket import unittest -PORT = None - def make_request_and_skipIf(condition, reason): # If we skip the test, we have to make a request because # the server created in setUp blocks expecting one to come in. @@ -23,13 +19,10 @@ def make_request_and_skipIf(condition, r return decorator -def server(evt, numrequests): +def make_server(): serv = DocXMLRPCServer(("localhost", 0), logRequests=False) try:
global PORT[](#l1.25)
PORT = serv.socket.getsockname()[1][](#l1.26)
- # Add some documentation serv.set_server_title("DocXMLRPCServer Test Documentation") serv.set_server_name("DocXMLRPCServer Test Docs") @@ -66,43 +59,31 @@ def server(evt, numrequests): serv.register_function(lambda x, y: x-y) serv.register_function(annotation) serv.register_instance(ClassWithAnnotation()) -
while numrequests > 0:[](#l1.36)
serv.handle_request()[](#l1.37)
numrequests -= 1[](#l1.38)
- except socket.timeout:
pass[](#l1.40)
- finally:
PORT = None[](#l1.45)
evt.set()[](#l1.46)
raise[](#l1.47)
class DocXMLRPCHTTPGETServer(unittest.TestCase): def setUp(self):
self._threads = support.threading_setup()[](#l1.51) # Enable server feedback[](#l1.52) DocXMLRPCServer._send_traceback_header = True[](#l1.53)
self.evt = threading.Event()[](#l1.55)
threading.Thread(target=server, args=(self.evt, 1)).start()[](#l1.56)
self.serv = make_server()[](#l1.57)
self.thread = threading.Thread(target=self.serv.serve_forever)[](#l1.58)
self.thread.start()[](#l1.59)
# wait for port to be assigned[](#l1.61)
deadline = time.monotonic() + 10.0[](#l1.62)
while PORT is None:[](#l1.63)
time.sleep(0.010)[](#l1.64)
if time.monotonic() > deadline:[](#l1.65)
break[](#l1.66)
PORT = self.serv.server_address[1][](#l1.68) self.client = http.client.HTTPConnection("localhost:%d" % PORT)[](#l1.69)
def tearDown(self): self.client.close()
self.evt.wait()[](#l1.74)
- # Disable server feedback DocXMLRPCServer._send_traceback_header = False
support.threading_cleanup(*self._threads)[](#l1.78)
self.serv.shutdown()[](#l1.79)
self.thread.join()[](#l1.80)
self.serv.server_close()[](#l1.81)
def test_valid_get_response(self): self.client.request("GET", "/")