move bits around

Thomas Kriechbaumer 2015-07-14 23:02:14 +02:00
parent bd5ee21284
commit f50deb7b76
28 changed files with 318 additions and 290 deletions

2
netlib/http/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from exceptions import *
from semantics import *
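Note: with these two star-imports, the exception and semantics names stay reachable through netlib.http, so existing call sites keep working. A minimal sketch, assuming the package layout this commit introduces:

from netlib import http, odict

# Response comes from netlib.http.semantics, HttpError from netlib.http.exceptions
resp = http.Response((1, 1), 200, "OK", odict.ODictCaseless(), "")
err = http.HttpError(404, "Not found")
assert resp.status_code == 200 and err.code == 404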

View File

@ -1,6 +1,7 @@
from __future__ import (absolute_import, print_function, division)
from argparse import Action, ArgumentTypeError
from . import http
from .. import http
class NullProxyAuth(object):
@ -46,7 +47,7 @@ class BasicProxyAuth(NullProxyAuth):
auth_value = headers.get(self.AUTH_HEADER, [])
if not auth_value:
return False
parts = http.parse_http_basic_auth(auth_value[0])
parts = http.http1.parse_http_basic_auth(auth_value[0])
if not parts:
return False
scheme, username, password = parts

View File

@ -1,3 +1,7 @@
import re
from .. import odict
"""
A flexible module for cookie parsing and manipulation.
@ -22,10 +26,6 @@ variants. Serialization follows RFC6265.
# TODO
# - Disallow LHS-only Cookie values
import re
import odict
def _read_until(s, start, term):
"""

View File

@ -0,0 +1,9 @@
class HttpError(Exception):
def __init__(self, code, message):
super(HttpError, self).__init__(message)
self.code = code
class HttpErrorConnClosed(HttpError):
pass
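Since HttpErrorConnClosed subclasses HttpError, callers can catch either the specific or the general error. A small sketch:

from netlib.http.exceptions import HttpError, HttpErrorConnClosed

try:
    raise HttpErrorConnClosed(502, "connection closed prematurely")
except HttpError as e:      # the subclass is caught via the base class
    assert e.code == 502
    assert str(e) == "connection closed prematurely"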

View File

@ -0,0 +1 @@
from protocol import *

View File

@ -1,37 +1,13 @@
from __future__ import (absolute_import, print_function, division)
import binascii
import collections
import string
import urlparse
import binascii
import sys
from . import odict, utils, tcp, http_semantics, http_status
import urlparse
class HttpError(Exception):
def __init__(self, code, message):
super(HttpError, self).__init__(message)
self.code = code
class HttpErrorConnClosed(HttpError):
pass
def _is_valid_port(port):
if not 0 <= port <= 65535:
return False
return True
def _is_valid_host(host):
try:
host.decode("idna")
except ValueError:
return False
if "\0" in host:
return None
return True
from netlib import odict, utils, tcp, http
from .. import status_codes
from ..exceptions import *
def get_request_line(fp):
@ -44,51 +20,6 @@ def get_request_line(fp):
line = fp.readline()
return line
def parse_url(url):
"""
Returns a (scheme, host, port, path) tuple, or None on error.
Checks that:
port is an integer 0-65535
host is a valid IDNA-encoded hostname with no null-bytes
path is valid ASCII
"""
try:
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
except ValueError:
return None
if not scheme:
return None
if '@' in netloc:
# FIXME: Consider what to do with the discarded credentials here. Most
# probably we should extend the signature to return these as a separate
# value.
_, netloc = string.rsplit(netloc, '@', maxsplit=1)
if ':' in netloc:
host, port = string.rsplit(netloc, ':', maxsplit=1)
try:
port = int(port)
except ValueError:
return None
else:
host = netloc
if scheme == "https":
port = 443
else:
port = 80
path = urlparse.urlunparse(('', '', path, params, query, fragment))
if not path.startswith("/"):
path = "/" + path
if not _is_valid_host(host):
return None
if not utils.isascii(path):
return None
if not _is_valid_port(port):
return None
return scheme, host, port, path
def read_headers(fp):
"""
Read a set of headers from a file pointer. Stop once a blank line is
@ -193,6 +124,7 @@ def parse_http_protocol(s):
def parse_http_basic_auth(s):
# TODO: check if this is HTTP/1 only - otherwise move it to netlib.http.semantics
words = s.split()
if len(words) != 2:
return None
@ -208,6 +140,7 @@ def parse_http_basic_auth(s):
def assemble_http_basic_auth(scheme, username, password):
# TODO: check if this is HTTP/1 only - otherwise move it to netlib.http.semantics
v = binascii.b2a_base64(username + ":" + password)
return scheme + " " + v
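The two basic-auth helpers stay paired in http1 for now (see the TODOs above) and round-trip, as the tests in this commit exercise. A short sketch:

from netlib.http import http1

# assemble produces "basic " + base64("foo:bar") (b2a_base64 adds a trailing newline)
value = http1.assemble_http_basic_auth("basic", "foo", "bar")
assert http1.parse_http_basic_auth(value) == ("basic", "foo", "bar")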
@ -245,9 +178,9 @@ def parse_init_connect(line):
port = int(port)
except ValueError:
return None
if not _is_valid_port(port):
if not http.is_valid_port(port):
return None
if not _is_valid_host(host):
if not http.is_valid_host(host):
return None
return host, port, httpversion
@ -258,7 +191,7 @@ def parse_init_proxy(line):
return None
method, url, httpversion = v
parts = parse_url(url)
parts = http.parse_url(url)
if not parts:
return None
scheme, host, port, path = parts
@ -421,6 +354,7 @@ def expected_http_body_size(headers, is_request, request_method, response_code):
return -1
# TODO: make this a regular class - just like Response
Request = collections.namedtuple(
"Request",
[
@ -529,7 +463,7 @@ def read_request(rfile, include_body=True, body_size_limit=None, wfile=None):
def read_response(rfile, request_method, body_size_limit, include_body=True):
"""
Return an (httpversion, code, msg, headers, content) tuple.
Returns an http.Response
By default, both response header and body are read.
If include_body=False is specified, content may be one of the
@ -538,6 +472,7 @@ def read_response(rfile, request_method, body_size_limit, include_body=True):
- "", if the response must not have a response body (e.g. it's a
response to a HEAD request)
"""
line = rfile.readline()
# Possible leftover from previous message
if line == "\r\n" or line == "\n":
@ -568,7 +503,7 @@ def read_response(rfile, request_method, body_size_limit, include_body=True):
# if include_body==False then a None content means the body should be
# read separately
content = None
return http_semantics.Response(httpversion, code, msg, headers, content)
return http.Response(httpversion, code, msg, headers, content)
def request_preamble(method, resource, http_major="1", http_minor="1"):
@ -579,5 +514,5 @@ def request_preamble(method, resource, http_major="1", http_minor="1"):
def response_preamble(code, message=None, http_major="1", http_minor="1"):
if message is None:
message = http_status.RESPONSES.get(code)
message = status_codes.RESPONSES.get(code)
return 'HTTP/%s.%s %s %s' % (http_major, http_minor, code, message)
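The preamble helpers build only the request/status line; the caller appends headers and the terminating blank line. A hedged sketch, assuming the standard reason phrase for 200 in status_codes.RESPONSES:

from netlib.http import http1

assert http1.response_preamble(200) == "HTTP/1.1 200 OK"
# request_preamble builds the matching request line, e.g. "GET /test HTTP/1.1"
line = http1.request_preamble("GET", "/test")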

94
netlib/http/semantics.py Normal file
View File

@ -0,0 +1,94 @@
from __future__ import (absolute_import, print_function, division)
import binascii
import collections
import string
import sys
import urlparse
from .. import utils
class Response(object):
def __init__(
self,
httpversion,
status_code,
msg,
headers,
content,
sslinfo=None,
):
self.httpversion = httpversion
self.status_code = status_code
self.msg = msg
self.headers = headers
self.content = content
self.sslinfo = sslinfo
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __repr__(self):
return "Response(%s - %s)" % (self.status_code, self.msg)
def is_valid_port(port):
if not 0 <= port <= 65535:
return False
return True
def is_valid_host(host):
try:
host.decode("idna")
except ValueError:
return False
if "\0" in host:
return None
return True
def parse_url(url):
"""
Returns a (scheme, host, port, path) tuple, or None on error.
Checks that:
port is an integer 0-65535
host is a valid IDNA-encoded hostname with no null-bytes
path is valid ASCII
"""
try:
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
except ValueError:
return None
if not scheme:
return None
if '@' in netloc:
# FIXME: Consider what to do with the discarded credentials here. Most
# probably we should extend the signature to return these as a separate
# value.
_, netloc = string.rsplit(netloc, '@', maxsplit=1)
if ':' in netloc:
host, port = string.rsplit(netloc, ':', maxsplit=1)
try:
port = int(port)
except ValueError:
return None
else:
host = netloc
if scheme == "https":
port = 443
else:
port = 80
path = urlparse.urlunparse(('', '', path, params, query, fragment))
if not path.startswith("/"):
path = "/" + path
if not is_valid_host(host):
return None
if not utils.isascii(path):
return None
if not is_valid_port(port):
return None
return scheme, host, port, path
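parse_url now lives in semantics and is re-exported at the package level; a usage sketch mirroring the test expectations in this commit:

from netlib import http

assert http.parse_url("http://user:pass@foo/bar") == ("http", "foo", 80, "/bar")
assert http.parse_url("https://foo") == ("https", "foo", 443, "/")
assert http.parse_url("http://foo:999999") is None   # port out of range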

View File

@ -1,23 +0,0 @@
class Response(object):
def __init__(
self,
httpversion,
status_code,
msg,
headers,
content,
sslinfo=None,
):
self.httpversion = httpversion
self.status_code = status_code
self.msg = msg
self.headers = headers
self.content = content
self.sslinfo = sslinfo
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __repr__(self):
return "Response(%s - %s)" % (self.status_code, self.msg)

View File

@ -6,7 +6,7 @@ import struct
import io
from .protocol import Masker
from .. import utils, odict, tcp
from netlib import utils, odict, tcp
DEFAULT = object()

View File

@ -5,7 +5,7 @@ import os
import struct
import io
from .. import utils, odict, tcp
from netlib import utils, odict, tcp
# Collection of utility functions that implement small portions of the RFC6455
# WebSockets Protocol. Useful for building WebSocket clients and servers.

View File

View File

@ -1,20 +1,17 @@
import cStringIO
import textwrap
import binascii
from netlib import http, http_semantics, odict, tcp
from . import tutils, tservers
def test_httperror():
e = http.HttpError(404, "Not found")
assert str(e)
from netlib import http, odict, tcp
from netlib.http.http1 import protocol
from ... import tutils, tservers
def test_has_chunked_encoding():
h = odict.ODictCaseless()
assert not http.has_chunked_encoding(h)
assert not protocol.has_chunked_encoding(h)
h["transfer-encoding"] = ["chunked"]
assert http.has_chunked_encoding(h)
assert protocol.has_chunked_encoding(h)
def test_read_chunked():
@ -25,74 +22,74 @@ def test_read_chunked():
tutils.raises(
"malformed chunked body",
http.read_http_body,
protocol.read_http_body,
s, h, None, "GET", None, True
)
s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n")
assert http.read_http_body(s, h, None, "GET", None, True) == "a"
assert protocol.read_http_body(s, h, None, "GET", None, True) == "a"
s = cStringIO.StringIO("\r\n\r\n1\r\na\r\n0\r\n\r\n")
assert http.read_http_body(s, h, None, "GET", None, True) == "a"
assert protocol.read_http_body(s, h, None, "GET", None, True) == "a"
s = cStringIO.StringIO("\r\n")
tutils.raises(
"closed prematurely",
http.read_http_body,
protocol.read_http_body,
s, h, None, "GET", None, True
)
s = cStringIO.StringIO("1\r\nfoo")
tutils.raises(
"malformed chunked body",
http.read_http_body,
protocol.read_http_body,
s, h, None, "GET", None, True
)
s = cStringIO.StringIO("foo\r\nfoo")
tutils.raises(
http.HttpError,
http.read_http_body,
protocol.HttpError,
protocol.read_http_body,
s, h, None, "GET", None, True
)
s = cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n")
tutils.raises("too large", http.read_http_body, s, h, 2, "GET", None, True)
tutils.raises("too large", protocol.read_http_body, s, h, 2, "GET", None, True)
def test_connection_close():
h = odict.ODictCaseless()
assert http.connection_close((1, 0), h)
assert not http.connection_close((1, 1), h)
assert protocol.connection_close((1, 0), h)
assert not protocol.connection_close((1, 1), h)
h["connection"] = ["keep-alive"]
assert not http.connection_close((1, 1), h)
assert not protocol.connection_close((1, 1), h)
h["connection"] = ["close"]
assert http.connection_close((1, 1), h)
assert protocol.connection_close((1, 1), h)
def test_get_header_tokens():
h = odict.ODictCaseless()
assert http.get_header_tokens(h, "foo") == []
assert protocol.get_header_tokens(h, "foo") == []
h["foo"] = ["bar"]
assert http.get_header_tokens(h, "foo") == ["bar"]
assert protocol.get_header_tokens(h, "foo") == ["bar"]
h["foo"] = ["bar, voing"]
assert http.get_header_tokens(h, "foo") == ["bar", "voing"]
assert protocol.get_header_tokens(h, "foo") == ["bar", "voing"]
h["foo"] = ["bar, voing", "oink"]
assert http.get_header_tokens(h, "foo") == ["bar", "voing", "oink"]
assert protocol.get_header_tokens(h, "foo") == ["bar", "voing", "oink"]
def test_read_http_body_request():
h = odict.ODictCaseless()
r = cStringIO.StringIO("testing")
assert http.read_http_body(r, h, None, "GET", None, True) == ""
assert protocol.read_http_body(r, h, None, "GET", None, True) == ""
def test_read_http_body_response():
h = odict.ODictCaseless()
s = tcp.Reader(cStringIO.StringIO("testing"))
assert http.read_http_body(s, h, None, "GET", 200, False) == "testing"
assert protocol.read_http_body(s, h, None, "GET", 200, False) == "testing"
def test_read_http_body():
@ -100,14 +97,14 @@ def test_read_http_body():
h = odict.ODictCaseless()
h["content-length"] = [7]
s = cStringIO.StringIO("testing")
assert http.read_http_body(s, h, None, "GET", 200, False) == "testing"
assert protocol.read_http_body(s, h, None, "GET", 200, False) == "testing"
# test content length: invalid header
h["content-length"] = ["foo"]
s = cStringIO.StringIO("testing")
tutils.raises(
http.HttpError,
http.read_http_body,
protocol.HttpError,
protocol.read_http_body,
s, h, None, "GET", 200, False
)
@ -115,8 +112,8 @@ def test_read_http_body():
h["content-length"] = [-1]
s = cStringIO.StringIO("testing")
tutils.raises(
http.HttpError,
http.read_http_body,
protocol.HttpError,
protocol.read_http_body,
s, h, None, "GET", 200, False
)
@ -124,25 +121,25 @@ def test_read_http_body():
h["content-length"] = [5]
s = cStringIO.StringIO("testing")
tutils.raises(
http.HttpError,
http.read_http_body,
protocol.HttpError,
protocol.read_http_body,
s, h, 4, "GET", 200, False
)
# test content length: content length < actual content
s = cStringIO.StringIO("testing")
assert len(http.read_http_body(s, h, None, "GET", 200, False)) == 5
assert len(protocol.read_http_body(s, h, None, "GET", 200, False)) == 5
# test no content length: limit > actual content
h = odict.ODictCaseless()
s = tcp.Reader(cStringIO.StringIO("testing"))
assert len(http.read_http_body(s, h, 100, "GET", 200, False)) == 7
assert len(protocol.read_http_body(s, h, 100, "GET", 200, False)) == 7
# test no content length: limit < actual content
s = tcp.Reader(cStringIO.StringIO("testing"))
tutils.raises(
http.HttpError,
http.read_http_body,
protocol.HttpError,
protocol.read_http_body,
s, h, 4, "GET", 200, False
)
@ -150,54 +147,54 @@ def test_read_http_body():
h = odict.ODictCaseless()
h["transfer-encoding"] = ["chunked"]
s = tcp.Reader(cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n"))
assert http.read_http_body(s, h, 100, "GET", 200, False) == "aaaaa"
assert protocol.read_http_body(s, h, 100, "GET", 200, False) == "aaaaa"
def test_expected_http_body_size():
# gibber in the content-length field
h = odict.ODictCaseless()
h["content-length"] = ["foo"]
assert http.expected_http_body_size(h, False, "GET", 200) is None
assert protocol.expected_http_body_size(h, False, "GET", 200) is None
# negative number in the content-length field
h = odict.ODictCaseless()
h["content-length"] = ["-7"]
assert http.expected_http_body_size(h, False, "GET", 200) is None
assert protocol.expected_http_body_size(h, False, "GET", 200) is None
# explicit length
h = odict.ODictCaseless()
h["content-length"] = ["5"]
assert http.expected_http_body_size(h, False, "GET", 200) == 5
assert protocol.expected_http_body_size(h, False, "GET", 200) == 5
# no length
h = odict.ODictCaseless()
assert http.expected_http_body_size(h, False, "GET", 200) == -1
assert protocol.expected_http_body_size(h, False, "GET", 200) == -1
# no length request
h = odict.ODictCaseless()
assert http.expected_http_body_size(h, True, "GET", None) == 0
assert protocol.expected_http_body_size(h, True, "GET", None) == 0
def test_parse_http_protocol():
assert http.parse_http_protocol("HTTP/1.1") == (1, 1)
assert http.parse_http_protocol("HTTP/0.0") == (0, 0)
assert not http.parse_http_protocol("HTTP/a.1")
assert not http.parse_http_protocol("HTTP/1.a")
assert not http.parse_http_protocol("foo/0.0")
assert not http.parse_http_protocol("HTTP/x")
assert protocol.parse_http_protocol("HTTP/1.1") == (1, 1)
assert protocol.parse_http_protocol("HTTP/0.0") == (0, 0)
assert not protocol.parse_http_protocol("HTTP/a.1")
assert not protocol.parse_http_protocol("HTTP/1.a")
assert not protocol.parse_http_protocol("foo/0.0")
assert not protocol.parse_http_protocol("HTTP/x")
def test_parse_init_connect():
assert http.parse_init_connect("CONNECT host.com:443 HTTP/1.0")
assert not http.parse_init_connect("C\xfeONNECT host.com:443 HTTP/1.0")
assert not http.parse_init_connect("CONNECT \0host.com:443 HTTP/1.0")
assert not http.parse_init_connect("CONNECT host.com:444444 HTTP/1.0")
assert not http.parse_init_connect("bogus")
assert not http.parse_init_connect("GET host.com:443 HTTP/1.0")
assert not http.parse_init_connect("CONNECT host.com443 HTTP/1.0")
assert not http.parse_init_connect("CONNECT host.com:443 foo/1.0")
assert not http.parse_init_connect("CONNECT host.com:foo HTTP/1.0")
assert protocol.parse_init_connect("CONNECT host.com:443 HTTP/1.0")
assert not protocol.parse_init_connect("C\xfeONNECT host.com:443 HTTP/1.0")
assert not protocol.parse_init_connect("CONNECT \0host.com:443 HTTP/1.0")
assert not protocol.parse_init_connect("CONNECT host.com:444444 HTTP/1.0")
assert not protocol.parse_init_connect("bogus")
assert not protocol.parse_init_connect("GET host.com:443 HTTP/1.0")
assert not protocol.parse_init_connect("CONNECT host.com443 HTTP/1.0")
assert not protocol.parse_init_connect("CONNECT host.com:443 foo/1.0")
assert not protocol.parse_init_connect("CONNECT host.com:foo HTTP/1.0")
def test_parse_init_proxy():
u = "GET http://foo.com:8888/test HTTP/1.1"
m, s, h, po, pa, httpversion = http.parse_init_proxy(u)
m, s, h, po, pa, httpversion = protocol.parse_init_proxy(u)
assert m == "GET"
assert s == "http"
assert h == "foo.com"
@ -206,27 +203,27 @@ def test_parse_init_proxy():
assert httpversion == (1, 1)
u = "G\xfeET http://foo.com:8888/test HTTP/1.1"
assert not http.parse_init_proxy(u)
assert not protocol.parse_init_proxy(u)
assert not http.parse_init_proxy("invalid")
assert not http.parse_init_proxy("GET invalid HTTP/1.1")
assert not http.parse_init_proxy("GET http://foo.com:8888/test foo/1.1")
assert not protocol.parse_init_proxy("invalid")
assert not protocol.parse_init_proxy("GET invalid HTTP/1.1")
assert not protocol.parse_init_proxy("GET http://foo.com:8888/test foo/1.1")
def test_parse_init_http():
u = "GET /test HTTP/1.1"
m, u, httpversion = http.parse_init_http(u)
m, u, httpversion = protocol.parse_init_http(u)
assert m == "GET"
assert u == "/test"
assert httpversion == (1, 1)
u = "G\xfeET /test HTTP/1.1"
assert not http.parse_init_http(u)
assert not protocol.parse_init_http(u)
assert not http.parse_init_http("invalid")
assert not http.parse_init_http("GET invalid HTTP/1.1")
assert not http.parse_init_http("GET /test foo/1.1")
assert not http.parse_init_http("GET /test\xc0 HTTP/1.1")
assert not protocol.parse_init_http("invalid")
assert not protocol.parse_init_http("GET invalid HTTP/1.1")
assert not protocol.parse_init_http("GET /test foo/1.1")
assert not protocol.parse_init_http("GET /test\xc0 HTTP/1.1")
class TestReadHeaders:
@ -236,7 +233,7 @@ class TestReadHeaders:
data = textwrap.dedent(data)
data = data.strip()
s = cStringIO.StringIO(data)
return http.read_headers(s)
return protocol.read_headers(s)
def test_read_simple(self):
data = """
@ -290,7 +287,7 @@ class TestReadResponseNoContentLength(tservers.ServerTestBase):
def test_no_content_length(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
resp = http.read_response(c.rfile, "GET", None)
resp = protocol.read_response(c.rfile, "GET", None)
assert resp.content == "bar\r\n\r\n"
@ -298,7 +295,7 @@ def test_read_response():
def tst(data, method, limit, include_body=True):
data = textwrap.dedent(data)
r = cStringIO.StringIO(data)
return http.read_response(
return protocol.read_response(
r, method, limit, include_body=include_body
)
@ -307,13 +304,13 @@ def test_read_response():
data = """
HTTP/1.1 200 OK
"""
assert tst(data, "GET", None) == http_semantics.Response(
assert tst(data, "GET", None) == http.Response(
(1, 1), 200, 'OK', odict.ODictCaseless(), ''
)
data = """
HTTP/1.1 200
"""
assert tst(data, "GET", None) == http_semantics.Response(
assert tst(data, "GET", None) == http.Response(
(1, 1), 200, '', odict.ODictCaseless(), ''
)
data = """
@ -330,7 +327,7 @@ def test_read_response():
HTTP/1.1 200 OK
"""
assert tst(data, "GET", None) == http_semantics.Response(
assert tst(data, "GET", None) == http.Response(
(1, 1), 100, 'CONTINUE', odict.ODictCaseless(), ''
)
@ -360,71 +357,28 @@ def test_read_response():
assert tst(data, "GET", None, include_body=False).content is None
def test_parse_url():
assert not http.parse_url("")
u = "http://foo.com:8888/test"
s, h, po, pa = http.parse_url(u)
assert s == "http"
assert h == "foo.com"
assert po == 8888
assert pa == "/test"
s, h, po, pa = http.parse_url("http://foo/bar")
assert s == "http"
assert h == "foo"
assert po == 80
assert pa == "/bar"
s, h, po, pa = http.parse_url("http://user:pass@foo/bar")
assert s == "http"
assert h == "foo"
assert po == 80
assert pa == "/bar"
s, h, po, pa = http.parse_url("http://foo")
assert pa == "/"
s, h, po, pa = http.parse_url("https://foo")
assert po == 443
assert not http.parse_url("https://foo:bar")
assert not http.parse_url("https://foo:")
# Invalid IDNA
assert not http.parse_url("http://\xfafoo")
# Invalid PATH
assert not http.parse_url("http:/\xc6/localhost:56121")
# Null byte in host
assert not http.parse_url("http://foo\0")
# Port out of range
assert not http.parse_url("http://foo:999999")
# Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
assert not http.parse_url('http://lo[calhost')
def test_parse_http_basic_auth():
vals = ("basic", "foo", "bar")
assert http.parse_http_basic_auth(
http.assemble_http_basic_auth(*vals)
assert protocol.parse_http_basic_auth(
protocol.assemble_http_basic_auth(*vals)
) == vals
assert not http.parse_http_basic_auth("")
assert not http.parse_http_basic_auth("foo bar")
assert not protocol.parse_http_basic_auth("")
assert not protocol.parse_http_basic_auth("foo bar")
v = "basic " + binascii.b2a_base64("foo")
assert not http.parse_http_basic_auth(v)
assert not protocol.parse_http_basic_auth(v)
def test_get_request_line():
r = cStringIO.StringIO("\nfoo")
assert http.get_request_line(r) == "foo"
assert not http.get_request_line(r)
assert protocol.get_request_line(r) == "foo"
assert not protocol.get_request_line(r)
class TestReadRequest():
def tst(self, data, **kwargs):
r = cStringIO.StringIO(data)
return http.read_request(r, **kwargs)
return protocol.read_request(r, **kwargs)
def test_invalid(self):
tutils.raises(
@ -485,7 +439,7 @@ class TestReadRequest():
"Expect: 100-continue\r\n\r\n"
"foobar",
)
v = http.read_request(r, wfile=w)
v = protocol.read_request(r, wfile=w)
assert w.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
assert v.content == "foo"
assert r.read(3) == "bar"

View File

View File

@ -2,7 +2,7 @@ import cStringIO
from test import tutils
from nose.tools import assert_equal
from netlib import tcp
from netlib.http2.frame import *
from netlib.http.http2.frame import *
def hex_to_file(data):

View File

@ -1,10 +1,9 @@
import OpenSSL
from netlib import http2
from netlib import tcp
from netlib.http2.frame import *
from test import tutils
from .. import tservers
from netlib.http import http2
from netlib.http.http2.frame import *
from ... import tutils, tservers
class EchoHandler(tcp.BaseHandler):

View File

@ -1,11 +1,12 @@
from netlib import odict, http_auth, http
import tutils
from netlib import odict, http
from netlib.http import authentication
from .. import tutils
class TestPassManNonAnon:
def test_simple(self):
p = http_auth.PassManNonAnon()
p = authentication.PassManNonAnon()
assert not p.test("", "")
assert p.test("user", "")
@ -15,14 +16,14 @@ class TestPassManHtpasswd:
def test_file_errors(self):
tutils.raises(
"malformed htpasswd file",
http_auth.PassManHtpasswd,
authentication.PassManHtpasswd,
tutils.test_data.path("data/server.crt"))
def test_simple(self):
pm = http_auth.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
vals = ("basic", "test", "test")
http.assemble_http_basic_auth(*vals)
http.http1.assemble_http_basic_auth(*vals)
assert pm.test("test", "test")
assert not pm.test("test", "foo")
assert not pm.test("foo", "test")
@ -33,7 +34,7 @@ class TestPassManHtpasswd:
class TestPassManSingleUser:
def test_simple(self):
pm = http_auth.PassManSingleUser("test", "test")
pm = authentication.PassManSingleUser("test", "test")
assert pm.test("test", "test")
assert not pm.test("test", "foo")
assert not pm.test("foo", "test")
@ -42,7 +43,7 @@ class TestPassManSingleUser:
class TestNullProxyAuth:
def test_simple(self):
na = http_auth.NullProxyAuth(http_auth.PassManNonAnon())
na = authentication.NullProxyAuth(authentication.PassManNonAnon())
assert not na.auth_challenge_headers()
assert na.authenticate("foo")
na.clean({})
@ -51,17 +52,17 @@ class TestNullProxyAuth:
class TestBasicProxyAuth:
def test_simple(self):
ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test")
ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
h = odict.ODictCaseless()
assert ba.auth_challenge_headers()
assert not ba.authenticate(h)
def test_authenticate_clean(self):
ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test")
ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
hdrs = odict.ODictCaseless()
vals = ("basic", "foo", "bar")
hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)]
hdrs[ba.AUTH_HEADER] = [http.http1.assemble_http_basic_auth(*vals)]
assert ba.authenticate(hdrs)
ba.clean(hdrs)
@ -74,12 +75,12 @@ class TestBasicProxyAuth:
assert not ba.authenticate(hdrs)
vals = ("foo", "foo", "bar")
hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)]
hdrs[ba.AUTH_HEADER] = [http.http1.assemble_http_basic_auth(*vals)]
assert not ba.authenticate(hdrs)
ba = http_auth.BasicProxyAuth(http_auth.PassMan(), "test")
ba = authentication.BasicProxyAuth(authentication.PassMan(), "test")
vals = ("basic", "foo", "bar")
hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)]
hdrs[ba.AUTH_HEADER] = [http.http1.assemble_http_basic_auth(*vals)]
assert not ba.authenticate(hdrs)
@ -91,19 +92,19 @@ class TestAuthAction:
def test_nonanonymous(self):
m = Bunch()
aa = http_auth.NonanonymousAuthAction(None, "authenticator")
aa = authentication.NonanonymousAuthAction(None, "authenticator")
aa(None, m, None, None)
assert m.authenticator
def test_singleuser(self):
m = Bunch()
aa = http_auth.SingleuserAuthAction(None, "authenticator")
aa = authentication.SingleuserAuthAction(None, "authenticator")
aa(None, m, "foo:bar", None)
assert m.authenticator
tutils.raises("invalid", aa, None, m, "foo", None)
def test_httppasswd(self):
m = Bunch()
aa = http_auth.HtpasswdAuthAction(None, "authenticator")
aa = authentication.HtpasswdAuthAction(None, "authenticator")
aa(None, m, tutils.test_data.path("data/htpasswd"), None)
assert m.authenticator

View File

@ -1,6 +1,6 @@
import nose.tools
from netlib import http_cookies
from netlib.http import cookies
def test_read_token():
@ -13,7 +13,7 @@ def test_read_token():
[(" foo=bar", 1), ("foo", 4)],
]
for q, a in tokens:
nose.tools.eq_(http_cookies._read_token(*q), a)
nose.tools.eq_(cookies._read_token(*q), a)
def test_read_quoted_string():
@ -25,7 +25,7 @@ def test_read_quoted_string():
[('"fo\\\"" x', 0), ("fo\"", 6)],
]
for q, a in tokens:
nose.tools.eq_(http_cookies._read_quoted_string(*q), a)
nose.tools.eq_(cookies._read_quoted_string(*q), a)
def test_read_pairs():
@ -60,7 +60,7 @@ def test_read_pairs():
],
]
for s, lst in vals:
ret, off = http_cookies._read_pairs(s)
ret, off = cookies._read_pairs(s)
nose.tools.eq_(ret, lst)
@ -108,10 +108,10 @@ def test_pairs_roundtrips():
]
]
for s, lst in pairs:
ret, off = http_cookies._read_pairs(s)
ret, off = cookies._read_pairs(s)
nose.tools.eq_(ret, lst)
s2 = http_cookies._format_pairs(lst)
ret, off = http_cookies._read_pairs(s2)
s2 = cookies._format_pairs(lst)
ret, off = cookies._read_pairs(s2)
nose.tools.eq_(ret, lst)
@ -127,10 +127,10 @@ def test_cookie_roundtrips():
],
]
for s, lst in pairs:
ret = http_cookies.parse_cookie_header(s)
ret = cookies.parse_cookie_header(s)
nose.tools.eq_(ret.lst, lst)
s2 = http_cookies.format_cookie_header(ret)
ret = http_cookies.parse_cookie_header(s2)
s2 = cookies.format_cookie_header(ret)
ret = cookies.parse_cookie_header(s2)
nose.tools.eq_(ret.lst, lst)
@ -180,10 +180,10 @@ def test_parse_set_cookie_pairs():
],
]
for s, lst in pairs:
ret = http_cookies._parse_set_cookie_pairs(s)
ret = cookies._parse_set_cookie_pairs(s)
nose.tools.eq_(ret, lst)
s2 = http_cookies._format_set_cookie_pairs(ret)
ret2 = http_cookies._parse_set_cookie_pairs(s2)
s2 = cookies._format_set_cookie_pairs(ret)
ret2 = cookies._parse_set_cookie_pairs(s2)
nose.tools.eq_(ret2, lst)
@ -205,13 +205,13 @@ def test_parse_set_cookie_header():
]
]
for s, expected in vals:
ret = http_cookies.parse_set_cookie_header(s)
ret = cookies.parse_set_cookie_header(s)
if expected:
assert ret[0] == expected[0]
assert ret[1] == expected[1]
nose.tools.eq_(ret[2].lst, expected[2])
s2 = http_cookies.format_set_cookie_header(*ret)
ret2 = http_cookies.parse_set_cookie_header(s2)
s2 = cookies.format_set_cookie_header(*ret)
ret2 = cookies.parse_set_cookie_header(s2)
assert ret2[0] == expected[0]
assert ret2[1] == expected[1]
nose.tools.eq_(ret2[2].lst, expected[2])

View File

@ -0,0 +1,54 @@
import cStringIO
import textwrap
import binascii
from netlib import http, odict, tcp
from netlib.http import http1
from .. import tutils, tservers
def test_httperror():
e = http.exceptions.HttpError(404, "Not found")
assert str(e)
def test_parse_url():
assert not http.parse_url("")
u = "http://foo.com:8888/test"
s, h, po, pa = http.parse_url(u)
assert s == "http"
assert h == "foo.com"
assert po == 8888
assert pa == "/test"
s, h, po, pa = http.parse_url("http://foo/bar")
assert s == "http"
assert h == "foo"
assert po == 80
assert pa == "/bar"
s, h, po, pa = http.parse_url("http://user:pass@foo/bar")
assert s == "http"
assert h == "foo"
assert po == 80
assert pa == "/bar"
s, h, po, pa = http.parse_url("http://foo")
assert pa == "/"
s, h, po, pa = http.parse_url("https://foo")
assert po == 443
assert not http.parse_url("https://foo:bar")
assert not http.parse_url("https://foo:")
# Invalid IDNA
assert not http.parse_url("http://\xfafoo")
# Invalid PATH
assert not http.parse_url("http:/\xc6/localhost:56121")
# Null byte in host
assert not http.parse_url("http://foo\0")
# Port out of range
assert not http.parse_url("http://foo:999999")
# Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
assert not http.parse_url('http://lo[calhost')

View File

@ -0,0 +1,6 @@
from netlib.http import user_agents
def test_get_shortcut():
assert user_agents.get_by_shortcut("c")[0] == "chrome"
assert not user_agents.get_by_shortcut("_")

View File

@ -1,6 +0,0 @@
from netlib import http_uastrings
def test_get_shortcut():
assert http_uastrings.get_by_shortcut("c")[0] == "chrome"
assert not http_uastrings.get_by_shortcut("_")

View File

View File

@ -2,8 +2,9 @@ import os
from nose.tools import raises
from netlib import tcp, websockets, http
from . import tutils, tservers
from netlib import tcp, http, websockets
from netlib.http.exceptions import *
from .. import tutils, tservers
class WebSocketsEchoHandler(tcp.BaseHandler):
@ -31,10 +32,10 @@ class WebSocketsEchoHandler(tcp.BaseHandler):
frame.to_file(self.wfile)
def handshake(self):
req = http.read_request(self.rfile)
req = http.http1.read_request(self.rfile)
key = self.protocol.check_client_handshake(req.headers)
self.wfile.write(http.response_preamble(101) + "\r\n")
self.wfile.write(http.http1.response_preamble(101) + "\r\n")
headers = self.protocol.server_handshake_headers(key)
self.wfile.write(headers.format() + "\r\n")
self.wfile.flush()
@ -55,14 +56,14 @@ class WebSocketsClient(tcp.TCPClient):
def connect(self):
super(WebSocketsClient, self).connect()
preamble = http.request_preamble("GET", "/")
preamble = http.http1.protocol.request_preamble("GET", "/")
self.wfile.write(preamble + "\r\n")
headers = self.protocol.client_handshake_headers()
self.client_nonce = headers.get_first("sec-websocket-key")
self.wfile.write(headers.format() + "\r\n")
self.wfile.flush()
resp = http.read_response(self.rfile, "get", None)
resp = http.http1.protocol.read_response(self.rfile, "get", None)
server_nonce = self.protocol.check_server_handshake(resp.headers)
if not server_nonce == self.protocol.create_server_nonce(
@ -150,10 +151,10 @@ class TestWebSockets(tservers.ServerTestBase):
class BadHandshakeHandler(WebSocketsEchoHandler):
def handshake(self):
client_hs = http.read_request(self.rfile)
client_hs = http.http1.protocol.read_request(self.rfile)
self.protocol.check_client_handshake(client_hs.headers)
self.wfile.write(http.response_preamble(101) + "\r\n")
self.wfile.write(http.http1.protocol.response_preamble(101) + "\r\n")
headers = self.protocol.server_handshake_headers("malformed key")
self.wfile.write(headers.format() + "\r\n")
self.wfile.flush()