import mock

from netlib import http
from netlib import odict
from netlib import tutils
from netlib import utils
from netlib.http import semantics
from netlib.http.semantics import CONTENT_MISSING


class TestProtocolMixin(object):
    """Checks that ProtocolMixin.assemble dispatches on the message type."""

    # NOTE: stacked mock.patch decorators inject their mocks bottom-up, so
    # the assemble_request mock is the first positional argument.
    @mock.patch("netlib.http.semantics.ProtocolMixin.assemble_response")
    @mock.patch("netlib.http.semantics.ProtocolMixin.assemble_request")
    def test_assemble_request(self, mock_request_method, mock_response_method):
        p = semantics.ProtocolMixin()
        p.assemble(tutils.treq())
        assert mock_request_method.called
        assert not mock_response_method.called

    @mock.patch("netlib.http.semantics.ProtocolMixin.assemble_response")
    @mock.patch("netlib.http.semantics.ProtocolMixin.assemble_request")
    def test_assemble_response(self, mock_request_method, mock_response_method):
        p = semantics.ProtocolMixin()
        p.assemble(tutils.tresp())
        assert not mock_request_method.called
        assert mock_response_method.called

    def test_assemble_foo(self):
        # Anything that is neither a request nor a response is rejected.
        p = semantics.ProtocolMixin()
        tutils.raises(ValueError, p.assemble, 'foo')


class TestRequest(object):
    """Tests for semantics.Request, built mostly from tutils.treq()."""

    def test_repr(self):
        r = tutils.treq()
        assert repr(r)

    def test_headers(self):
        # A non-Headers value in the headers position trips an assertion.
        tutils.raises(
            AssertionError,
            semantics.Request,
            'form_in',
            'method',
            'scheme',
            'host',
            'port',
            'path',
            (1, 1),
            'foobar',
        )

        # Omitting headers yields a Headers instance.
        req = semantics.Request(
            'form_in',
            'method',
            'scheme',
            'host',
            'port',
            'path',
            (1, 1),
        )
        assert isinstance(req.headers, http.Headers)

    def test_equal(self):
        a = tutils.treq()
        b = tutils.treq()
        assert a == b

        # Comparison against a foreign type is checked from both sides.
        assert not a == 'foo'
        assert not b == 'foo'
        assert not 'foo' == a
        assert not 'foo' == b

    def test_legacy_first_line(self):
        req = tutils.treq()

        assert req.legacy_first_line('relative') == "GET /path HTTP/1.1"
        assert req.legacy_first_line('authority') == "GET address:22 HTTP/1.1"
        assert req.legacy_first_line('absolute') == "GET http://address:22/path HTTP/1.1"
        tutils.raises(http.HttpError, req.legacy_first_line, 'foobar')

    def test_anticache(self):
        req = tutils.treq()
        req.headers["If-Modified-Since"] = "foo"
        req.headers["If-None-Match"] = "bar"
        req.anticache()
        assert "If-Modified-Since" not in req.headers
        assert "If-None-Match" not in req.headers

    def test_anticomp(self):
        req = tutils.treq()
        req.headers["Accept-Encoding"] = "foobar"
        req.anticomp()
        assert req.headers["Accept-Encoding"] == "identity"

    def test_constrain_encoding(self):
        req = tutils.treq()
        req.headers["Accept-Encoding"] = "identity, gzip, foo"
        req.constrain_encoding()
        assert "foo" not in req.headers["Accept-Encoding"]

    def test_update_host(self):
        req = tutils.treq()
        req.headers["Host"] = ""
        req.host = "foobar"
        req.update_host_header()
        assert req.headers["Host"] == "foobar"

    def test_get_form(self):
        req = tutils.treq()
        assert req.get_form() == odict.ODict()

    @mock.patch("netlib.http.semantics.Request.get_form_multipart")
    @mock.patch("netlib.http.semantics.Request.get_form_urlencoded")
    def test_get_form_with_url_encoded(self, mock_method_urlencoded, mock_method_multipart):
        req = tutils.treq()
        assert req.get_form() == odict.ODict()

        req = tutils.treq()
        req.body = "foobar"
        req.headers["Content-Type"] = semantics.HDR_FORM_URLENCODED
        req.get_form()
        # The patched class attributes are reached through the instance.
        assert req.get_form_urlencoded.called
        assert not req.get_form_multipart.called

    @mock.patch("netlib.http.semantics.Request.get_form_multipart")
    @mock.patch("netlib.http.semantics.Request.get_form_urlencoded")
    def test_get_form_with_multipart(self, mock_method_urlencoded, mock_method_multipart):
        req = tutils.treq()
        req.body = "foobar"
        req.headers["Content-Type"] = semantics.HDR_FORM_MULTIPART
        req.get_form()
        assert not req.get_form_urlencoded.called
        assert req.get_form_multipart.called

    def test_get_form_urlencoded(self):
        req = tutils.treq("foobar")
        assert req.get_form_urlencoded() == odict.ODict()

        req.headers["Content-Type"] = semantics.HDR_FORM_URLENCODED
        assert req.get_form_urlencoded() == odict.ODict(utils.urldecode(req.body))

    def test_get_form_multipart(self):
        req = tutils.treq("foobar")
        assert req.get_form_multipart() == odict.ODict()

        req.headers["Content-Type"] = semantics.HDR_FORM_MULTIPART
        assert req.get_form_multipart() == odict.ODict(
            utils.multipartdecode(
                req.headers,
                req.body
            )
        )

    def test_set_form_urlencoded(self):
        req = tutils.treq()
        req.set_form_urlencoded(odict.ODict([('foo', 'bar'), ('rab', 'oof')]))
        assert req.headers["Content-Type"] == semantics.HDR_FORM_URLENCODED
        assert req.body

    def test_get_path_components(self):
        req = tutils.treq()
        assert req.get_path_components()
        # TODO: add meaningful assertions

    def test_set_path_components(self):
        req = tutils.treq()
        req.set_path_components(["foo", "bar"])
        # TODO: add meaningful assertions

    def test_get_query(self):
        req = tutils.treq()
        assert req.get_query().lst == []

        req.url = "http://localhost:80/foo?bar=42"
        assert req.get_query().lst == [("bar", "42")]

    def test_set_query(self):
        # Smoke test only: setting an empty query must not raise.
        req = tutils.treq()
        req.set_query(odict.ODict([]))

    def test_pretty_host(self):
        r = tutils.treq()
        assert r.pretty_host(True) == "address"
        assert r.pretty_host(False) == "address"

        r.headers["host"] = "other"
        assert r.pretty_host(True) == "other"
        assert r.pretty_host(False) == "address"

        r.host = None
        assert r.pretty_host(True) == "other"
        assert r.pretty_host(False) is None

        del r.headers["host"]
        assert r.pretty_host(True) is None
        assert r.pretty_host(False) is None

        # Invalid IDNA
        r.headers["host"] = ".disqus.com"
        assert r.pretty_host(True) == ".disqus.com"

    def test_pretty_url(self):
        req = tutils.treq()

        req.form_out = "authority"
        assert req.pretty_url(True) == "address:22"
        assert req.pretty_url(False) == "address:22"

        req.form_out = "relative"
        assert req.pretty_url(True) == "http://address:22/path"
        assert req.pretty_url(False) == "http://address:22/path"

    def test_get_cookies_none(self):
        headers = http.Headers()
        r = tutils.treq()
        r.headers = headers
        assert len(r.get_cookies()) == 0

    def test_get_cookies_single(self):
        r = tutils.treq()
        r.headers = http.Headers(cookie="cookiename=cookievalue")
        result = r.get_cookies()
        assert len(result) == 1
        assert result['cookiename'] == ['cookievalue']

    def test_get_cookies_double(self):
        r = tutils.treq()
        r.headers = http.Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue")
        result = r.get_cookies()
        assert len(result) == 2
        assert result['cookiename'] == ['cookievalue']
        assert result['othercookiename'] == ['othercookievalue']

    def test_get_cookies_withequalsign(self):
        r = tutils.treq()
        r.headers = http.Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue")
        result = r.get_cookies()
        assert len(result) == 2
        assert result['cookiename'] == ['coo=kievalue']
        assert result['othercookiename'] == ['othercookievalue']

    def test_set_cookies(self):
        r = tutils.treq()
        r.headers = http.Headers(cookie="cookiename=cookievalue")
        result = r.get_cookies()
        result["cookiename"] = ["foo"]
        r.set_cookies(result)
        assert r.get_cookies()["cookiename"] == ["foo"]

    def test_set_url(self):
        r = tutils.treq_absolute()
        r.url = "https://otheraddress:42/ORLY"
        assert r.scheme == "https"
        assert r.host == "otheraddress"
        assert r.port == 42
        assert r.path == "/ORLY"

        # The malformed URL below is expected to be rejected by the setter.
        # The failure assertion lives in the `else` branch so that it cannot
        # be swallowed by the exception handler: with `assert False` inside
        # the `try` and a bare `except:`, this check could never fail.
        try:
            r.url = "//localhost:80/foo@bar"
        except Exception:
            pass
        else:
            assert False, "invalid URL was accepted by the url setter"

    # def test_asterisk_form_in(self):
    #     f = tutils.tflow(req=None)
    #     protocol = mock_protocol("OPTIONS * HTTP/1.1")
    #     f.request = HTTPRequest.from_protocol(protocol)
    #
    #     assert f.request.form_in == "relative"
    #     f.request.host = f.server_conn.address.host
    #     f.request.port = f.server_conn.address.port
    #     f.request.scheme = "http"
    #     assert protocol.assemble(f.request) == (
    #         "OPTIONS * HTTP/1.1\r\n"
    #         "Host: address:22\r\n"
    #         "Content-Length: 0\r\n\r\n")
    #
    # def test_relative_form_in(self):
    #     protocol = mock_protocol("GET /foo\xff HTTP/1.1")
    #     tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
    #
    #     protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c")
    #     r = HTTPRequest.from_protocol(protocol)
    #     assert r.headers["Upgrade"] == ["h2c"]
    #
    # def test_expect_header(self):
    #     protocol = mock_protocol(
    #         "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar")
    #     r = HTTPRequest.from_protocol(protocol)
    #     assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
    #     assert r.content == "foo"
    #     assert protocol.tcp_handler.rfile.read(3) == "bar"
    #
    # def test_authority_form_in(self):
    #     protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1")
    #     tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
    #
    #     protocol = mock_protocol("CONNECT address:22 HTTP/1.1")
    #     r = HTTPRequest.from_protocol(protocol)
    #     r.scheme, r.host, r.port = "http", "address", 22
    #     assert protocol.assemble(r) == (
    #         "CONNECT address:22 HTTP/1.1\r\n"
    #         "Host: address:22\r\n"
    #         "Content-Length: 0\r\n\r\n")
    #     assert r.pretty_url(False) == "address:22"
    #
    # def test_absolute_form_in(self):
    #     protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1")
    #     tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
    #
    #     protocol = mock_protocol("GET http://address:22/ HTTP/1.1")
    #     r = HTTPRequest.from_protocol(protocol)
    #     assert protocol.assemble(r) == (
    #         "GET http://address:22/ HTTP/1.1\r\n"
    #         "Host: address:22\r\n"
    #         "Content-Length: 0\r\n\r\n")
    #
    # def test_http_options_relative_form_in(self):
    #     """
    #     Exercises fix for Issue #392.
    #     """
    #     protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1")
    #     r = HTTPRequest.from_protocol(protocol)
    #     r.host = 'address'
    #     r.port = 80
    #     r.scheme = "http"
    #     assert protocol.assemble(r) == (
    #         "OPTIONS /secret/resource HTTP/1.1\r\n"
    #         "Host: address\r\n"
    #         "Content-Length: 0\r\n\r\n")
    #
    # def test_http_options_absolute_form_in(self):
    #     protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1")
    #     r = HTTPRequest.from_protocol(protocol)
    #     r.host = 'address'
    #     r.port = 80
    #     r.scheme = "http"
    #     assert protocol.assemble(r) == (
    #         "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n"
    #         "Host: address\r\n"
    #         "Content-Length: 0\r\n\r\n")


class TestEmptyRequest(object):
    """Smoke test for semantics.EmptyRequest construction."""

    def test_init(self):
        req = semantics.EmptyRequest()
        assert req


class TestResponse(object):
    """Tests for semantics.Response, built mostly from tutils.tresp()."""

    def test_headers(self):
        # A non-Headers value passed as headers trips an assertion.
        tutils.raises(
            AssertionError,
            semantics.Response,
            (1, 1),
            200,
            headers='foobar',
        )

        # Omitting headers yields a Headers instance.
        resp = semantics.Response(
            (1, 1),
            200,
        )
        assert isinstance(resp.headers, http.Headers)

    def test_equal(self):
        a = tutils.tresp()
        b = tutils.tresp()
        assert a == b

        # Comparison against a foreign type is checked from both sides.
        assert not a == 'foo'
        assert not b == 'foo'
        assert not 'foo' == a
        assert not 'foo' == b

    def test_repr(self):
        r = tutils.tresp()
        assert "unknown content type" in repr(r)
        r.headers["content-type"] = "foo"
        assert "foo" in repr(r)
        assert repr(tutils.tresp(content=CONTENT_MISSING))

    def test_get_cookies_none(self):
        resp = tutils.tresp()
        resp.headers = http.Headers()
        assert not resp.get_cookies()

    def test_get_cookies_simple(self):
        resp = tutils.tresp()
        resp.headers = http.Headers(set_cookie="cookiename=cookievalue")
        result = resp.get_cookies()
        assert len(result) == 1
        assert "cookiename" in result
        # Each entry is a [value, attributes] pair.
        assert result["cookiename"][0] == ["cookievalue", odict.ODict()]

    def test_get_cookies_with_parameters(self):
        resp = tutils.tresp()
        resp.headers = http.Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly")
        result = resp.get_cookies()
        assert len(result) == 1
        assert "cookiename" in result
        assert result["cookiename"][0][0] == "cookievalue"
        attrs = result["cookiename"][0][1]
        assert len(attrs) == 4
        assert attrs["domain"] == ["example.com"]
        assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
        assert attrs["path"] == ["/"]
        # The value-less HttpOnly flag is looked up in lower case and maps
        # to None.
        assert attrs["httponly"] == [None]

    def test_get_cookies_no_value(self):
        resp = tutils.tresp()
        resp.headers = http.Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/")
        result = resp.get_cookies()
        assert len(result) == 1
        assert "cookiename" in result
        assert result["cookiename"][0][0] == ""
        assert len(result["cookiename"][0][1]) == 2

    def test_get_cookies_twocookies(self):
        resp = tutils.tresp()
        resp.headers = http.Headers([
            ["Set-Cookie", "cookiename=cookievalue"],
            ["Set-Cookie", "othercookie=othervalue"]
        ])
        result = resp.get_cookies()
        assert len(result) == 2
        assert "cookiename" in result
        assert result["cookiename"][0] == ["cookievalue", odict.ODict()]
        assert "othercookie" in result
        assert result["othercookie"][0] == ["othervalue", odict.ODict()]

    def test_set_cookies(self):
        resp = tutils.tresp()
        v = resp.get_cookies()
        v.add("foo", ["bar", odict.ODictCaseless()])
        resp.set_cookies(v)

        # Round-trip: the cookie written above is read back unchanged.
        v = resp.get_cookies()
        assert len(v) == 1
        assert v["foo"] == [["bar", odict.ODictCaseless()]]


class TestHeaders(object):
    """Tests for semantics.Headers: case-insensitive, multi-valued fields."""

    def _2host(self):
        # Two fields whose names differ only by case.
        return semantics.Headers(
            [
                ["Host", "example.com"],
                ["host", "example.org"]
            ]
        )

    def test_init(self):
        headers = semantics.Headers()
        assert len(headers) == 0

        headers = semantics.Headers([["Host", "example.com"]])
        assert len(headers) == 1
        assert headers["Host"] == "example.com"

        headers = semantics.Headers(Host="example.com")
        assert len(headers) == 1
        assert headers["Host"] == "example.com"

        # A keyword argument overrides a same-named field from the list.
        headers = semantics.Headers(
            [["Host", "invalid"]],
            Host="example.com"
        )
        assert len(headers) == 1
        assert headers["Host"] == "example.com"

        headers = semantics.Headers(
            [["Host", "invalid"], ["Accept", "text/plain"]],
            Host="example.com"
        )
        assert len(headers) == 2
        assert headers["Host"] == "example.com"
        assert headers["Accept"] == "text/plain"

    def test_getitem(self):
        headers = semantics.Headers(Host="example.com")
        assert headers["Host"] == "example.com"
        assert headers["host"] == "example.com"
        tutils.raises(KeyError, headers.__getitem__, "Accept")

        # Same-named fields are folded into one comma-separated value.
        headers = self._2host()
        assert headers["Host"] == "example.com, example.org"

    def test_str(self):
        headers = semantics.Headers(Host="example.com")
        assert bytes(headers) == "Host: example.com\r\n"

        headers = semantics.Headers([
            ["Host", "example.com"],
            ["Accept", "text/plain"]
        ])
        assert str(headers) == "Host: example.com\r\nAccept: text/plain\r\n"

    def test_setitem(self):
        headers = semantics.Headers()
        headers["Host"] = "example.com"
        assert "Host" in headers
        assert "host" in headers
        assert headers["Host"] == "example.com"

        headers["host"] = "example.org"
        assert "Host" in headers
        assert "host" in headers
        assert headers["Host"] == "example.org"

        headers["accept"] = "text/plain"
        assert len(headers) == 2
        assert "Accept" in headers
        assert "Host" in headers

        # Assigning collapses the duplicate fields into a single one.
        headers = self._2host()
        assert len(headers.fields) == 2
        headers["Host"] = "example.com"
        assert len(headers.fields) == 1
        assert "Host" in headers

    def test_delitem(self):
        headers = semantics.Headers(Host="example.com")
        assert len(headers) == 1
        del headers["host"]
        assert len(headers) == 0

        # Deleting a missing key must raise KeyError.
        try:
            del headers["host"]
        except KeyError:
            assert True
        else:
            assert False

        # Deleting removes every field with a matching name.
        headers = self._2host()
        del headers["Host"]
        assert len(headers) == 0

    def test_keys(self):
        headers = semantics.Headers(Host="example.com")
        assert len(headers.keys()) == 1
        assert headers.keys()[0] == "Host"

        headers = self._2host()
        assert len(headers.keys()) == 1
        assert headers.keys()[0] == "Host"

    def test_eq_ne(self):
        # Equality is sensitive to the case of the stored field names.
        headers1 = semantics.Headers(Host="example.com")
        headers2 = semantics.Headers(host="example.com")
        assert not (headers1 == headers2)
        assert headers1 != headers2

        headers1 = semantics.Headers(Host="example.com")
        headers2 = semantics.Headers(Host="example.com")
        assert headers1 == headers2
        assert not (headers1 != headers2)

        assert headers1 != 42

    def test_get_all(self):
        headers = self._2host()
        assert headers.get_all("host") == ["example.com", "example.org"]
        assert headers.get_all("accept") == []

    def test_set_all(self):
        headers = semantics.Headers(Host="example.com")
        headers.set_all("Accept", ["text/plain"])
        assert len(headers) == 2
        assert "accept" in headers

        headers = self._2host()
        headers.set_all("Host", ["example.org"])
        assert headers["host"] == "example.org"

        headers.set_all("Host", ["example.org", "example.net"])
        assert headers["host"] == "example.org, example.net"

    def test_state(self):
        headers = self._2host()
        assert len(headers.get_state()) == 2
        assert headers == semantics.Headers.from_state(headers.get_state())

        headers2 = semantics.Headers()
        assert headers != headers2
        headers2.load_state(headers.get_state())
        assert headers == headers2