mitmproxy/netlib/http/headers.py

222 lines
6.7 KiB
Python
Raw Normal View History

import re
2016-07-16 06:46:12 +00:00
import collections
from netlib import multidict
from mitmproxy.utils import strutils
# See also: http://lucumr.pocoo.org/2013/7/2/the-updated-guide-to-unicode/
2016-05-28 20:17:02 +00:00
# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
def _native(x):
return x.decode("utf-8", "surrogateescape")
2016-05-28 20:17:02 +00:00
def _always_bytes(x):
return strutils.always_bytes(x, "utf-8", "surrogateescape")
class Headers(multidict.MultiDict):
"""
Header class which allows both convenient access to individual headers as well as
direct access to the underlying raw data. Provides a full dictionary interface.
Example:
.. code-block:: python
2015-09-26 15:39:50 +00:00
# Create headers with keyword arguments
>>> h = Headers(host="example.com", content_type="application/xml")
# Headers mostly behave like a normal dict.
>>> h["Host"]
"example.com"
# HTTP Headers are case insensitive
>>> h["host"]
"example.com"
# Headers can also be created from a list of raw (header_name, header_value) byte tuples
2015-09-26 15:39:50 +00:00
>>> h = Headers([
2016-05-20 16:37:13 +00:00
(b"Host",b"example.com"),
(b"Accept",b"text/html"),
(b"accept",b"application/xml")
2015-09-26 15:39:50 +00:00
])
# Multiple headers are folded into a single header as per RFC7230
>>> h["Accept"]
"text/html, application/xml"
# Setting a header removes all existing headers with the same name.
>>> h["Accept"] = "application/text"
>>> h["Accept"]
"application/text"
2015-09-26 15:39:50 +00:00
# bytes(h) returns a HTTP1 header block.
>>> print(bytes(h))
Host: example.com
Accept: application/text
# For full control, the raw header fields can be accessed
>>> h.fields
Caveats:
For use with the "Set-Cookie" header, see :py:meth:`get_all`.
"""
2016-05-29 02:31:43 +00:00
def __init__(self, fields=(), **headers):
"""
Args:
2015-09-26 15:39:50 +00:00
fields: (optional) list of ``(name, value)`` header byte tuples,
e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
**headers: Additional headers to set. Will overwrite existing values from `fields`.
For convenience, underscores in header names will be transformed to dashes -
this behaviour does not extend to other methods.
If ``**headers`` contains multiple keys that have equal ``.lower()`` s,
the behavior is undefined.
"""
2016-10-17 04:34:46 +00:00
super().__init__(fields)
for key, value in self.fields:
if not isinstance(key, bytes) or not isinstance(value, bytes):
raise TypeError("Header fields must be bytes.")
# content_type -> content-type
headers = {
_always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
for name, value in headers.items()
2016-05-29 11:33:20 +00:00
}
self.update(headers)
@staticmethod
def _reduce_values(values):
# Headers can be folded
return ", ".join(values)
@staticmethod
def _kconv(key):
# Headers are case-insensitive
return key.lower()
def __bytes__(self):
if self.fields:
return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
else:
return b""
def __delitem__(self, key):
key = _always_bytes(key)
2016-10-17 04:34:46 +00:00
super().__delitem__(key)
def __iter__(self):
2016-10-17 04:34:46 +00:00
for x in super().__iter__():
yield _native(x)
def get_all(self, name):
"""
Like :py:meth:`get`, but does not fold multiple headers into a single one.
This is useful for Set-Cookie headers, which do not support folding.
See also: https://tools.ietf.org/html/rfc7230#section-3.2.2
"""
name = _always_bytes(name)
return [
_native(x) for x in
2016-10-17 04:34:46 +00:00
super().get_all(name)
]
def set_all(self, name, values):
"""
Explicitly set multiple headers for the given key.
See: :py:meth:`get_all`
"""
name = _always_bytes(name)
values = [_always_bytes(x) for x in values]
2016-10-17 04:34:46 +00:00
return super().set_all(name, values)
def insert(self, index, key, value):
key = _always_bytes(key)
value = _always_bytes(value)
2016-10-17 04:34:46 +00:00
super().insert(index, key, value)
def items(self, multi=False):
if multi:
return (
(_native(k), _native(v))
for k, v in self.fields
)
else:
2016-10-17 04:34:46 +00:00
return super().items()
def replace(self, pattern, repl, flags=0, count=0):
"""
Replaces a regular expression pattern with repl in each "name: value"
header line.
Returns:
The number of replacements made.
"""
if isinstance(pattern, str):
2016-07-01 21:10:48 +00:00
pattern = strutils.escaped_str_to_bytes(pattern)
if isinstance(repl, str):
2016-07-01 21:10:48 +00:00
repl = strutils.escaped_str_to_bytes(repl)
pattern = re.compile(pattern, flags)
replacements = 0
2016-08-31 10:41:59 +00:00
flag_count = count > 0
fields = []
for name, value in self.fields:
line, n = pattern.subn(repl, name + b": " + value, count=count)
try:
name, value = line.split(b": ", 1)
except ValueError:
# We get a ValueError if the replacement removed the ": "
# There's not much we can do about this, so we just keep the header as-is.
pass
else:
replacements += n
2016-08-31 10:41:59 +00:00
if flag_count:
count -= n
if count == 0:
2016-08-31 11:09:04 +00:00
break
fields.append((name, value))
self.fields = tuple(fields)
return replacements
2016-05-31 07:58:28 +00:00
def parse_content_type(c):
"""
A simple parser for content-type values. Returns a (type, subtype,
parameters) tuple, where type and subtype are strings, and parameters
is a dict. If the string could not be parsed, return None.
E.g. the following string:
text/html; charset=UTF-8
Returns:
("text", "html", {"charset": "UTF-8"})
"""
parts = c.split(";", 1)
ts = parts[0].split("/", 1)
if len(ts) != 2:
return None
2016-07-16 06:46:12 +00:00
d = collections.OrderedDict()
if len(parts) == 2:
for i in parts[1].split(";"):
clause = i.split("=", 1)
if len(clause) == 2:
d[clause[0].strip()] = clause[1].strip()
return ts[0].lower(), ts[1].lower(), d
2016-07-16 05:50:33 +00:00
def assemble_content_type(type, subtype, parameters):
if not parameters:
return "{}/{}".format(type, subtype)
params = "; ".join(
"{}={}".format(k, v)
for k, v in parameters.items()
)
return "{}/{}; {}".format(
type, subtype, params
)