mitmproxy/netlib/http/headers.py

234 lines
7.2 KiB
Python

from __future__ import absolute_import, print_function, division
import re
import collections
import six
from netlib import multidict
from netlib import strutils
# See also: http://lucumr.pocoo.org/2013/7/2/the-updated-guide-to-unicode/
if six.PY2: # pragma: no cover
def _native(x):
return x
def _always_bytes(x):
strutils.always_bytes(x, "utf-8", "replace") # raises a TypeError if x != str/bytes/None.
return x
else:
# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
def _native(x):
return x.decode("utf-8", "surrogateescape")
def _always_bytes(x):
return strutils.always_bytes(x, "utf-8", "surrogateescape")
class Headers(multidict.MultiDict):
"""
Header class which allows both convenient access to individual headers as well as
direct access to the underlying raw data. Provides a full dictionary interface.
Example:
.. code-block:: python
# Create headers with keyword arguments
>>> h = Headers(host="example.com", content_type="application/xml")
# Headers mostly behave like a normal dict.
>>> h["Host"]
"example.com"
# HTTP Headers are case insensitive
>>> h["host"]
"example.com"
# Headers can also be created from a list of raw (header_name, header_value) byte tuples
>>> h = Headers([
(b"Host",b"example.com"),
(b"Accept",b"text/html"),
(b"accept",b"application/xml")
])
# Multiple headers are folded into a single header as per RFC7230
>>> h["Accept"]
"text/html, application/xml"
# Setting a header removes all existing headers with the same name.
>>> h["Accept"] = "application/text"
>>> h["Accept"]
"application/text"
# bytes(h) returns a HTTP1 header block.
>>> print(bytes(h))
Host: example.com
Accept: application/text
# For full control, the raw header fields can be accessed
>>> h.fields
Caveats:
For use with the "Set-Cookie" header, see :py:meth:`get_all`.
"""
def __init__(self, fields=(), **headers):
"""
Args:
fields: (optional) list of ``(name, value)`` header byte tuples,
e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
**headers: Additional headers to set. Will overwrite existing values from `fields`.
For convenience, underscores in header names will be transformed to dashes -
this behaviour does not extend to other methods.
If ``**headers`` contains multiple keys that have equal ``.lower()`` s,
the behavior is undefined.
"""
super(Headers, self).__init__(fields)
for key, value in self.fields:
if not isinstance(key, bytes) or not isinstance(value, bytes):
raise TypeError("Header fields must be bytes.")
# content_type -> content-type
headers = {
_always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
for name, value in six.iteritems(headers)
}
self.update(headers)
@staticmethod
def _reduce_values(values):
# Headers can be folded
return ", ".join(values)
@staticmethod
def _kconv(key):
# Headers are case-insensitive
return key.lower()
def __bytes__(self):
if self.fields:
return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
else:
return b""
if six.PY2: # pragma: no cover
__str__ = __bytes__
def __delitem__(self, key):
key = _always_bytes(key)
super(Headers, self).__delitem__(key)
def __iter__(self):
for x in super(Headers, self).__iter__():
yield _native(x)
def get_all(self, name):
"""
Like :py:meth:`get`, but does not fold multiple headers into a single one.
This is useful for Set-Cookie headers, which do not support folding.
See also: https://tools.ietf.org/html/rfc7230#section-3.2.2
"""
name = _always_bytes(name)
return [
_native(x) for x in
super(Headers, self).get_all(name)
]
def set_all(self, name, values):
"""
Explicitly set multiple headers for the given key.
See: :py:meth:`get_all`
"""
name = _always_bytes(name)
values = [_always_bytes(x) for x in values]
return super(Headers, self).set_all(name, values)
def insert(self, index, key, value):
key = _always_bytes(key)
value = _always_bytes(value)
super(Headers, self).insert(index, key, value)
def items(self, multi=False):
if multi:
return (
(_native(k), _native(v))
for k, v in self.fields
)
else:
return super(Headers, self).items()
def replace(self, pattern, repl, flags=0, count=0):
"""
Replaces a regular expression pattern with repl in each "name: value"
header line.
Returns:
The number of replacements made.
"""
if isinstance(pattern, six.text_type):
pattern = strutils.escaped_str_to_bytes(pattern)
if isinstance(repl, six.text_type):
repl = strutils.escaped_str_to_bytes(repl)
pattern = re.compile(pattern, flags)
replacements = 0
flag_count = count > 0
fields = []
for name, value in self.fields:
line, n = pattern.subn(repl, name + b": " + value, count=count)
try:
name, value = line.split(b": ", 1)
except ValueError:
# We get a ValueError if the replacement removed the ": "
# There's not much we can do about this, so we just keep the header as-is.
pass
else:
replacements += n
if flag_count:
count -= n
if count == 0:
break
fields.append((name, value))
self.fields = tuple(fields)
return replacements
def parse_content_type(c):
"""
A simple parser for content-type values. Returns a (type, subtype,
parameters) tuple, where type and subtype are strings, and parameters
is a dict. If the string could not be parsed, return None.
E.g. the following string:
text/html; charset=UTF-8
Returns:
("text", "html", {"charset": "UTF-8"})
"""
parts = c.split(";", 1)
ts = parts[0].split("/", 1)
if len(ts) != 2:
return None
d = collections.OrderedDict()
if len(parts) == 2:
for i in parts[1].split(";"):
clause = i.split("=", 1)
if len(clause) == 2:
d[clause[0].strip()] = clause[1].strip()
return ts[0].lower(), ts[1].lower(), d
def assemble_content_type(type, subtype, parameters):
if not parameters:
return "{}/{}".format(type, subtype)
params = "; ".join(
"{}={}".format(k, v)
for k, v in parameters.items()
)
return "{}/{}; {}".format(
type, subtype, params
)