# Mirrored copy (upstream location not recorded); synced 2024-11-27 02:24:18 +00:00.
# Size reported by the mirror: 381 lines, 12 KiB.
from __future__ import absolute_import, print_function, division
import re
import six
from six.moves import urllib
from netlib import utils
import netlib.http.url
from . import cookies
from .. import encoding
from ..multidict import MultiDictView
from .headers import Headers
from .message import Message, _native, _always_bytes, MessageData
# Splits the value of a Host header into its host and (optional) port part.
# A host is either a run of non-colon characters or a bracketed literal;
# the bracketed form covers IPv6 addresses, which contain colons themselves.
# https://bugzilla.mozilla.org/show_bug.cgi?id=45891
host_header_re = re.compile(
    r"^(?P<host>[^:]+|\[.+\])"  # host name / IPv4 address, or "[...]" literal
    r"(?::(?P<port>\d+))?$"  # optional ":<digits>" suffix
)
class RequestData(MessageData):
    """
    Plain container holding the raw fields of an HTTP request.

    Every constructor argument is stored on the instance under its own name.
    The only normalisation applied is that ``headers`` is wrapped in
    :py:class:`Headers` when it is not already an instance of it.
    """

    def __init__(
        self,
        first_line_format,
        method,
        scheme,
        host,
        port,
        path,
        http_version,
        headers=(),
        content=None,
        timestamp_start=None,
        timestamp_end=None
    ):
        # Request line
        self.first_line_format = first_line_format
        self.method = method
        self.scheme = scheme
        self.host = host
        self.port = port
        self.path = path
        self.http_version = http_version

        # Headers and body; anything that is not a Headers object yet
        # (e.g. the default empty tuple) is converted here.
        self.headers = headers if isinstance(headers, Headers) else Headers(headers)
        self.content = content

        # Timing information
        self.timestamp_start = timestamp_start
        self.timestamp_end = timestamp_end
class Request(Message):
    """
    An HTTP request.

    The raw values live in ``self.data`` (a :py:class:`RequestData`); the
    properties below convert between those raw values and native types.
    """

    def __init__(self, *args, **kwargs):
        # All arguments are forwarded unchanged to RequestData.
        self.data = RequestData(*args, **kwargs)

    def __repr__(self):
        # The "host:port" prefix is only shown when both parts are set.
        if self.host and self.port:
            hostport = "{}:{}".format(self.host, self.port)
        else:
            hostport = ""
        path = self.path or ""
        return "Request({} {}{})".format(
            self.method, hostport, path
        )

    def replace(self, pattern, repl, flags=0):
        """
        Replaces a regular expression pattern with repl in the headers, the
        request path and the body of the request. Encoded content will be
        decoded before replacement, and re-encoded afterwards.

        Returns:
            The number of replacements made.
        """
        # TODO: Proper distinction between text and bytes.
        # The parent implementation handles everything but the path and
        # reports how many substitutions it made.
        c = super(Request, self).replace(pattern, repl, flags)
        self.path, pc = utils.safe_subn(
            pattern, repl, self.path, flags=flags
        )
        c += pc
        return c

    @property
    def first_line_format(self):
        """
        HTTP request form as defined in `RFC7230 <https://tools.ietf.org/html/rfc7230#section-5.3>`_.

        origin-form and asterisk-form are subsumed as "relative".
        """
        return self.data.first_line_format

    @first_line_format.setter
    def first_line_format(self, first_line_format):
        self.data.first_line_format = first_line_format

    @property
    def method(self):
        """
        HTTP request method, e.g. "GET".

        The stored value is converted to a native string and upper-cased on
        every read.
        """
        return _native(self.data.method).upper()

    @method.setter
    def method(self, method):
        self.data.method = _always_bytes(method)

    @property
    def scheme(self):
        """
        HTTP request scheme, which should be "http" or "https".
        """
        return _native(self.data.scheme)

    @scheme.setter
    def scheme(self, scheme):
        self.data.scheme = _always_bytes(scheme)

    @property
    def host(self):
        """
        Target host. This may be parsed from the raw request
        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
        or inferred from the proxy mode (e.g. an IP in transparent mode).

        Setting the host attribute also updates the host header, if present.
        """
        if six.PY2:  # pragma: no cover
            return self.data.host

        # Nothing to decode for an empty/unset host; hand it back as stored.
        if not self.data.host:
            return self.data.host
        try:
            return self.data.host.decode("idna")
        except UnicodeError:
            # Not valid IDNA: fall back to a decoding that cannot fail.
            return self.data.host.decode("utf8", "surrogateescape")

    @host.setter
    def host(self, host):
        if isinstance(host, six.text_type):
            try:
                # There's no non-strict mode for IDNA encoding.
                # We don't want this operation to fail though, so we try
                # utf8 as a last resort.
                host = host.encode("idna", "strict")
            except UnicodeError:
                host = host.encode("utf8", "surrogateescape")

        self.data.host = host

        # Update host header
        # NOTE(review): a falsy host leaves an existing Host header untouched;
        # confirm whether the header should be removed in that case instead.
        if "host" in self.headers:
            if host:
                self.headers["host"] = host

    @property
    def port(self):
        """
        Target port
        """
        return self.data.port

    @port.setter
    def port(self, port):
        self.data.port = port

    @property
    def path(self):
        """
        HTTP request path, e.g. "/index.html".
        Guaranteed to start with a slash, except for OPTIONS requests, which may just be "*".

        ``None`` is returned as-is when no path is stored.
        """
        if self.data.path is None:
            return None
        return _native(self.data.path)

    @path.setter
    def path(self, path):
        self.data.path = _always_bytes(path)

    @property
    def url(self):
        """
        The URL string, constructed from the request's URL components
        """
        # authority-form requests have no scheme or path, only "host:port".
        if self.first_line_format == "authority":
            return "%s:%d" % (self.host, self.port)
        return netlib.http.url.unparse_url(self.scheme, self.host, self.port, self.path)

    @url.setter
    def url(self, url):
        # Each component goes through its own property setter.
        self.scheme, self.host, self.port, self.path = netlib.http.url.parse_url(url)

    def _parse_host_header(self):
        """
        Extract the host and port from Host header

        Returns:
            A ``(host, port)`` tuple. Both are ``None`` when there is no Host
            header; ``port`` is ``None`` when the header carries no port. If
            the header does not match ``host_header_re``, the raw header value
            is returned as the host.
        """
        if "host" not in self.headers:
            return None, None
        host, port = self.headers["host"], None
        m = host_header_re.match(host)
        if m:
            # Brackets around a literal address are not part of the host.
            host = m.group("host").strip("[]")
            if m.group("port"):
                port = int(m.group("port"))
        return host, port

    @property
    def pretty_host(self):
        """
        Similar to :py:attr:`host`, but using the Host headers as an additional preferred data source.
        This is useful in transparent mode where :py:attr:`host` is only an IP address,
        but may not reflect the actual destination as the Host header could be spoofed.
        """
        host, port = self._parse_host_header()
        if not host:
            return self.host
        if not port:
            # No explicit port in the header: assume the scheme's default.
            port = 443 if self.scheme == 'https' else 80
        # Prefer the original address if host header has an unexpected form
        return host if port == self.port else self.host

    @property
    def pretty_url(self):
        """
        Like :py:attr:`url`, but using :py:attr:`pretty_host` instead of :py:attr:`host`.
        """
        if self.first_line_format == "authority":
            return "%s:%d" % (self.pretty_host, self.port)
        return netlib.http.url.unparse_url(self.scheme, self.pretty_host, self.port, self.path)

    @property
    def query(self):
        # type: () -> MultiDictView
        """
        The request query string as an :py:class:`MultiDictView` object.
        """
        return MultiDictView(
            self._get_query,
            self._set_query
        )

    def _get_query(self):
        # Only the query component of the parsed URL is of interest.
        _, _, _, _, query, _ = urllib.parse.urlparse(self.url)
        return tuple(netlib.http.url.urldecode(query))

    def _set_query(self, value):
        query = netlib.http.url.urlencode(value)
        # Rebuild the URL with the new query, then keep only the path part
        # that parse_url() derives from it.
        scheme, netloc, path, params, _, fragment = urllib.parse.urlparse(self.url)
        _, _, _, self.path = netlib.http.url.parse_url(
            urllib.parse.urlunparse([scheme, netloc, path, params, query, fragment]))

    @query.setter
    def query(self, value):
        self._set_query(value)

    @property
    def cookies(self):
        # type: () -> MultiDictView
        """
        The request cookies.

        An empty :py:class:`MultiDictView` object if the cookie monster ate them all.
        """
        return MultiDictView(
            self._get_cookies,
            self._set_cookies
        )

    def _get_cookies(self):
        h = self.headers.get_all("Cookie")
        return tuple(cookies.parse_cookie_headers(h))

    def _set_cookies(self, value):
        self.headers["cookie"] = cookies.format_cookie_header(value)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    @property
    def path_components(self):
        """
        The URL's path components as a tuple of strings.
        Components are unquoted.
        """
        _, _, path, _, _, _ = urllib.parse.urlparse(self.url)
        # This needs to be a tuple so that it's immutable.
        # Otherwise, this would fail silently:
        #   request.path_components.append("foo")
        return tuple(urllib.parse.unquote(i) for i in path.split("/") if i)

    @path_components.setter
    def path_components(self, components):
        # safe="" makes quote() escape "/" too, so a component can never
        # introduce an additional path segment.
        quoted = (urllib.parse.quote(c, safe="") for c in components)
        path = "/" + "/".join(quoted)
        scheme, netloc, _, params, query, fragment = urllib.parse.urlparse(self.url)
        _, _, _, self.path = netlib.http.url.parse_url(
            urllib.parse.urlunparse([scheme, netloc, path, params, query, fragment]))

    def anticache(self):
        """
        Modifies this request to remove headers that might produce a cached
        response. That is, we remove ETags and If-Modified-Since headers.
        """
        # NOTE(review): header names reconstructed from the docstring above
        # (If-None-Match is the request header that carries ETags) - confirm.
        delheaders = [
            "if-modified-since",
            "if-none-match",
        ]
        for i in delheaders:
            # A default is passed so that absent headers are not an error.
            self.headers.pop(i, None)

    def anticomp(self):
        """
        Modifies this request to remove headers that will compress the
        resource's data.
        """
        self.headers["accept-encoding"] = "identity"

    def constrain_encoding(self):
        """
        Limits the permissible Accept-Encoding values, based on what we can
        decode appropriately.
        """
        accept_encoding = self.headers.get("accept-encoding")
        if accept_encoding:
            # Keep every known encoding whose name occurs anywhere in the
            # header value (plain substring test on the raw header).
            self.headers["accept-encoding"] = (
                ', '.join(
                    e
                    for e in encoding.ENCODINGS
                    if e in accept_encoding
                )
            )

    @property
    def urlencoded_form(self):
        """
        The URL-encoded form data as an :py:class:`MultiDictView` object.
        An empty MultiDictView if the content-type indicates non-form data
        or the content could not be parsed.
        """
        return MultiDictView(
            self._get_urlencoded_form,
            self._set_urlencoded_form
        )

    def _get_urlencoded_form(self):
        is_valid_content_type = "application/x-www-form-urlencoded" in self.headers.get("content-type", "").lower()
        if is_valid_content_type:
            return tuple(netlib.http.url.urldecode(self.content))
        return ()

    def _set_urlencoded_form(self, value):
        """
        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
        This will overwrite the existing content if there is one.
        """
        self.headers["content-type"] = "application/x-www-form-urlencoded"
        self.content = netlib.http.url.urlencode(value)

    @urlencoded_form.setter
    def urlencoded_form(self, value):
        self._set_urlencoded_form(value)

    @property
    def multipart_form(self):
        """
        The multipart form data as an :py:class:`MultiDictView` object.
        An empty MultiDictView if the content-type indicates non-form data.

        Assigning to this attribute is not supported and raises
        ``NotImplementedError``.
        """
        return MultiDictView(
            self._get_multipart_form,
            self._set_multipart_form
        )

    def _get_multipart_form(self):
        is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower()
        if is_valid_content_type:
            return utils.multipartdecode(self.headers, self.content)
        return ()

    def _set_multipart_form(self, value):
        raise NotImplementedError()

    @multipart_form.setter
    def multipart_form(self, value):
        self._set_multipart_form(value)