cpython: e9ea679a92fa (original) (raw)
--- a/Lib/test/test_urllib2.py +++ b/Lib/test/test_urllib2.py @@ -23,6 +23,7 @@ import http.client
CacheFTPHandler (hard to write)
parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
+ class TrivialTests(unittest.TestCase): def test___all__(self): @@ -73,6 +74,7 @@ class TrivialTests(unittest.TestCase): err = urllib.error.URLError('reason') self.assertIn(err.reason, str(err)) + class RequestHdrsTests(unittest.TestCase): def test_request_headers_dict(self): @@ -132,7 +134,6 @@ class RequestHdrsTests(unittest.TestCase req.remove_header("Unredirected-spam") self.assertFalse(req.has_header("Unredirected-spam")) - def test_password_manager(self): mgr = urllib.request.HTTPPasswordMgr() add = mgr.add_password @@ -236,43 +237,60 @@ class RequestHdrsTests(unittest.TestCase class MockOpener: addheaders = [] + def open(self, req, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): self.req, self.data, self.timeout = req, data, timeout + def error(self, proto, *args): self.proto, self.args = proto, args + class MockFile:
+ class MockHeaders(dict): def getheaders(self, name): return list(self.values()) + class MockResponse(io.StringIO): def init(self, code, msg, headers, data, url=None): io.StringIO.init(self, data) self.code, self.msg, self.headers, self.url = code, msg, headers, url + def info(self): return self.headers + def geturl(self): return self.url + class MockCookieJar: def add_cookie_header(self, request): self.ach_req = request + def extract_cookies(self, response, request): self.ec_req, self.ec_r = request, response + class FakeMethod: def init(self, meth_name, action, handle): self.meth_name = meth_name self.handle = handle self.action = action + def call(self, *args): return self.handle(self.meth_name, self.action, *args) + class MockHTTPResponse(io.IOBase): def init(self, fp, msg, status, reason): self.fp = fp @@ -326,24 +344,31 @@ class MockHTTPClass: self.data = body if self.raise_on_endheaders: raise OSError() + def getresponse(self): return MockHTTPResponse(MockFile(), {}, 200, "OK") def close(self): pass + class MockHandler: # useful for testing handler machinery # see add_ordered_mock_handlers() docstring handler_order = 500 + def init(self, methods): self._define_methods(methods) + def _define_methods(self, methods): for spec in methods:
if len(spec) == 2: name, action = spec[](#l1.113)
else: name, action = spec, None[](#l1.114)
if len(spec) == 2:[](#l1.115)
name, action = spec[](#l1.116)
else:[](#l1.117)
name, action = spec, None[](#l1.118) meth = FakeMethod(name, action, self.handle)[](#l1.119) setattr(self.__class__, name, meth)[](#l1.120)
+ def handle(self, fn_name, action, *args, **kwds): self.parent.calls.append((self, fn_name, args, kwds)) if action is None: @@ -366,16 +391,21 @@ class MockHandler: elif action == "raise": raise urllib.error.URLError("blah") assert False
+ def add_parent(self, parent): self.parent = parent self.parent.calls = [] + def lt(self, other): if not hasattr(other, "handler_order"): # No handler_order, leave in original order. Yuck. return True return self.handler_order < other.handler_order + def add_ordered_mock_handlers(opener, meth_spec): """Create MockHandlers and add them to an OpenerDirector. @@ -398,7 +428,9 @@ def add_ordered_mock_handlers(opener, me handlers = [] count = 0 for meths in meth_spec:
class MockHandlerSubclass(MockHandler): pass[](#l1.152)
class MockHandlerSubclass(MockHandler):[](#l1.153)
pass[](#l1.154)
+ h = MockHandlerSubclass(meths) h.handler_order += count h.add_parent(opener) @@ -407,12 +439,14 @@ def add_ordered_mock_handlers(opener, me opener.add_handler(h) return handlers + def build_test_opener(*handler_instances): opener = OpenerDirector() for h in handler_instances: opener.add_handler(h) return opener + class MockHTTPHandler(urllib.request.BaseHandler): # useful for testing redirections and auth # sends supplied headers and code as first response @@ -421,9 +455,11 @@ class MockHTTPHandler(urllib.request.Bas self.code = code self.headers = headers self.reset() + def reset(self): self._count = 0 self.requests = [] + def http_open(self, req): import email, http.client, copy self.requests.append(copy.deepcopy(req)) @@ -438,6 +474,7 @@ class MockHTTPHandler(urllib.request.Bas msg = email.message_from_string("\r\n\r\n") return MockResponse(200, "OK", msg, "", req.get_full_url()) + class MockHTTPSHandler(urllib.request.AbstractHTTPHandler): # Useful for testing the Proxy-Authorization request by verifying the # properties of httpcon @@ -468,12 +505,14 @@ class MockHTTPHandlerCheckAuth(urllib.re return MockResponse(self.code, name, MockFile(), "", req.get_full_url()) + class MockPasswordManager: def add_password(self, realm, uri, user, password): self.realm = realm self.url = uri self.user = user self.password = password + def find_user_password(self, realm, authuri): self.target_realm = realm self.target_url = authuri @@ -538,11 +577,11 @@ class OpenerDirectorTests(unittest.TestC def test_handler_order(self): o = OpenerDirector() handlers = []
for meths, handler_order in [[](#l1.213)
([("http_open", "return self")], 500),[](#l1.214)
(["http_open"], 0),[](#l1.215)
]:[](#l1.216)
class MockHandlerSubclass(MockHandler): pass[](#l1.217)
for meths, handler_order in [([("http_open", "return self")], 500),[](#l1.218)
(["http_open"], 0)]:[](#l1.219)
class MockHandlerSubclass(MockHandler):[](#l1.220)
pass[](#l1.221)
+ h = MockHandlerSubclass(meths) h.handler_order = handler_order handlers.append(h) @@ -580,7 +619,8 @@ class OpenerDirectorTests(unittest.TestC handlers = add_ordered_mock_handlers(o, meth_spec) class Unknown:
def __eq__(self, other): return True[](#l1.230)
def __eq__(self, other):[](#l1.231)
return True[](#l1.232)
req = Request("http://example.com/")[](#l1.234) o.open(req) @@ -593,7 +633,6 @@ class OpenerDirectorTests(unittest.TestC self.assertEqual((handler, method_name), got[:2]) self.assertEqual(args, got[2]) - def test_processors(self): # *_request / *_response methods get called appropriately o = OpenerDirector() @@ -629,6 +668,7 @@ class OpenerDirectorTests(unittest.TestC if args[1] is not None: self.assertIsInstance(args[1], MockResponse) + def sanepathname2url(path): try: path.encode("utf-8") @@ -640,18 +680,25 @@ def sanepathname2url(path): # XXX don't ask me about the mac... return urlpath + class HandlerTests(unittest.TestCase): def test_ftp(self): class MockFTPWrapper:
def __init__(self, data): self.data = data[](#l1.261)
def __init__(self, data):[](#l1.262)
self.data = data[](#l1.263)
+ def retrfile(self, filename, filetype): self.filename, self.filetype = filename, filetype return io.StringIO(self.data), len(self.data)
def close(self): pass[](#l1.268)
def close(self):[](#l1.270)
pass[](#l1.271)
class NullFTPHandler(urllib.request.FTPHandler):
def __init__(self, data): self.data = data[](#l1.274)
def __init__(self, data):[](#l1.275)
self.data = data[](#l1.276)
+ def connect_ftp(self, user, passwd, host, port, dirs, timeout=socket.GLOBAL_DEFAULT_TIMEOUT): self.user, self.passwd = user, passwd @@ -889,7 +936,7 @@ class HandlerTests(unittest.TestCase): self.assertRaises(ValueError, h.do_request, req) else: newreq = h.do_request_(req)
self.assertEqual(int(newreq.get_header('Content-length')),30)[](#l1.285)
self.assertEqual(int(newreq.get_header('Content-length')), 30)[](#l1.286)
file_obj.close() @@ -922,12 +969,12 @@ class HandlerTests(unittest.TestCase): # Check whether host is determined correctly if there is no proxy np_ds_req = h.do_request_(ds_req)
self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com")[](#l1.294)
self.assertEqual(np_ds_req.unredirected_hdrs["Host"], "example.com")[](#l1.295)
# Check whether host is determined correctly if there is a proxy
ds_req.set_proxy("someproxy:3128",None)[](#l1.298)
ds_req.set_proxy("someproxy:3128", None)[](#l1.299) p_ds_req = h.do_request_(ds_req)[](#l1.300)
self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com")[](#l1.301)
self.assertEqual(p_ds_req.unredirected_hdrs["Host"], "example.com")[](#l1.302)
def test_full_url_setter(self): # Checks to ensure that components are set correctly after setting the @@ -969,15 +1016,14 @@ class HandlerTests(unittest.TestCase): weird_url = 'http://www.python.org?getspam'[](#l1.307) req = Request(weird_url) newreq = h.do_request_(req)
self.assertEqual(newreq.host,'www.python.org')[](#l1.310)
self.assertEqual(newreq.selector,'/?getspam')[](#l1.311)
self.assertEqual(newreq.host, 'www.python.org')[](#l1.312)
self.assertEqual(newreq.selector, '/?getspam')[](#l1.313)
url_without_path = 'http://www.python.org'[](#l1.315) req = Request(url_without_path) newreq = h.do_request_(req)
self.assertEqual(newreq.host,'www.python.org')[](#l1.318)
self.assertEqual(newreq.selector,'')[](#l1.319)
self.assertEqual(newreq.host, 'www.python.org')[](#l1.321)
self.assertEqual(newreq.selector, '')[](#l1.322)
def test_errors(self): h = urllib.request.HTTPErrorProcessor() @@ -1064,6 +1110,7 @@ class HandlerTests(unittest.TestCase): # loop detection req = Request(from_url) req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT + def redirect(h, req, url=to_url): h.http_error_302(req, MockFile(), 302, "Blah", MockHeaders({"location": url})) @@ -1094,7 +1141,6 @@ class HandlerTests(unittest.TestCase): self.assertEqual(count, urllib.request.HTTPRedirectHandler.max_redirections) - def test_invalid_redirect(self): from_url = "http://example.com/a.html"[](#l1.340) valid_schemes = ['http','https','ftp'] @@ -1197,7 +1243,6 @@ class HandlerTests(unittest.TestCase): self.assertEqual(req.host, "www.python.org")[](#l1.343) del os.environ['no_proxy'] - def test_proxy_https(self): o = OpenerDirector() ph = urllib.request.ProxyHandler(dict(https="proxy.example.com:3128")) @@ -1221,21 +1266,21 @@ class HandlerTests(unittest.TestCase): https_handler = MockHTTPSHandler() o.add_handler(https_handler) req = Request("https://www.example.com/")[](#l1.353)
req.add_header("Proxy-Authorization","FooBar")[](#l1.354)
req.add_header("User-Agent","Grail")[](#l1.355)
req.add_header("Proxy-Authorization", "FooBar")[](#l1.356)
req.add_header("User-Agent", "Grail")[](#l1.357) self.assertEqual(req.host, "www.example.com")[](#l1.358) self.assertIsNone(req._tunnel_host)[](#l1.359) o.open(req)[](#l1.360) # Verify Proxy-Authorization gets tunneled to request.[](#l1.361) # httpsconn req_headers do not have the Proxy-Authorization header but[](#l1.362) # the req will have.[](#l1.363)
self.assertNotIn(("Proxy-Authorization","FooBar"),[](#l1.364)
self.assertNotIn(("Proxy-Authorization", "FooBar"),[](#l1.365) https_handler.httpconn.req_headers)[](#l1.366)
self.assertIn(("User-Agent","Grail"),[](#l1.367)
self.assertIn(("User-Agent", "Grail"),[](#l1.368) https_handler.httpconn.req_headers)[](#l1.369) self.assertIsNotNone(req._tunnel_host)[](#l1.370) self.assertEqual(req.host, "proxy.example.com:3128")[](#l1.371)
self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")[](#l1.372)
self.assertEqual(req.get_header("Proxy-authorization"), "FooBar")[](#l1.373)
# TODO: This should be only for OSX @unittest.skipUnless(sys.platform == 'darwin', "only relevant for OSX") @@ -1267,7 +1312,7 @@ class HandlerTests(unittest.TestCase): realm = "ACME Widget Store" http_handler = MockHTTPHandler( 401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
(quote_char, realm, quote_char) )[](#l1.381)
(quote_char, realm, quote_char))[](#l1.382) opener.add_handler(auth_handler)[](#l1.383) opener.add_handler(http_handler)[](#l1.384) self._test_basic_auth(opener, auth_handler, "Authorization",[](#l1.385)
@@ -1325,13 +1370,16 @@ class HandlerTests(unittest.TestCase): def init(self): OpenerDirector.init(self) self.recorded = [] + def record(self, info): self.recorded.append(info) + class TestDigestAuthHandler(urllib.request.HTTPDigestAuthHandler): def http_error_401(self, *args, **kwds): self.parent.record("digest") urllib.request.HTTPDigestAuthHandler.http_error_401(self, *args, **kwds) + class TestBasicAuthHandler(urllib.request.HTTPBasicAuthHandler): def http_error_401(self, *args, **kwds): self.parent.record("basic") @@ -1367,7 +1415,7 @@ class HandlerTests(unittest.TestCase): 401, 'WWW-Authenticate: Kerberos\r\n\r\n') opener.add_handler(digest_auth_handler) opener.add_handler(http_handler)
self.assertRaises(ValueError,opener.open,"http://www.example.com")[](#l1.407)
self.assertRaises(ValueError, opener.open, "http://www.example.com")[](#l1.408)
def test_unsupported_auth_basic_handler(self): # While using BasicAuthHandler @@ -1377,7 +1425,7 @@ class HandlerTests(unittest.TestCase): 401, 'WWW-Authenticate: NTLM\r\n\r\n') opener.add_handler(basic_auth_handler) opener.add_handler(http_handler)
self.assertRaises(ValueError,opener.open,"http://www.example.com")[](#l1.416)
self.assertRaises(ValueError, opener.open, "http://www.example.com")[](#l1.417)
def _test_basic_auth(self, opener, auth_handler, auth_header, realm, http_handler, password_manager, @@ -1510,6 +1558,7 @@ class HandlerTests(unittest.TestCase): self.assertTrue(conn.fakesock.closed, "Connection not closed") + class MiscTests(unittest.TestCase): def opener_has_handler(self, opener, handler_class): @@ -1517,11 +1566,16 @@ class MiscTests(unittest.TestCase): for h in opener.handlers)) def test_build_opener(self):
class MyHTTPHandler(urllib.request.HTTPHandler): pass[](#l1.433)
class MyHTTPHandler(urllib.request.HTTPHandler):[](#l1.434)
pass[](#l1.435)
+ class FooHandler(urllib.request.BaseHandler):
def foo_open(self): pass[](#l1.438)
def foo_open(self):[](#l1.439)
pass[](#l1.440)
+ class BarHandler(urllib.request.BaseHandler):
def bar_open(self): pass[](#l1.443)
def bar_open(self):[](#l1.444)
pass[](#l1.445)
build_opener = urllib.request.build_opener @@ -1548,7 +1602,9 @@ class MiscTests(unittest.TestCase): self.opener_has_handler(o, urllib.request.HTTPHandler) # Issue2670: multiple handlers sharing the same base class
class MyOtherHTTPHandler(urllib.request.HTTPHandler): pass[](#l1.453)
class MyOtherHTTPHandler(urllib.request.HTTPHandler):[](#l1.454)
pass[](#l1.455)
+ o = build_opener(MyHTTPHandler, MyOtherHTTPHandler) self.opener_has_handler(o, MyHTTPHandler) self.opener_has_handler(o, MyOtherHTTPHandler) @@ -1584,6 +1640,8 @@ class MiscTests(unittest.TestCase): self.assertEqual(err.headers, 'Content-Length: 42') expected_errmsg = 'HTTP Error %s: %s' % (err.code, err.msg) self.assertEqual(str(err), expected_errmsg)
expected_errmsg = '<HTTPError %s: %r>' % (err.code, err.msg)[](#l1.464)
self.assertEqual(repr(err), expected_errmsg)[](#l1.465)
def test_parse_proxy(self): parse_proxy_test_cases = [ @@ -1622,9 +1680,10 @@ class MiscTests(unittest.TestCase): self.assertRaises(ValueError, _parse_proxy, 'file:/ftp.example.com'), + class RequestTests(unittest.TestCase): class PutRequest(Request):
method='PUT'[](#l1.476)
method = 'PUT'[](#l1.477)
def setUp(self): self.get = Request("http://www.python.org/~jeremy/")[](#l1.480) @@ -1713,7 +1772,7 @@ class RequestTests(unittest.TestCase): def test_url_fullurl_get_full_url(self): urls = ['http://docs.python.org',[](#l1.483) 'http://docs.python.org/library/urllib2.html#OK',[](#l1.484)
'http://www.python.org/?qs=query#fragment=true' ][](#l1.485)
'http://www.python.org/?qs=query#fragment=true'][](#l1.486) for url in urls:[](#l1.487) req = Request(url)[](#l1.488) self.assertEqual(req.get_full_url(), req.full_url)[](#l1.489)
--- a/Lib/urllib/error.py +++ b/Lib/urllib/error.py @@ -35,6 +35,7 @@ class URLError(OSError): def str(self): return '<urlopen error %s>' % self.reason + class HTTPError(URLError, urllib.response.addinfourl): """Raised when HTTP error occurs, but also acts like non-error return""" __super_init = urllib.response.addinfourl.init @@ -55,6 +56,9 @@ class HTTPError(URLError, urllib.respons def str(self): return 'HTTP Error %s: %s' % (self.code, self.msg)
+ # since URLError specifies a .reason attribute, HTTPError should also # provide this attribute. See issue13211 for discussion. @property @@ -69,8 +73,9 @@ class HTTPError(URLError, urllib.respons def headers(self, headers): self.hdrs = headers -# exception raised when downloaded size does not match content-length + class ContentTooShortError(URLError):
- """Exception raised when downloaded size does not match content-length.""" def init(self, message, content): URLError.init(self, message) self.content = content