mitmproxy/netlib/http/request.py

410 lines
13 KiB
Python
Raw Normal View History

2015-09-25 22:39:04 +00:00
from __future__ import absolute_import, print_function, division
import re
import urllib
2015-09-25 22:39:04 +00:00
from netlib import multidict
from netlib import strutils
from netlib.http import multipart
from netlib.http import cookies
from netlib.http import headers as nheaders
from netlib.http import message
import netlib.http.url
2015-09-25 22:39:04 +00:00
# This regex extracts & splits the value of a Host header into host and port.
# The "host" group is either a run of characters containing no colon, or a
# bracketed literal ("[...]"), which covers IPv6 addresses that themselves
# contain colons. The optional "port" group is the run of digits following
# the last colon.
# https://bugzilla.mozilla.org/show_bug.cgi?id=45891
host_header_re = re.compile(r"^(?P<host>[^:]+|\[.+\])(?::(?P<port>\d+))?$")
2015-09-25 22:39:04 +00:00
2016-04-11 21:14:18 +00:00
class RequestData(message.MessageData):
    """
    Container for the raw components of an HTTP request.

    ``str`` values given for method, scheme, host, path and http_version are
    converted to bytes on construction; values of any other type are stored
    as passed in.
    """

    def __init__(
        self,
        first_line_format,
        method,
        scheme,
        host,
        port,
        path,
        http_version,
        headers=(),
        content=None,
        timestamp_start=None,
        timestamp_end=None
    ):
        """
        Normalise the arguments and store them as attributes.

        Raises:
            UnicodeError: if a ``str`` argument cannot be encoded
                (ASCII for method/scheme/path/http_version, IDNA for host).
            ValueError: if ``content`` is a ``str`` instead of bytes.
        """
        def ascii_bytes(value):
            # Only str is converted; everything else passes through untouched.
            if isinstance(value, str):
                return value.encode("ascii", "strict")
            return value

        # The conversion order below mirrors the parameter order, so the first
        # offending argument is the one that raises.
        method = ascii_bytes(method)
        scheme = ascii_bytes(scheme)
        if isinstance(host, str):
            # Host names use IDNA rather than plain ASCII.
            host = host.encode("idna", "strict")
        path = ascii_bytes(path)
        http_version = ascii_bytes(http_version)

        if not isinstance(headers, nheaders.Headers):
            headers = nheaders.Headers(headers)

        if isinstance(content, str):
            raise ValueError("Content must be bytes, not {}".format(type(content).__name__))

        self.first_line_format = first_line_format
        self.method = method
        self.scheme = scheme
        self.host = host
        self.port = port
        self.path = path
        self.http_version = http_version
        self.headers = headers
        self.content = content
        self.timestamp_start = timestamp_start
        self.timestamp_end = timestamp_end
class Request(message.Message):
    """
    An HTTP request.
    """

    def __init__(self, *args, **kwargs):
        """
        Create a request; all arguments are forwarded unchanged to
        :py:class:`RequestData`.
        """
        super(Request, self).__init__()
        self.data = RequestData(*args, **kwargs)

    def __repr__(self):
        # host:port is only shown when both parts are set.
        if self.host and self.port:
            hostport = "{}:{}".format(self.host, self.port)
        else:
            hostport = ""
        path = self.path or ""
        return "Request({} {}{})".format(
            self.method, hostport, path
        )

    def replace(self, pattern, repl, flags=0, count=0):
        """
        Replaces a regular expression pattern with repl in the headers, the
        request path and the body of the request. Encoded content will be
        decoded before replacement, and re-encoded afterwards.

        ``str`` arguments are converted to bytes first. If the request has no
        path (``None``), the path is left untouched.

        Returns:
            The number of replacements made.
        """
        if isinstance(pattern, str):
            pattern = strutils.escaped_str_to_bytes(pattern)
        if isinstance(repl, str):
            repl = strutils.escaped_str_to_bytes(repl)

        # Headers and body are handled by the parent class.
        total = super(Request, self).replace(pattern, repl, flags, count)

        # The path may legitimately be None (see the ``path`` property);
        # re.subn() would raise a TypeError for it, so skip it in that case.
        if self.data.path is not None:
            self.path, path_count = re.subn(
                pattern, repl, self.data.path, flags=flags, count=count
            )
            total += path_count
        return total

    @property
    def first_line_format(self):
        """
        HTTP request form as defined in `RFC7230 <https://tools.ietf.org/html/rfc7230#section-5.3>`_.

        origin-form and asterisk-form are subsumed as "relative".
        """
        return self.data.first_line_format

    @first_line_format.setter
    def first_line_format(self, first_line_format):
        self.data.first_line_format = first_line_format

    @property
    def method(self):
        """
        HTTP request method, e.g. "GET". Always returned in upper case.
        """
        return message._native(self.data.method).upper()

    @method.setter
    def method(self, method):
        self.data.method = message._always_bytes(method)

    @property
    def scheme(self):
        """
        HTTP request scheme, which should be "http" or "https".

        A falsy stored value (e.g. ``None``) is returned as is.
        """
        if not self.data.scheme:
            return self.data.scheme
        return message._native(self.data.scheme)

    @scheme.setter
    def scheme(self, scheme):
        self.data.scheme = message._always_bytes(scheme)

    @property
    def host(self):
        """
        Target host. This may be parsed from the raw request
        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
        or inferred from the proxy mode (e.g. an IP in transparent mode).

        Setting the host attribute also updates the host header, if present.
        """
        if not self.data.host:
            return self.data.host
        try:
            return self.data.host.decode("idna")
        except UnicodeError:
            # Not valid IDNA: fall back to a lossless utf8 decoding.
            return self.data.host.decode("utf8", "surrogateescape")

    @host.setter
    def host(self, host):
        if isinstance(host, str):
            try:
                # There's no non-strict mode for IDNA encoding.
                # We don't want this operation to fail though, so we try
                # utf8 as a last resort.
                host = host.encode("idna", "strict")
            except UnicodeError:
                host = host.encode("utf8", "surrogateescape")

        self.data.host = host

        # Keep an existing Host header in sync; never add one that was not
        # there before, and drop it when the host is cleared.
        if "host" in self.headers:
            if host:
                self.headers["host"] = host
            else:
                self.headers.pop("host")

    @property
    def port(self):
        """
        Target port
        """
        return self.data.port

    @port.setter
    def port(self, port):
        self.data.port = port

    @property
    def path(self):
        """
        HTTP request path, e.g. "/index.html".
        Guaranteed to start with a slash, except for OPTIONS requests, which may just be "*".

        ``None`` if no path is set.
        """
        if self.data.path is None:
            return None
        return message._native(self.data.path)

    @path.setter
    def path(self, path):
        self.data.path = message._always_bytes(path)

    @property
    def url(self):
        """
        The URL string, constructed from the request's URL components.

        For authority-form requests this is just "host:port".
        """
        if self.first_line_format == "authority":
            return "%s:%d" % (self.host, self.port)
        return netlib.http.url.unparse(self.scheme, self.host, self.port, self.path)

    @url.setter
    def url(self, url):
        self.scheme, self.host, self.port, self.path = netlib.http.url.parse(url)

    def _parse_host_header(self):
        """
        Extract the host and port from the Host header.

        Returns:
            A ``(host, port)`` tuple. Both are ``None`` if there is no Host
            header. ``port`` is ``None`` if the header carries no port. If the
            header does not match the expected form, the raw header value is
            returned as host.
        """
        if "host" not in self.headers:
            return None, None
        host, port = self.headers["host"], None
        m = host_header_re.match(host)
        if m:
            # Drop the brackets around IPv6 literals.
            host = m.group("host").strip("[]")
            if m.group("port"):
                port = int(m.group("port"))
        return host, port

    @property
    def pretty_host(self):
        """
        Similar to :py:attr:`host`, but using the Host headers as an additional preferred data source.
        This is useful in transparent mode where :py:attr:`host` is only an IP address,
        but may not reflect the actual destination as the Host header could be spoofed.

        The header value is only used if its port (or, when the header names
        no port, the default port of the scheme) equals :py:attr:`port`.
        """
        host, port = self._parse_host_header()
        if not host:
            return self.host
        if not port:
            port = 443 if self.scheme == 'https' else 80
        # Prefer the original address if host header has an unexpected form
        return host if port == self.port else self.host

    @property
    def pretty_url(self):
        """
        Like :py:attr:`url`, but using :py:attr:`pretty_host` instead of :py:attr:`host`.
        """
        if self.first_line_format == "authority":
            return "%s:%d" % (self.pretty_host, self.port)
        return netlib.http.url.unparse(self.scheme, self.pretty_host, self.port, self.path)

    @property
    def query(self):
        # type: () -> multidict.MultiDictView
        """
        The request query string as an :py:class:`~netlib.multidict.MultiDictView` object.
        """
        return multidict.MultiDictView(
            self._get_query,
            self._set_query
        )

    def _get_query(self):
        """Return the decoded query string of :py:attr:`url` as a tuple."""
        query = urllib.parse.urlparse(self.url).query
        return tuple(netlib.http.url.decode(query))

    def _set_query(self, query_data):
        """Rebuild :py:attr:`path` with its query part replaced by ``query_data``."""
        query = netlib.http.url.encode(query_data)
        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
        # Scheme and netloc are left empty so that only the path part is produced.
        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])

    @query.setter
    def query(self, value):
        self._set_query(value)

    @property
    def cookies(self):
        # type: () -> multidict.MultiDictView
        """
        The request cookies.

        An empty :py:class:`~netlib.multidict.MultiDictView` object if the cookie monster ate them all.
        """
        return multidict.MultiDictView(
            self._get_cookies,
            self._set_cookies
        )

    def _get_cookies(self):
        """Return the cookies parsed from all Cookie headers as a tuple."""
        h = self.headers.get_all("Cookie")
        return tuple(cookies.parse_cookie_headers(h))

    def _set_cookies(self, value):
        """Replace the Cookie header with the formatted ``value``."""
        self.headers["cookie"] = cookies.format_cookie_header(value)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    @property
    def path_components(self):
        """
        The URL's path components as a tuple of strings.
        Components are unquoted.
        """
        path = urllib.parse.urlparse(self.url).path
        # This needs to be a tuple so that it's immutable.
        # Otherwise, this would fail silently:
        #   request.path_components.append("foo")
        return tuple(netlib.http.url.unquote(i) for i in path.split("/") if i)

    @path_components.setter
    def path_components(self, components):
        # Quote every component fully (safe="") so that a "/" inside a
        # component cannot introduce an additional path segment.
        quoted = (netlib.http.url.quote(c, safe="") for c in components)
        path = "/" + "/".join(quoted)
        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])

    def anticache(self):
        """
        Modifies this request to remove headers that might produce a cached
        response. That is, we remove ETags and If-Modified-Since headers.
        """
        delheaders = [
            "if-modified-since",
            "if-none-match",
        ]
        for i in delheaders:
            self.headers.pop(i, None)

    def anticomp(self):
        """
        Modifies this request to remove headers that will compress the
        resource's data.
        """
        self.headers["accept-encoding"] = "identity"

    def constrain_encoding(self):
        """
        Limits the permissible Accept-Encoding values, based on what we can
        decode appropriately.

        Does nothing if the request has no (or an empty) Accept-Encoding
        header.
        """
        accept_encoding = self.headers.get("accept-encoding")
        if accept_encoding:
            # A tuple (not a set) keeps the order of the rewritten header
            # stable across interpreter runs.
            # NOTE(review): this is a plain substring test on the header
            # value, not a token match, and q-values are ignored.
            supported = ("gzip", "identity", "deflate", "br")
            self.headers["accept-encoding"] = ', '.join(
                e for e in supported if e in accept_encoding
            )

    @property
    def urlencoded_form(self):
        """
        The URL-encoded form data as an :py:class:`~netlib.multidict.MultiDictView` object.
        An empty multidict.MultiDictView if the content-type indicates non-form data
        or the content could not be parsed.
        """
        return multidict.MultiDictView(
            self._get_urlencoded_form,
            self._set_urlencoded_form
        )

    def _get_urlencoded_form(self):
        """Return the decoded form fields as a tuple, or ``()`` if unavailable."""
        is_valid_content_type = "application/x-www-form-urlencoded" in self.headers.get("content-type", "").lower()
        if is_valid_content_type:
            try:
                return tuple(netlib.http.url.decode(self.content))
            except ValueError:
                # Unparseable content is deliberately treated as "no form data".
                pass
        return ()

    def _set_urlencoded_form(self, form_data):
        """
        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
        This will overwrite the existing content if there is one.
        """
        self.headers["content-type"] = "application/x-www-form-urlencoded"
        self.content = netlib.http.url.encode(form_data).encode()

    @urlencoded_form.setter
    def urlencoded_form(self, value):
        self._set_urlencoded_form(value)

    @property
    def multipart_form(self):
        """
        The multipart form data as an :py:class:`~netlib.multidict.MultiDictView` object.
        An empty multidict.MultiDictView if the content-type indicates non-form data
        or the content could not be parsed.

        Assigning to this attribute is not supported and raises
        ``NotImplementedError``.
        """
        return multidict.MultiDictView(
            self._get_multipart_form,
            self._set_multipart_form
        )

    def _get_multipart_form(self):
        """Return the decoded multipart fields, or ``()`` if unavailable."""
        is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower()
        if is_valid_content_type:
            try:
                return multipart.decode(self.headers, self.content)
            except ValueError:
                # Unparseable content is deliberately treated as "no form data".
                pass
        return ()

    def _set_multipart_form(self, value):
        """Not supported: always raises ``NotImplementedError``."""
        raise NotImplementedError()

    @multipart_form.setter
    def multipart_form(self, value):
        self._set_multipart_form(value)