mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-29 11:03:13 +00:00
Merge branch 'master' into no-dev-scripts
This commit is contained in:
commit
0a4e2c9b59
@ -41,6 +41,8 @@ If you depend on these features, please raise your voice in
|
|||||||
* mitmproxy's command line interface now supports Windows (@mhils)
|
* mitmproxy's command line interface now supports Windows (@mhils)
|
||||||
* The `clientconnect`, `clientdisconnect`, `serverconnect`, `serverdisconnect`, and `log`
|
* The `clientconnect`, `clientdisconnect`, `serverconnect`, `serverdisconnect`, and `log`
|
||||||
events have been replaced with new events, see addon documentation for details (@mhils)
|
events have been replaced with new events, see addon documentation for details (@mhils)
|
||||||
|
* Contentviews now implement `render_priority` instead of `should_render`, allowing more specialization (@mhils)
|
||||||
|
* Automatic JSON view mode when `+json` suffix in content type (@kam800)
|
||||||
* Use pyca/cryptography to generate certificates, not pyOpenSSL (@mhils)
|
* Use pyca/cryptography to generate certificates, not pyOpenSSL (@mhils)
|
||||||
* Remove the legacy protocol stack (@Kriechi)
|
* Remove the legacy protocol stack (@Kriechi)
|
||||||
* Remove all deprecated pathod and pathoc tools and modules (@Kriechi)
|
* Remove all deprecated pathod and pathoc tools and modules (@Kriechi)
|
||||||
|
@ -5,16 +5,32 @@ This example shows how one can add a custom contentview to mitmproxy,
|
|||||||
which is used to pretty-print HTTP bodies for example.
|
which is used to pretty-print HTTP bodies for example.
|
||||||
The content view API is explained in the mitmproxy.contentviews module.
|
The content view API is explained in the mitmproxy.contentviews module.
|
||||||
"""
|
"""
|
||||||
from mitmproxy import contentviews
|
from typing import Optional
|
||||||
|
|
||||||
|
from mitmproxy import contentviews, flow
|
||||||
|
from mitmproxy.net import http
|
||||||
|
|
||||||
|
|
||||||
class ViewSwapCase(contentviews.View):
|
class ViewSwapCase(contentviews.View):
|
||||||
name = "swapcase"
|
name = "swapcase"
|
||||||
content_types = ["text/plain"]
|
|
||||||
|
|
||||||
def __call__(self, data, **metadata) -> contentviews.TViewResult:
|
def __call__(self, data, **metadata) -> contentviews.TViewResult:
|
||||||
return "case-swapped text", contentviews.format_text(data.swapcase())
|
return "case-swapped text", contentviews.format_text(data.swapcase())
|
||||||
|
|
||||||
|
def render_priority(
|
||||||
|
self,
|
||||||
|
data: bytes,
|
||||||
|
*,
|
||||||
|
content_type: Optional[str] = None,
|
||||||
|
flow: Optional[flow.Flow] = None,
|
||||||
|
http_message: Optional[http.Message] = None,
|
||||||
|
**unknown_metadata,
|
||||||
|
) -> float:
|
||||||
|
if content_type == "text/plain":
|
||||||
|
return 1
|
||||||
|
else:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
view = ViewSwapCase()
|
view = ViewSwapCase()
|
||||||
|
|
||||||
|
@ -1,19 +1,20 @@
|
|||||||
import itertools
|
import itertools
|
||||||
import shutil
|
import shutil
|
||||||
import sys
|
import sys
|
||||||
from typing import Optional, TextIO
|
from typing import Optional, TextIO, Union
|
||||||
|
|
||||||
import click
|
import click
|
||||||
|
|
||||||
from mitmproxy import contentviews
|
from mitmproxy import contentviews
|
||||||
from mitmproxy import ctx
|
from mitmproxy import ctx
|
||||||
from mitmproxy import flow
|
|
||||||
from mitmproxy import exceptions
|
from mitmproxy import exceptions
|
||||||
from mitmproxy import flowfilter
|
from mitmproxy import flowfilter
|
||||||
from mitmproxy import http
|
from mitmproxy import http
|
||||||
from mitmproxy.net import http as net_http
|
from mitmproxy.net import http as net_http
|
||||||
|
from mitmproxy.tcp import TCPFlow, TCPMessage
|
||||||
from mitmproxy.utils import human
|
from mitmproxy.utils import human
|
||||||
from mitmproxy.utils import strutils
|
from mitmproxy.utils import strutils
|
||||||
|
from mitmproxy.websocket import WebSocketFlow, WebSocketMessage
|
||||||
|
|
||||||
|
|
||||||
def indent(n: int, text: str) -> str:
|
def indent(n: int, text: str) -> str:
|
||||||
@ -94,7 +95,11 @@ class Dumper:
|
|||||||
self.echo(click.style("--- HTTP Trailers", fg="magenta"), ident=4)
|
self.echo(click.style("--- HTTP Trailers", fg="magenta"), ident=4)
|
||||||
self._echo_headers(trailers)
|
self._echo_headers(trailers)
|
||||||
|
|
||||||
def _echo_message(self, message, flow: flow.Flow):
|
def _echo_message(
|
||||||
|
self,
|
||||||
|
message: Union[net_http.Message, TCPMessage, WebSocketMessage],
|
||||||
|
flow: Union[http.HTTPFlow, TCPFlow, WebSocketFlow]
|
||||||
|
):
|
||||||
_, lines, error = contentviews.get_message_content_view(
|
_, lines, error = contentviews.get_message_content_view(
|
||||||
ctx.options.dumper_default_contentview,
|
ctx.options.dumper_default_contentview,
|
||||||
message,
|
message,
|
||||||
@ -165,7 +170,7 @@ class Dumper:
|
|||||||
|
|
||||||
http_version = ""
|
http_version = ""
|
||||||
if (
|
if (
|
||||||
flow.request.http_version not in ("HTTP/1.1", "HTTP/1.0")
|
not (flow.request.is_http10 or flow.request.is_http11)
|
||||||
or flow.request.http_version != getattr(flow.response, "http_version", "HTTP/1.1")
|
or flow.request.http_version != getattr(flow.response, "http_version", "HTTP/1.1")
|
||||||
):
|
):
|
||||||
# Hide version for h1 <-> h1 connections.
|
# Hide version for h1 <-> h1 connections.
|
||||||
@ -215,7 +220,7 @@ class Dumper:
|
|||||||
|
|
||||||
http_version = ""
|
http_version = ""
|
||||||
if (
|
if (
|
||||||
flow.response.http_version not in ("HTTP/1.1", "HTTP/1.0")
|
not (flow.response.is_http10 or flow.response.is_http11)
|
||||||
or flow.request.http_version != flow.response.http_version
|
or flow.request.http_version != flow.response.http_version
|
||||||
):
|
):
|
||||||
# Hide version for h1 <-> h1 connections.
|
# Hide version for h1 <-> h1 connections.
|
||||||
@ -226,7 +231,8 @@ class Dumper:
|
|||||||
# This aligns the HTTP response code with the HTTP request method:
|
# This aligns the HTTP response code with the HTTP request method:
|
||||||
# 127.0.0.1:59519: GET http://example.com/
|
# 127.0.0.1:59519: GET http://example.com/
|
||||||
# << 304 Not Modified 0b
|
# << 304 Not Modified 0b
|
||||||
pad = max(0, len(human.format_address(flow.client_conn.peername)) - (2 + len(http_version) + len(replay_str)))
|
pad = max(0,
|
||||||
|
len(human.format_address(flow.client_conn.peername)) - (2 + len(http_version) + len(replay_str)))
|
||||||
arrows = " " * pad + arrows
|
arrows = " " * pad + arrows
|
||||||
|
|
||||||
self.echo(f"{replay}{arrows} {http_version}{code} {reason} {size}")
|
self.echo(f"{replay}{arrows} {http_version}{code} {reason} {size}")
|
||||||
|
@ -3,21 +3,19 @@ Mitmproxy Content Views
|
|||||||
=======================
|
=======================
|
||||||
|
|
||||||
mitmproxy includes a set of content views which can be used to
|
mitmproxy includes a set of content views which can be used to
|
||||||
format/decode/highlight data. While they are currently used for HTTP message
|
format/decode/highlight data. While they are mostly used for HTTP message
|
||||||
bodies only, the may be used in other contexts in the future, e.g. to decode
|
bodies, the may be used in other contexts, e.g. to decode WebSocket messages.
|
||||||
protobuf messages sent as WebSocket frames.
|
|
||||||
|
|
||||||
Thus, the View API is very minimalistic. The only arguments are `data` and
|
Thus, the View API is very minimalistic. The only arguments are `data` and
|
||||||
`**metadata`, where `data` is the actual content (as bytes). The contents on
|
`**metadata`, where `data` is the actual content (as bytes). The contents on
|
||||||
metadata depend on the protocol in use. For HTTP, the message headers and
|
metadata depend on the protocol in use. Known attributes can be found in
|
||||||
message trailers are passed as the ``headers`` and ``trailers`` keyword
|
`base.View`.
|
||||||
argument. For HTTP requests, the query parameters are passed as the ``query``
|
|
||||||
keyword argument.
|
|
||||||
"""
|
"""
|
||||||
import traceback
|
import traceback
|
||||||
from typing import Dict, Optional # noqa
|
from typing import List, Union
|
||||||
from typing import List # noqa
|
from typing import Optional
|
||||||
|
|
||||||
|
from mitmproxy import flow
|
||||||
from mitmproxy.net import http
|
from mitmproxy.net import http
|
||||||
from mitmproxy.utils import strutils
|
from mitmproxy.utils import strutils
|
||||||
from . import (
|
from . import (
|
||||||
@ -25,9 +23,11 @@ from . import (
|
|||||||
urlencoded, multipart, image, query, protobuf, msgpack
|
urlencoded, multipart, image, query, protobuf, msgpack
|
||||||
)
|
)
|
||||||
from .base import View, KEY_MAX, format_text, format_dict, TViewResult
|
from .base import View, KEY_MAX, format_text, format_dict, TViewResult
|
||||||
|
from ..http import HTTPFlow
|
||||||
|
from ..tcp import TCPMessage, TCPFlow
|
||||||
|
from ..websocket import WebSocketMessage, WebSocketFlow
|
||||||
|
|
||||||
views: List[View] = []
|
views: List[View] = []
|
||||||
content_types_map: Dict[str, List[View]] = {}
|
|
||||||
|
|
||||||
|
|
||||||
def get(name: str) -> Optional[View]:
|
def get(name: str) -> Optional[View]:
|
||||||
@ -45,19 +45,8 @@ def add(view: View) -> None:
|
|||||||
|
|
||||||
views.append(view)
|
views.append(view)
|
||||||
|
|
||||||
for ct in view.content_types:
|
|
||||||
l = content_types_map.setdefault(ct, [])
|
|
||||||
l.append(view)
|
|
||||||
|
|
||||||
|
|
||||||
def remove(view: View) -> None:
|
def remove(view: View) -> None:
|
||||||
for ct in view.content_types:
|
|
||||||
l = content_types_map.setdefault(ct, [])
|
|
||||||
l.remove(view)
|
|
||||||
|
|
||||||
if not len(l):
|
|
||||||
del content_types_map[ct]
|
|
||||||
|
|
||||||
views.remove(view)
|
views.remove(view)
|
||||||
|
|
||||||
|
|
||||||
@ -75,16 +64,24 @@ def safe_to_print(lines, encoding="utf8"):
|
|||||||
yield clean_line
|
yield clean_line
|
||||||
|
|
||||||
|
|
||||||
def get_message_content_view(viewname, message, flow):
|
def get_message_content_view(
|
||||||
|
viewname: str,
|
||||||
|
message: Union[http.Message, TCPMessage, WebSocketMessage],
|
||||||
|
flow: Union[HTTPFlow, TCPFlow, WebSocketFlow],
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Like get_content_view, but also handles message encoding.
|
Like get_content_view, but also handles message encoding.
|
||||||
"""
|
"""
|
||||||
viewmode = get(viewname)
|
viewmode = get(viewname)
|
||||||
if not viewmode:
|
if not viewmode:
|
||||||
viewmode = get("auto")
|
viewmode = get("auto")
|
||||||
|
assert viewmode
|
||||||
|
|
||||||
|
content: Optional[bytes]
|
||||||
try:
|
try:
|
||||||
content = message.content
|
content = message.content # type: ignore
|
||||||
except ValueError:
|
except ValueError:
|
||||||
|
assert isinstance(message, http.Message)
|
||||||
content = message.raw_content
|
content = message.raw_content
|
||||||
enc = "[cannot decode]"
|
enc = "[cannot decode]"
|
||||||
else:
|
else:
|
||||||
@ -93,22 +90,24 @@ def get_message_content_view(viewname, message, flow):
|
|||||||
message.headers.get("content-encoding")
|
message.headers.get("content-encoding")
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
enc = None
|
enc = ""
|
||||||
|
|
||||||
if content is None:
|
if content is None:
|
||||||
return "", iter([[("error", "content missing")]]), None
|
return "", iter([[("error", "content missing")]]), None
|
||||||
|
|
||||||
metadata = {}
|
content_type = None
|
||||||
if isinstance(message, http.Request):
|
http_message = None
|
||||||
metadata["query"] = message.query
|
|
||||||
if isinstance(message, http.Message):
|
if isinstance(message, http.Message):
|
||||||
metadata["headers"] = message.headers
|
http_message = message
|
||||||
metadata["trailers"] = message.trailers
|
if ctype := message.headers.get("content-type"):
|
||||||
metadata["message"] = message
|
if ct := http.parse_content_type(ctype):
|
||||||
metadata["flow"] = flow
|
content_type = f"{ct[0]}/{ct[1]}"
|
||||||
|
|
||||||
description, lines, error = get_content_view(
|
description, lines, error = get_content_view(
|
||||||
viewmode, content, **metadata
|
viewmode, content,
|
||||||
|
content_type=content_type,
|
||||||
|
flow=flow,
|
||||||
|
http_message=http_message,
|
||||||
)
|
)
|
||||||
|
|
||||||
if enc:
|
if enc:
|
||||||
@ -117,7 +116,11 @@ def get_message_content_view(viewname, message, flow):
|
|||||||
return description, lines, error
|
return description, lines, error
|
||||||
|
|
||||||
|
|
||||||
def get_tcp_content_view(viewname: str, data: bytes):
|
def get_tcp_content_view(
|
||||||
|
viewname: str,
|
||||||
|
data: bytes,
|
||||||
|
flow: TCPFlow,
|
||||||
|
):
|
||||||
viewmode = get(viewname)
|
viewmode = get(viewname)
|
||||||
if not viewmode:
|
if not viewmode:
|
||||||
viewmode = get("auto")
|
viewmode = get("auto")
|
||||||
@ -125,12 +128,19 @@ def get_tcp_content_view(viewname: str, data: bytes):
|
|||||||
# https://github.com/mitmproxy/mitmproxy/pull/3970#issuecomment-623024447
|
# https://github.com/mitmproxy/mitmproxy/pull/3970#issuecomment-623024447
|
||||||
assert viewmode
|
assert viewmode
|
||||||
|
|
||||||
description, lines, error = get_content_view(viewmode, data)
|
description, lines, error = get_content_view(viewmode, data, flow=flow)
|
||||||
|
|
||||||
return description, lines, error
|
return description, lines, error
|
||||||
|
|
||||||
|
|
||||||
def get_content_view(viewmode: View, data: bytes, **metadata):
|
def get_content_view(
|
||||||
|
viewmode: View,
|
||||||
|
data: bytes,
|
||||||
|
*,
|
||||||
|
content_type: Optional[str] = None,
|
||||||
|
flow: Optional[flow.Flow] = None,
|
||||||
|
http_message: Optional[http.Message] = None,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
viewmode: the view to use.
|
viewmode: the view to use.
|
||||||
@ -143,9 +153,11 @@ def get_content_view(viewmode: View, data: bytes, **metadata):
|
|||||||
In contrast to calling the views directly, text is always safe-to-print unicode.
|
In contrast to calling the views directly, text is always safe-to-print unicode.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
ret = viewmode(data, **metadata)
|
ret = viewmode(data, content_type=content_type, flow=flow, http_message=http_message)
|
||||||
if ret is None:
|
if ret is None:
|
||||||
ret = "Couldn't parse: falling back to Raw", get("Raw")(data, **metadata)[1]
|
ret = "Couldn't parse: falling back to Raw", get("Raw")(
|
||||||
|
data, content_type=content_type, flow=flow, http_message=http_message
|
||||||
|
)[1]
|
||||||
desc, content = ret
|
desc, content = ret
|
||||||
error = None
|
error = None
|
||||||
# Third-party viewers can fail in unexpected ways...
|
# Third-party viewers can fail in unexpected ways...
|
||||||
@ -153,11 +165,8 @@ def get_content_view(viewmode: View, data: bytes, **metadata):
|
|||||||
desc = "Couldn't parse: falling back to Raw"
|
desc = "Couldn't parse: falling back to Raw"
|
||||||
raw = get("Raw")
|
raw = get("Raw")
|
||||||
assert raw
|
assert raw
|
||||||
content = raw(data, **metadata)[1]
|
content = raw(data, content_type=content_type, flow=flow, http_message=http_message)[1]
|
||||||
error = "{} Content viewer failed: \n{}".format(
|
error = f"{getattr(viewmode, 'name')} content viewer failed: \n{traceback.format_exc()}"
|
||||||
getattr(viewmode, "name"),
|
|
||||||
traceback.format_exc()
|
|
||||||
)
|
|
||||||
|
|
||||||
return desc, safe_to_print(content), error
|
return desc, safe_to_print(content), error
|
||||||
|
|
||||||
|
@ -1,6 +1,4 @@
|
|||||||
from mitmproxy import contentviews
|
from mitmproxy import contentviews
|
||||||
from mitmproxy.net import http
|
|
||||||
from mitmproxy.utils import strutils
|
|
||||||
from . import base
|
from . import base
|
||||||
|
|
||||||
|
|
||||||
@ -8,21 +6,15 @@ class ViewAuto(base.View):
|
|||||||
name = "Auto"
|
name = "Auto"
|
||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data, **metadata):
|
||||||
headers = metadata.get("headers", {})
|
# TODO: The auto view has little justification now that views implement render_priority,
|
||||||
ctype = headers.get("content-type")
|
# but we keep it around for now to not touch more parts.
|
||||||
if data and ctype:
|
priority, view = max(
|
||||||
ct = http.parse_content_type(ctype) if ctype else None
|
(v.render_priority(data, **metadata), v)
|
||||||
ct = "{}/{}".format(ct[0], ct[1])
|
for v in contentviews.views
|
||||||
if ct in contentviews.content_types_map:
|
)
|
||||||
return contentviews.content_types_map[ct][0](data, **metadata)
|
if priority == 0 and not data:
|
||||||
elif strutils.is_xml(data):
|
|
||||||
return contentviews.get("XML/HTML")(data, **metadata)
|
|
||||||
elif ct.startswith("image/"):
|
|
||||||
return contentviews.get("Image")(data, **metadata)
|
|
||||||
if metadata.get("query"):
|
|
||||||
return contentviews.get("Query")(data, **metadata)
|
|
||||||
if data and strutils.is_mostly_bin(data):
|
|
||||||
return contentviews.get("Hex")(data)
|
|
||||||
if not data:
|
|
||||||
return "No content", []
|
return "No content", []
|
||||||
return contentviews.get("Raw")(data)
|
return view(data, **metadata)
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, **metadata) -> float:
|
||||||
|
return -1 # don't recurse.
|
||||||
|
@ -1,5 +1,9 @@
|
|||||||
# Default view cutoff *in lines*
|
# Default view cutoff *in lines*
|
||||||
import typing
|
import typing
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
from mitmproxy import flow
|
||||||
|
from mitmproxy.net import http
|
||||||
|
|
||||||
KEY_MAX = 30
|
KEY_MAX = 30
|
||||||
|
|
||||||
@ -8,37 +12,62 @@ TViewLine = typing.List[typing.Tuple[str, TTextType]]
|
|||||||
TViewResult = typing.Tuple[str, typing.Iterator[TViewLine]]
|
TViewResult = typing.Tuple[str, typing.Iterator[TViewLine]]
|
||||||
|
|
||||||
|
|
||||||
class View:
|
class View(ABC):
|
||||||
name: typing.ClassVar[str]
|
name: typing.ClassVar[str]
|
||||||
content_types: typing.ClassVar[typing.List[str]] = []
|
|
||||||
|
|
||||||
def __call__(self, data: bytes, **metadata) -> TViewResult:
|
@abstractmethod
|
||||||
|
def __call__(
|
||||||
|
self,
|
||||||
|
data: bytes,
|
||||||
|
*,
|
||||||
|
content_type: typing.Optional[str] = None,
|
||||||
|
flow: typing.Optional[flow.Flow] = None,
|
||||||
|
http_message: typing.Optional[http.Message] = None,
|
||||||
|
**unknown_metadata,
|
||||||
|
) -> TViewResult:
|
||||||
"""
|
"""
|
||||||
Transform raw data into human-readable output.
|
Transform raw data into human-readable output.
|
||||||
|
|
||||||
Args:
|
Returns a (description, content generator) tuple.
|
||||||
data: the data to decode/format.
|
|
||||||
metadata: optional keyword-only arguments for metadata. Implementations must not
|
|
||||||
rely on a given argument being present.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A (description, content generator) tuple.
|
|
||||||
|
|
||||||
The content generator yields lists of (style, text) tuples, where each list represents
|
The content generator yields lists of (style, text) tuples, where each list represents
|
||||||
a single line. ``text`` is a unfiltered byte string which may need to be escaped,
|
a single line. ``text`` is a unfiltered string which may need to be escaped,
|
||||||
depending on the used output.
|
depending on the used output. For example, it may contain terminal control sequences
|
||||||
|
or unfiltered HTML.
|
||||||
|
|
||||||
Caveats:
|
Except for `data`, implementations must not rely on any given argument to be present.
|
||||||
The content generator must not yield tuples of tuples,
|
To ensure compatibility with future mitmproxy versions, unknown keyword arguments should be ignored.
|
||||||
because urwid cannot process that. You have to yield a *list* of tuples per line.
|
|
||||||
|
The content generator must not yield tuples of tuples, because urwid cannot process that.
|
||||||
|
You have to yield a *list* of tuples per line.
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError() # pragma: no cover
|
raise NotImplementedError() # pragma: no cover
|
||||||
|
|
||||||
|
def render_priority(
|
||||||
|
self,
|
||||||
|
data: bytes,
|
||||||
|
*,
|
||||||
|
content_type: typing.Optional[str] = None,
|
||||||
|
flow: typing.Optional[flow.Flow] = None,
|
||||||
|
http_message: typing.Optional[http.Message] = None,
|
||||||
|
**unknown_metadata,
|
||||||
|
) -> float:
|
||||||
|
"""
|
||||||
|
Return the priority of this view for rendering `data`.
|
||||||
|
If no particular view is chosen by the user, the view with the highest priority is selected.
|
||||||
|
|
||||||
|
Except for `data`, implementations must not rely on any given argument to be present.
|
||||||
|
To ensure compatibility with future mitmproxy versions, unknown keyword arguments should be ignored.
|
||||||
|
"""
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def __lt__(self, other):
|
||||||
|
assert isinstance(other, View)
|
||||||
|
return self.name.__lt__(other.name)
|
||||||
|
|
||||||
|
|
||||||
def format_pairs(
|
def format_pairs(
|
||||||
items: typing.Iterable[typing.Tuple[TTextType, TTextType]]
|
items: typing.Iterable[typing.Tuple[TTextType, TTextType]]
|
||||||
) -> typing.Iterator[TViewLine]:
|
) -> typing.Iterator[TViewLine]:
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Helper function that accepts a list of (k,v) pairs into a list of
|
Helper function that accepts a list of (k,v) pairs into a list of
|
||||||
[
|
[
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import re
|
import re
|
||||||
import time
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from mitmproxy.contentviews import base
|
from mitmproxy.contentviews import base
|
||||||
from mitmproxy.utils import strutils
|
from mitmproxy.utils import strutils
|
||||||
@ -50,15 +51,15 @@ def beautify(data: str, indent: str = " "):
|
|||||||
|
|
||||||
class ViewCSS(base.View):
|
class ViewCSS(base.View):
|
||||||
name = "CSS"
|
name = "CSS"
|
||||||
content_types = [
|
|
||||||
"text/css"
|
|
||||||
]
|
|
||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data, **metadata):
|
||||||
data = data.decode("utf8", "surrogateescape")
|
data = data.decode("utf8", "surrogateescape")
|
||||||
beautified = beautify(data)
|
beautified = beautify(data)
|
||||||
return "CSS", base.format_text(beautified)
|
return "CSS", base.format_text(beautified)
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||||
|
return float(content_type == "text/css")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__": # pragma: no cover
|
if __name__ == "__main__": # pragma: no cover
|
||||||
with open("../tools/web/static/vendor.css") as f:
|
with open("../tools/web/static/vendor.css") as f:
|
||||||
|
@ -16,3 +16,6 @@ class ViewHex(base.View):
|
|||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data, **metadata):
|
||||||
return "Hex", self._format(data)
|
return "Hex", self._format(data)
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, **metadata) -> float:
|
||||||
|
return 0.2 * strutils.is_mostly_bin(data)
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import imghdr
|
import imghdr
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from mitmproxy.contentviews import base
|
from mitmproxy.contentviews import base
|
||||||
from mitmproxy.coretypes import multidict
|
from mitmproxy.coretypes import multidict
|
||||||
@ -16,16 +17,6 @@ imghdr.tests.append(test_ico)
|
|||||||
class ViewImage(base.View):
|
class ViewImage(base.View):
|
||||||
name = "Image"
|
name = "Image"
|
||||||
|
|
||||||
# there is also a fallback in the auto view for image/*.
|
|
||||||
content_types = [
|
|
||||||
"image/png",
|
|
||||||
"image/jpeg",
|
|
||||||
"image/gif",
|
|
||||||
"image/vnd.microsoft.icon",
|
|
||||||
"image/x-icon",
|
|
||||||
"image/webp",
|
|
||||||
]
|
|
||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data, **metadata):
|
||||||
image_type = imghdr.what('', h=data)
|
image_type = imghdr.what('', h=data)
|
||||||
if image_type == 'png':
|
if image_type == 'png':
|
||||||
@ -45,3 +36,10 @@ class ViewImage(base.View):
|
|||||||
else:
|
else:
|
||||||
view_name = "Unknown Image"
|
view_name = "Unknown Image"
|
||||||
return view_name, base.format_dict(multidict.MultiDict(image_metadata))
|
return view_name, base.format_dict(multidict.MultiDict(image_metadata))
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||||
|
return float(bool(
|
||||||
|
content_type
|
||||||
|
and content_type.startswith("image/")
|
||||||
|
and content_type != "image/svg+xml"
|
||||||
|
))
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import io
|
import io
|
||||||
import re
|
import re
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from mitmproxy.utils import strutils
|
from mitmproxy.utils import strutils
|
||||||
from mitmproxy.contentviews import base
|
from mitmproxy.contentviews import base
|
||||||
@ -46,13 +47,16 @@ def beautify(data):
|
|||||||
|
|
||||||
class ViewJavaScript(base.View):
|
class ViewJavaScript(base.View):
|
||||||
name = "JavaScript"
|
name = "JavaScript"
|
||||||
content_types = [
|
__content_types = (
|
||||||
"application/x-javascript",
|
"application/x-javascript",
|
||||||
"application/javascript",
|
"application/javascript",
|
||||||
"text/javascript"
|
"text/javascript"
|
||||||
]
|
)
|
||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data, **metadata):
|
||||||
data = data.decode("utf-8", "replace")
|
data = data.decode("utf-8", "replace")
|
||||||
res = beautify(data)
|
res = beautify(data)
|
||||||
return "JavaScript", base.format_text(res)
|
return "JavaScript", base.format_text(res)
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||||
|
return float(content_type in self.__content_types)
|
||||||
|
@ -38,13 +38,18 @@ def format_json(data: typing.Any) -> typing.Iterator[base.TViewLine]:
|
|||||||
|
|
||||||
class ViewJSON(base.View):
|
class ViewJSON(base.View):
|
||||||
name = "JSON"
|
name = "JSON"
|
||||||
content_types = [
|
|
||||||
"application/json",
|
|
||||||
"application/json-rpc",
|
|
||||||
"application/vnd.api+json"
|
|
||||||
]
|
|
||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data, **metadata):
|
||||||
data = parse_json(data)
|
data = parse_json(data)
|
||||||
if data is not PARSE_ERROR:
|
if data is not PARSE_ERROR:
|
||||||
return "JSON", format_json(data)
|
return "JSON", format_json(data)
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, *, content_type: typing.Optional[str] = None, **metadata) -> float:
|
||||||
|
if content_type in (
|
||||||
|
"application/json",
|
||||||
|
"application/json-rpc",
|
||||||
|
):
|
||||||
|
return 1
|
||||||
|
if content_type and content_type.startswith("application/") and content_type.endswith("+json"):
|
||||||
|
return 1
|
||||||
|
return 0
|
||||||
|
@ -39,12 +39,15 @@ def format_msgpack(data):
|
|||||||
|
|
||||||
class ViewMsgPack(base.View):
|
class ViewMsgPack(base.View):
|
||||||
name = "MsgPack"
|
name = "MsgPack"
|
||||||
content_types = [
|
__content_types = (
|
||||||
"application/msgpack",
|
"application/msgpack",
|
||||||
"application/x-msgpack",
|
"application/x-msgpack",
|
||||||
]
|
)
|
||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data, **metadata):
|
||||||
data = parse_msgpack(data)
|
data = parse_msgpack(data)
|
||||||
if data is not PARSE_ERROR:
|
if data is not PARSE_ERROR:
|
||||||
return "MsgPack", format_msgpack(data)
|
return "MsgPack", format_msgpack(data)
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, *, content_type: typing.Optional[str] = None, **metadata) -> float:
|
||||||
|
return float(content_type in self.__content_types)
|
||||||
|
@ -1,19 +1,24 @@
|
|||||||
from mitmproxy.net import http
|
from typing import Optional
|
||||||
|
|
||||||
from mitmproxy.coretypes import multidict
|
from mitmproxy.coretypes import multidict
|
||||||
|
from mitmproxy.net import http
|
||||||
from . import base
|
from . import base
|
||||||
|
|
||||||
|
|
||||||
class ViewMultipart(base.View):
|
class ViewMultipart(base.View):
|
||||||
name = "Multipart Form"
|
name = "Multipart Form"
|
||||||
content_types = ["multipart/form-data"]
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _format(v):
|
def _format(v):
|
||||||
yield [("highlight", "Form data:\n")]
|
yield [("highlight", "Form data:\n")]
|
||||||
yield from base.format_dict(multidict.MultiDict(v))
|
yield from base.format_dict(multidict.MultiDict(v))
|
||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data: bytes, content_type: Optional[str] = None, **metadata):
|
||||||
headers = metadata.get("headers", {})
|
if content_type is None:
|
||||||
v = http.multipart.decode(headers, data)
|
return
|
||||||
|
v = http.multipart.decode(content_type, data)
|
||||||
if v:
|
if v:
|
||||||
return "Multipart form", self._format(v)
|
return "Multipart form", self._format(v)
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||||
|
return float(content_type == "multipart/form-data")
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import io
|
import io
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
from kaitaistruct import KaitaiStream
|
from kaitaistruct import KaitaiStream
|
||||||
from . import base
|
from . import base
|
||||||
@ -66,7 +67,7 @@ class ViewProtobuf(base.View):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
name = "Protocol Buffer"
|
name = "Protocol Buffer"
|
||||||
content_types = [
|
__content_types = [
|
||||||
"application/x-protobuf",
|
"application/x-protobuf",
|
||||||
"application/x-protobuffer",
|
"application/x-protobuffer",
|
||||||
]
|
]
|
||||||
@ -77,3 +78,6 @@ class ViewProtobuf(base.View):
|
|||||||
raise ValueError("Failed to parse input.")
|
raise ValueError("Failed to parse input.")
|
||||||
|
|
||||||
return "Protobuf", base.format_text(decoded)
|
return "Protobuf", base.format_text(decoded)
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||||
|
return float(content_type in self.__content_types)
|
||||||
|
@ -1,14 +1,18 @@
|
|||||||
from typing import List # noqa
|
from typing import Optional
|
||||||
|
|
||||||
from . import base
|
from . import base
|
||||||
|
from ..net import http
|
||||||
|
|
||||||
|
|
||||||
class ViewQuery(base.View):
|
class ViewQuery(base.View):
|
||||||
name = "Query"
|
name = "Query"
|
||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data: bytes, http_message: Optional[http.Message] = None, **metadata):
|
||||||
query = metadata.get("query")
|
query = getattr(http_message, "query", None)
|
||||||
if query:
|
if query:
|
||||||
return "Query", base.format_pairs(query.items(multi=True))
|
return "Query", base.format_pairs(query.items(multi=True))
|
||||||
else:
|
else:
|
||||||
return "Query", base.format_text("")
|
return "Query", base.format_text("")
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, *, http_message: Optional[http.Message] = None, **metadata) -> float:
|
||||||
|
return 0.3 * float(bool(getattr(http_message, "query", False)))
|
||||||
|
@ -9,3 +9,6 @@ class ViewRaw(base.View):
|
|||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data, **metadata):
|
||||||
return "Raw", base.format_text(strutils.bytes_to_escaped_str(data, True))
|
return "Raw", base.format_text(strutils.bytes_to_escaped_str(data, True))
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, **metadata) -> float:
|
||||||
|
return 0.1 * float(bool(data))
|
||||||
|
@ -1,10 +1,11 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
from mitmproxy.net.http import url
|
from mitmproxy.net.http import url
|
||||||
from . import base
|
from . import base
|
||||||
|
|
||||||
|
|
||||||
class ViewURLEncoded(base.View):
|
class ViewURLEncoded(base.View):
|
||||||
name = "URL-encoded"
|
name = "URL-encoded"
|
||||||
content_types = ["application/x-www-form-urlencoded"]
|
|
||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data, **metadata):
|
||||||
try:
|
try:
|
||||||
@ -13,3 +14,6 @@ class ViewURLEncoded(base.View):
|
|||||||
return None
|
return None
|
||||||
d = url.decode(data)
|
d = url.decode(data)
|
||||||
return "URLEncoded form", base.format_pairs(d)
|
return "URLEncoded form", base.format_pairs(d)
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||||
|
return float(content_type == "application/x-www-form-urlencoded")
|
||||||
|
@ -1,13 +1,15 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
from mitmproxy.contrib.wbxml import ASCommandResponse
|
from mitmproxy.contrib.wbxml import ASCommandResponse
|
||||||
from . import base
|
from . import base
|
||||||
|
|
||||||
|
|
||||||
class ViewWBXML(base.View):
|
class ViewWBXML(base.View):
|
||||||
name = "WBXML"
|
name = "WBXML"
|
||||||
content_types = [
|
__content_types = (
|
||||||
"application/vnd.wap.wbxml",
|
"application/vnd.wap.wbxml",
|
||||||
"application/vnd.ms-sync.wbxml"
|
"application/vnd.ms-sync.wbxml"
|
||||||
]
|
)
|
||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data, **metadata):
|
||||||
try:
|
try:
|
||||||
@ -17,3 +19,6 @@ class ViewWBXML(base.View):
|
|||||||
return "WBXML", base.format_text(parsedContent)
|
return "WBXML", base.format_text(parsedContent)
|
||||||
except:
|
except:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||||
|
return float(content_type in self.__content_types)
|
||||||
|
@ -4,7 +4,7 @@ import textwrap
|
|||||||
from typing import Iterable, Optional
|
from typing import Iterable, Optional
|
||||||
|
|
||||||
from mitmproxy.contentviews import base
|
from mitmproxy.contentviews import base
|
||||||
from mitmproxy.utils import sliding_window
|
from mitmproxy.utils import sliding_window, strutils
|
||||||
|
|
||||||
"""
|
"""
|
||||||
A custom XML/HTML prettifier. Compared to other prettifiers, its main features are:
|
A custom XML/HTML prettifier. Compared to other prettifiers, its main features are:
|
||||||
@ -214,7 +214,7 @@ def format_xml(tokens: Iterable[Token]) -> str:
|
|||||||
|
|
||||||
class ViewXmlHtml(base.View):
|
class ViewXmlHtml(base.View):
|
||||||
name = "XML/HTML"
|
name = "XML/HTML"
|
||||||
content_types = ["text/xml", "text/html"]
|
__content_types = ("text/xml", "text/html")
|
||||||
|
|
||||||
def __call__(self, data, **metadata):
|
def __call__(self, data, **metadata):
|
||||||
# TODO:
|
# TODO:
|
||||||
@ -233,3 +233,10 @@ class ViewXmlHtml(base.View):
|
|||||||
else:
|
else:
|
||||||
t = "XML"
|
t = "XML"
|
||||||
return t, pretty
|
return t, pretty
|
||||||
|
|
||||||
|
def render_priority(self, data: bytes, *, content_type: Optional[str] = None, **metadata) -> float:
|
||||||
|
if content_type in self.__content_types:
|
||||||
|
return 1
|
||||||
|
elif strutils.is_xml(data):
|
||||||
|
return 0.4
|
||||||
|
return float(content_type in self.__content_types)
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import re
|
import re
|
||||||
|
import warnings
|
||||||
from dataclasses import dataclass, is_dataclass, fields
|
from dataclasses import dataclass, is_dataclass, fields
|
||||||
from typing import ClassVar, Any, Dict, Type, Set, List, TYPE_CHECKING, Sequence
|
from typing import ClassVar, Any, Dict, Type, Set, List, TYPE_CHECKING, Sequence
|
||||||
|
|
||||||
@ -32,7 +33,7 @@ class Hook:
|
|||||||
cls.name = re.sub('(?!^)([A-Z]+)', r'_\1', name).lower()
|
cls.name = re.sub('(?!^)([A-Z]+)', r'_\1', name).lower()
|
||||||
if cls.name in all_hooks:
|
if cls.name in all_hooks:
|
||||||
other = all_hooks[cls.name]
|
other = all_hooks[cls.name]
|
||||||
raise RuntimeError(f"Two conflicting event classes for {cls.name}: {cls} and {other}")
|
warnings.warn(f"Two conflicting event classes for {cls.name}: {cls} and {other}", RuntimeWarning)
|
||||||
if cls.name == "":
|
if cls.name == "":
|
||||||
return # don't register Hook class.
|
return # don't register Hook class.
|
||||||
all_hooks[cls.name] = cls
|
all_hooks[cls.name] = cls
|
||||||
|
@ -69,7 +69,7 @@ class Master:
|
|||||||
print(exc, file=sys.stderr)
|
print(exc, file=sys.stderr)
|
||||||
print("mitmproxy has crashed!", file=sys.stderr)
|
print("mitmproxy has crashed!", file=sys.stderr)
|
||||||
print("Please lodge a bug report at:", file=sys.stderr)
|
print("Please lodge a bug report at:", file=sys.stderr)
|
||||||
print("\thttps://github.com/mitmproxy/mitmproxy", file=sys.stderr)
|
print("\thttps://github.com/mitmproxy/mitmproxy/issues", file=sys.stderr)
|
||||||
|
|
||||||
self.addons.trigger(hooks.DoneHook())
|
self.addons.trigger(hooks.DoneHook())
|
||||||
|
|
||||||
|
@ -1,11 +1,12 @@
|
|||||||
import re
|
|
||||||
import mimetypes
|
import mimetypes
|
||||||
|
import re
|
||||||
|
from typing import Tuple, List, Optional
|
||||||
from urllib.parse import quote
|
from urllib.parse import quote
|
||||||
|
|
||||||
from mitmproxy.net.http import headers
|
from mitmproxy.net.http import headers
|
||||||
|
|
||||||
|
|
||||||
def encode(head, l):
|
def encode(head, l):
|
||||||
|
|
||||||
k = head.get("content-type")
|
k = head.get("content-type")
|
||||||
if k:
|
if k:
|
||||||
k = headers.parse_content_type(k)
|
k = headers.parse_content_type(k)
|
||||||
@ -38,17 +39,16 @@ def encode(head, l):
|
|||||||
return temp
|
return temp
|
||||||
|
|
||||||
|
|
||||||
def decode(hdrs, content):
|
def decode(content_type: Optional[str], content: bytes) -> List[Tuple[bytes, bytes]]:
|
||||||
"""
|
"""
|
||||||
Takes a multipart boundary encoded string and returns list of (key, value) tuples.
|
Takes a multipart boundary encoded string and returns list of (key, value) tuples.
|
||||||
"""
|
"""
|
||||||
v = hdrs.get("content-type")
|
if content_type:
|
||||||
if v:
|
ct = headers.parse_content_type(content_type)
|
||||||
v = headers.parse_content_type(v)
|
if not ct:
|
||||||
if not v:
|
|
||||||
return []
|
return []
|
||||||
try:
|
try:
|
||||||
boundary = v[2]["boundary"].encode("ascii")
|
boundary = ct[2]["boundary"].encode("ascii")
|
||||||
except (KeyError, UnicodeError):
|
except (KeyError, UnicodeError):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
@ -449,7 +449,7 @@ class Request(message.Message):
|
|||||||
is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower()
|
is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower()
|
||||||
if is_valid_content_type:
|
if is_valid_content_type:
|
||||||
try:
|
try:
|
||||||
return multipart.decode(self.headers, self.content)
|
return multipart.decode(self.headers.get("content-type"), self.content)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
return ()
|
return ()
|
||||||
|
@ -4,6 +4,7 @@ from abc import ABCMeta
|
|||||||
from enum import Flag
|
from enum import Flag
|
||||||
from typing import List, Literal, Optional, Sequence, Tuple, Union, TYPE_CHECKING
|
from typing import List, Literal, Optional, Sequence, Tuple, Union, TYPE_CHECKING
|
||||||
|
|
||||||
|
import mitmproxy
|
||||||
from mitmproxy import certs
|
from mitmproxy import certs
|
||||||
from mitmproxy.coretypes import serializable
|
from mitmproxy.coretypes import serializable
|
||||||
from mitmproxy.net import server_spec
|
from mitmproxy.net import server_spec
|
||||||
|
@ -5,6 +5,7 @@ The counterpart to events are commands.
|
|||||||
"""
|
"""
|
||||||
import socket
|
import socket
|
||||||
import typing
|
import typing
|
||||||
|
import warnings
|
||||||
from dataclasses import dataclass, is_dataclass
|
from dataclasses import dataclass, is_dataclass
|
||||||
|
|
||||||
from mitmproxy.proxy import commands
|
from mitmproxy.proxy import commands
|
||||||
@ -78,7 +79,7 @@ class CommandCompleted(Event):
|
|||||||
raise RuntimeError(f"{command_cls} needs a properly annotated command attribute.")
|
raise RuntimeError(f"{command_cls} needs a properly annotated command attribute.")
|
||||||
if command_cls in command_reply_subclasses:
|
if command_cls in command_reply_subclasses:
|
||||||
other = command_reply_subclasses[command_cls]
|
other = command_reply_subclasses[command_cls]
|
||||||
raise RuntimeError(f"Two conflicting subclasses for {command_cls}: {cls} and {other}")
|
warnings.warn(f"Two conflicting subclasses for {command_cls}: {cls} and {other}", RuntimeWarning)
|
||||||
command_reply_subclasses[command_cls] = cls
|
command_reply_subclasses[command_cls] = cls
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
|
@ -222,7 +222,7 @@ class NextLayer(Layer):
|
|||||||
# 3. Some layers may however still have a reference to the old .handle_event.
|
# 3. Some layers may however still have a reference to the old .handle_event.
|
||||||
# ._handle is just an optimization to reduce the callstack in these cases.
|
# ._handle is just an optimization to reduce the callstack in these cases.
|
||||||
self.handle_event = self.layer.handle_event
|
self.handle_event = self.layer.handle_event
|
||||||
self._handle_event = self.layer._handle_event
|
self._handle_event = self.layer.handle_event
|
||||||
self._handle = self.layer.handle_event
|
self._handle = self.layer.handle_event
|
||||||
|
|
||||||
# Utility methods for whoever decides what the next layer is going to be.
|
# Utility methods for whoever decides what the next layer is going to be.
|
||||||
|
@ -152,7 +152,7 @@ class FlowDetails(tabs.Tabs):
|
|||||||
|
|
||||||
from_client = flow.messages[0].from_client
|
from_client = flow.messages[0].from_client
|
||||||
for m in messages:
|
for m in messages:
|
||||||
_, lines, _ = contentviews.get_tcp_content_view(viewmode, m)
|
_, lines, _ = contentviews.get_tcp_content_view(viewmode, m, flow)
|
||||||
|
|
||||||
for line in lines:
|
for line in lines:
|
||||||
if from_client:
|
if from_client:
|
||||||
|
@ -5,7 +5,7 @@ import shutil
|
|||||||
import time
|
import time
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
from mitmproxy import contentviews
|
from mitmproxy import contentviews, http
|
||||||
from mitmproxy import ctx
|
from mitmproxy import ctx
|
||||||
from mitmproxy import flowfilter
|
from mitmproxy import flowfilter
|
||||||
from mitmproxy import io, flow
|
from mitmproxy import io, flow
|
||||||
@ -49,6 +49,7 @@ def save_flows(path: pathlib.Path, flows: typing.Iterable[flow.Flow]) -> None:
|
|||||||
|
|
||||||
def save_flows_content(path: pathlib.Path, flows: typing.Iterable[flow.Flow]) -> None:
|
def save_flows_content(path: pathlib.Path, flows: typing.Iterable[flow.Flow]) -> None:
|
||||||
for f in flows:
|
for f in flows:
|
||||||
|
assert isinstance(f, http.HTTPFlow)
|
||||||
for m in ('request', 'response'):
|
for m in ('request', 'response'):
|
||||||
message = getattr(f, m)
|
message = getattr(f, m)
|
||||||
message_path = path / "flows" / f.id / m
|
message_path = path / "flows" / f.id / m
|
||||||
|
@ -156,7 +156,7 @@ def check():
|
|||||||
|
|
||||||
# Check for underscores in the options. Options always follow '--'.
|
# Check for underscores in the options. Options always follow '--'.
|
||||||
for argument in args:
|
for argument in args:
|
||||||
underscoreParam = re.search('[-]{2}((.*?_)(.*?(\s|$)))+', argument)
|
underscoreParam = re.search(r'[-]{2}((.*?_)(.*?(\s|$)))+', argument)
|
||||||
if underscoreParam is not None:
|
if underscoreParam is not None:
|
||||||
print("{} uses underscores, please use hyphens {}".format(
|
print("{} uses underscores, please use hyphens {}".format(
|
||||||
argument,
|
argument,
|
||||||
|
@ -133,7 +133,11 @@ def is_mostly_bin(s: bytes) -> bool:
|
|||||||
|
|
||||||
|
|
||||||
def is_xml(s: bytes) -> bool:
|
def is_xml(s: bytes) -> bool:
|
||||||
return s.strip().startswith(b"<")
|
for char in s:
|
||||||
|
if char in (9, 10, 32): # is space?
|
||||||
|
continue
|
||||||
|
return char == 60 # is a "<"?
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def clean_hanging_newline(t):
|
def clean_hanging_newline(t):
|
||||||
|
2
setup.py
2
setup.py
@ -97,7 +97,7 @@ setup(
|
|||||||
"pydivert>=2.0.3,<2.2",
|
"pydivert>=2.0.3,<2.2",
|
||||||
],
|
],
|
||||||
'dev': [
|
'dev': [
|
||||||
"hypothesis>=5.8,<6",
|
"hypothesis>=5.8,<6.1",
|
||||||
"parver>=0.1,<2.0",
|
"parver>=0.1,<2.0",
|
||||||
"pytest-asyncio>=0.10.0,<0.14,!=0.14",
|
"pytest-asyncio>=0.10.0,<0.14,!=0.14",
|
||||||
"pytest-cov>=2.7.1,<3",
|
"pytest-cov>=2.7.1,<3",
|
||||||
|
@ -15,3 +15,15 @@ def test_view_image(tdata):
|
|||||||
assert img.split(".")[-1].upper() in viewname
|
assert img.split(".")[-1].upper() in viewname
|
||||||
|
|
||||||
assert v(b"flibble") == ('Unknown Image', [[('header', 'Image Format: '), ('text', 'unknown')]])
|
assert v(b"flibble") == ('Unknown Image', [[('header', 'Image Format: '), ('text', 'unknown')]])
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
v = image.ViewImage()
|
||||||
|
assert v.render_priority(b"", content_type="image/png")
|
||||||
|
assert v.render_priority(b"", content_type="image/jpeg")
|
||||||
|
assert v.render_priority(b"", content_type="image/gif")
|
||||||
|
assert v.render_priority(b"", content_type="image/vnd.microsoft.icon")
|
||||||
|
assert v.render_priority(b"", content_type="image/x-icon")
|
||||||
|
assert v.render_priority(b"", content_type="image/webp")
|
||||||
|
assert v.render_priority(b"", content_type="image/future-unknown-format-42")
|
||||||
|
assert not v.render_priority(b"", content_type="image/svg+xml")
|
||||||
|
@ -3,14 +3,18 @@ from unittest import mock
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from mitmproxy import contentviews
|
from mitmproxy import contentviews
|
||||||
from mitmproxy.net.http import Headers
|
|
||||||
from mitmproxy.test import tflow
|
from mitmproxy.test import tflow
|
||||||
from mitmproxy.test import tutils
|
from mitmproxy.test import tutils
|
||||||
|
|
||||||
|
|
||||||
class TestContentView(contentviews.View):
|
class TestContentView(contentviews.View):
|
||||||
name = "test"
|
name = "test"
|
||||||
content_types = ["test/123"]
|
|
||||||
|
def __call__(self, *args, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def should_render(self, content_type):
|
||||||
|
return content_type == "test/123"
|
||||||
|
|
||||||
|
|
||||||
def test_add_remove():
|
def test_add_remove():
|
||||||
@ -38,7 +42,7 @@ def test_get_content_view():
|
|||||||
desc, lines, err = contentviews.get_content_view(
|
desc, lines, err = contentviews.get_content_view(
|
||||||
contentviews.get("Auto"),
|
contentviews.get("Auto"),
|
||||||
b"[1, 2, 3]",
|
b"[1, 2, 3]",
|
||||||
headers=Headers(content_type="application/json")
|
content_type="application/json",
|
||||||
)
|
)
|
||||||
assert desc == "JSON"
|
assert desc == "JSON"
|
||||||
assert list(lines)
|
assert list(lines)
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
from mitmproxy.contentviews import auto
|
from mitmproxy.contentviews import auto
|
||||||
from mitmproxy.net import http
|
from mitmproxy.test import tflow
|
||||||
from mitmproxy.coretypes import multidict
|
|
||||||
from . import full_eval
|
from . import full_eval
|
||||||
|
|
||||||
|
|
||||||
@ -8,37 +7,42 @@ def test_view_auto():
|
|||||||
v = full_eval(auto.ViewAuto())
|
v = full_eval(auto.ViewAuto())
|
||||||
f = v(
|
f = v(
|
||||||
b"foo",
|
b"foo",
|
||||||
headers=http.Headers()
|
|
||||||
)
|
)
|
||||||
assert f[0] == "Raw"
|
assert f[0] == "Raw"
|
||||||
|
|
||||||
f = v(
|
f = v(
|
||||||
b"<html></html>",
|
b"<html></html>",
|
||||||
headers=http.Headers(content_type="text/html")
|
content_type="text/html",
|
||||||
)
|
)
|
||||||
assert f[0] == "HTML"
|
assert f[0] == "HTML"
|
||||||
|
|
||||||
f = v(
|
f = v(
|
||||||
b"foo",
|
b"foo",
|
||||||
headers=http.Headers(content_type="text/flibble")
|
content_type="text/flibble",
|
||||||
)
|
)
|
||||||
assert f[0] == "Raw"
|
assert f[0] == "Raw"
|
||||||
|
|
||||||
f = v(
|
f = v(
|
||||||
b"<xml></xml>",
|
b"<xml></xml>",
|
||||||
headers=http.Headers(content_type="text/flibble")
|
content_type="text/flibble",
|
||||||
)
|
)
|
||||||
assert f[0].startswith("XML")
|
assert f[0].startswith("XML")
|
||||||
|
|
||||||
f = v(
|
f = v(
|
||||||
b"<svg></svg>",
|
b"<svg></svg>",
|
||||||
headers=http.Headers(content_type="image/svg+xml")
|
content_type="image/svg+xml",
|
||||||
)
|
)
|
||||||
assert f[0].startswith("XML")
|
assert f[0].startswith("XML")
|
||||||
|
|
||||||
|
f = v(
|
||||||
|
b"{}",
|
||||||
|
content_type="application/acme+json",
|
||||||
|
)
|
||||||
|
assert f[0].startswith("JSON")
|
||||||
|
|
||||||
f = v(
|
f = v(
|
||||||
b"verybinary",
|
b"verybinary",
|
||||||
headers=http.Headers(content_type="image/new-magic-image-format")
|
content_type="image/new-magic-image-format",
|
||||||
)
|
)
|
||||||
assert f[0] == "Unknown Image"
|
assert f[0] == "Unknown Image"
|
||||||
|
|
||||||
@ -47,13 +51,14 @@ def test_view_auto():
|
|||||||
|
|
||||||
f = v(
|
f = v(
|
||||||
b"",
|
b"",
|
||||||
headers=http.Headers()
|
|
||||||
)
|
)
|
||||||
assert f[0] == "No content"
|
assert f[0] == "No content"
|
||||||
|
|
||||||
|
flow = tflow.tflow()
|
||||||
|
flow.request.query = [("foo", "bar")]
|
||||||
f = v(
|
f = v(
|
||||||
b"",
|
b"",
|
||||||
headers=http.Headers(),
|
flow=flow,
|
||||||
query=multidict.MultiDict([("foo", "bar")]),
|
http_message=flow.request,
|
||||||
)
|
)
|
||||||
assert f[0] == "Query"
|
assert f[0] == "Query"
|
||||||
|
@ -37,3 +37,9 @@ def test_simple():
|
|||||||
assert v(b"console.log('not really css')") == (
|
assert v(b"console.log('not really css')") == (
|
||||||
'CSS', [[('text', "console.log('not really css')")]]
|
'CSS', [[('text', "console.log('not really css')")]]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
v = css.ViewCSS()
|
||||||
|
assert v.render_priority(b"", content_type="text/css")
|
||||||
|
assert not v.render_priority(b"", content_type="text/plain")
|
||||||
|
@ -5,3 +5,10 @@ from . import full_eval
|
|||||||
def test_view_hex():
|
def test_view_hex():
|
||||||
v = full_eval(hex.ViewHex())
|
v = full_eval(hex.ViewHex())
|
||||||
assert v(b"foo")
|
assert v(b"foo")
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
v = hex.ViewHex()
|
||||||
|
assert not v.render_priority(b"ascii")
|
||||||
|
assert v.render_priority(b"\xFF")
|
||||||
|
assert not v.render_priority(b"")
|
||||||
|
@ -27,3 +27,11 @@ def test_format_xml(filename, tdata):
|
|||||||
expected = f.read()
|
expected = f.read()
|
||||||
js = javascript.beautify(input)
|
js = javascript.beautify(input)
|
||||||
assert js == expected
|
assert js == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
v = javascript.ViewJavaScript()
|
||||||
|
assert v.render_priority(b"", content_type="application/x-javascript")
|
||||||
|
assert v.render_priority(b"", content_type="application/javascript")
|
||||||
|
assert v.render_priority(b"", content_type="text/javascript")
|
||||||
|
assert not v.render_priority(b"", content_type="text/plain")
|
||||||
|
@ -41,3 +41,12 @@ def test_view_json():
|
|||||||
def test_view_json_doesnt_crash(data):
|
def test_view_json_doesnt_crash(data):
|
||||||
v = full_eval(json.ViewJSON())
|
v = full_eval(json.ViewJSON())
|
||||||
v(data)
|
v(data)
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
v = json.ViewJSON()
|
||||||
|
assert v.render_priority(b"", content_type="application/json")
|
||||||
|
assert v.render_priority(b"", content_type="application/json-rpc")
|
||||||
|
assert v.render_priority(b"", content_type="application/vnd.api+json")
|
||||||
|
assert v.render_priority(b"", content_type="application/acme+json")
|
||||||
|
assert not v.render_priority(b"", content_type="text/plain")
|
||||||
|
@ -44,3 +44,10 @@ def test_view_msgpack():
|
|||||||
def test_view_msgpack_doesnt_crash(data):
|
def test_view_msgpack_doesnt_crash(data):
|
||||||
v = full_eval(msgpack.ViewMsgPack())
|
v = full_eval(msgpack.ViewMsgPack())
|
||||||
v(data)
|
v(data)
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
v = msgpack.ViewMsgPack()
|
||||||
|
assert v.render_priority(b"", content_type="application/msgpack")
|
||||||
|
assert v.render_priority(b"", content_type="application/x-msgpack")
|
||||||
|
assert not v.render_priority(b"", content_type="text/plain")
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
from mitmproxy.contentviews import multipart
|
from mitmproxy.contentviews import multipart
|
||||||
from mitmproxy.net import http
|
|
||||||
from . import full_eval
|
from . import full_eval
|
||||||
|
|
||||||
|
|
||||||
@ -12,14 +11,16 @@ Content-Disposition: form-data; name="submit-name"
|
|||||||
Larry
|
Larry
|
||||||
--AaB03x
|
--AaB03x
|
||||||
""".strip()
|
""".strip()
|
||||||
h = http.Headers(content_type="multipart/form-data; boundary=AaB03x")
|
assert view(v, content_type="multipart/form-data; boundary=AaB03x")
|
||||||
assert view(v, headers=h)
|
|
||||||
|
|
||||||
h = http.Headers()
|
assert not view(v)
|
||||||
assert not view(v, headers=h)
|
|
||||||
|
|
||||||
h = http.Headers(content_type="multipart/form-data")
|
assert not view(v, content_type="multipart/form-data")
|
||||||
assert not view(v, headers=h)
|
|
||||||
|
|
||||||
h = http.Headers(content_type="unparseable")
|
assert not view(v, content_type="unparseable")
|
||||||
assert not view(v, headers=h)
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
v = multipart.ViewMultipart()
|
||||||
|
assert v.render_priority(b"", content_type="multipart/form-data")
|
||||||
|
assert not v.render_priority(b"", content_type="text/plain")
|
||||||
|
@ -28,3 +28,10 @@ def test_format_pbuf(filename, tdata):
|
|||||||
expected = f.read()
|
expected = f.read()
|
||||||
|
|
||||||
assert protobuf.format_pbuf(input) == expected
|
assert protobuf.format_pbuf(input) == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
v = protobuf.ViewProtobuf()
|
||||||
|
assert v.render_priority(b"", content_type="application/x-protobuf")
|
||||||
|
assert v.render_priority(b"", content_type="application/x-protobuffer")
|
||||||
|
assert not v.render_priority(b"", content_type="text/plain")
|
||||||
|
@ -1,13 +1,23 @@
|
|||||||
from mitmproxy.contentviews import query
|
from mitmproxy.contentviews import query
|
||||||
from mitmproxy.coretypes import multidict
|
from mitmproxy.test import tutils
|
||||||
from . import full_eval
|
from . import full_eval
|
||||||
|
|
||||||
|
|
||||||
def test_view_query():
|
def test_view_query():
|
||||||
d = ""
|
d = ""
|
||||||
v = full_eval(query.ViewQuery())
|
v = full_eval(query.ViewQuery())
|
||||||
f = v(d, query=multidict.MultiDict([("foo", "bar"), ("foo", "baz")]))
|
req = tutils.treq()
|
||||||
|
req.query = [("foo", "bar"), ("foo", "baz")]
|
||||||
|
f = v(d, http_message=req)
|
||||||
assert f[0] == "Query"
|
assert f[0] == "Query"
|
||||||
assert f[1] == [[("header", "foo: "), ("text", "bar")], [("header", "foo: "), ("text", "baz")]]
|
assert f[1] == [[("header", "foo: "), ("text", "bar")], [("header", "foo: "), ("text", "baz")]]
|
||||||
|
|
||||||
assert v(d) == ("Query", [])
|
assert v(d) == ("Query", [])
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
view = query.ViewQuery()
|
||||||
|
req = tutils.treq()
|
||||||
|
req.query = [("foo", "bar"), ("foo", "baz")]
|
||||||
|
assert view.render_priority(b"", http_message=req)
|
||||||
|
assert not view.render_priority(b"")
|
||||||
|
@ -5,3 +5,9 @@ from . import full_eval
|
|||||||
def test_view_raw():
|
def test_view_raw():
|
||||||
v = full_eval(raw.ViewRaw())
|
v = full_eval(raw.ViewRaw())
|
||||||
assert v(b"foo")
|
assert v(b"foo")
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
v = raw.ViewRaw()
|
||||||
|
assert v.render_priority(b"anything")
|
||||||
|
assert not v.render_priority(b"")
|
||||||
|
@ -13,3 +13,9 @@ def test_view_urlencoded():
|
|||||||
assert v(d)
|
assert v(d)
|
||||||
|
|
||||||
assert not v(b"\xFF\x00")
|
assert not v(b"\xFF\x00")
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
v = urlencoded.ViewURLEncoded()
|
||||||
|
assert v.render_priority(b"", content_type="application/x-www-form-urlencoded")
|
||||||
|
assert not v.render_priority(b"", content_type="text/plain")
|
||||||
|
@ -18,3 +18,10 @@ def test_wbxml(tdata):
|
|||||||
|
|
||||||
p = wbxml.ASCommandResponse.ASCommandResponse(input)
|
p = wbxml.ASCommandResponse.ASCommandResponse(input)
|
||||||
assert p.xmlString == expected
|
assert p.xmlString == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
v = wbxml.ViewWBXML()
|
||||||
|
assert v.render_priority(b"", content_type="application/vnd.wap.wbxml")
|
||||||
|
assert v.render_priority(b"", content_type="application/vnd.ms-sync.wbxml")
|
||||||
|
assert not v.render_priority(b"", content_type="text/plain")
|
||||||
|
@ -34,3 +34,12 @@ def test_format_xml(filename, tdata):
|
|||||||
expected = f.read()
|
expected = f.read()
|
||||||
tokens = xml_html.tokenize(input)
|
tokens = xml_html.tokenize(input)
|
||||||
assert xml_html.format_xml(tokens) == expected
|
assert xml_html.format_xml(tokens) == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_render_priority():
|
||||||
|
v = xml_html.ViewXmlHtml()
|
||||||
|
assert v.render_priority(b"", content_type="text/xml")
|
||||||
|
assert v.render_priority(b"", content_type="text/xml")
|
||||||
|
assert v.render_priority(b"", content_type="text/html")
|
||||||
|
assert not v.render_priority(b"", content_type="text/plain")
|
||||||
|
assert v.render_priority(b"<html/>")
|
||||||
|
@ -1,13 +1,11 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
from mitmproxy.net.http import Headers
|
from mitmproxy.net.http import Headers
|
||||||
from mitmproxy.net.http import multipart
|
from mitmproxy.net.http import multipart
|
||||||
import pytest
|
|
||||||
|
|
||||||
|
|
||||||
def test_decode():
|
def test_decode():
|
||||||
boundary = 'somefancyboundary'
|
boundary = 'somefancyboundary'
|
||||||
headers = Headers(
|
|
||||||
content_type='multipart/form-data; boundary=' + boundary
|
|
||||||
)
|
|
||||||
content = (
|
content = (
|
||||||
"--{0}\n"
|
"--{0}\n"
|
||||||
"Content-Disposition: form-data; name=\"field1\"\n\n"
|
"Content-Disposition: form-data; name=\"field1\"\n\n"
|
||||||
@ -17,24 +15,17 @@ def test_decode():
|
|||||||
"value2\n"
|
"value2\n"
|
||||||
"--{0}--".format(boundary).encode()
|
"--{0}--".format(boundary).encode()
|
||||||
)
|
)
|
||||||
|
form = multipart.decode(f'multipart/form-data; boundary={boundary}', content)
|
||||||
form = multipart.decode(headers, content)
|
|
||||||
|
|
||||||
assert len(form) == 2
|
assert len(form) == 2
|
||||||
assert form[0] == (b"field1", b"value1")
|
assert form[0] == (b"field1", b"value1")
|
||||||
assert form[1] == (b"field2", b"value2")
|
assert form[1] == (b"field2", b"value2")
|
||||||
|
|
||||||
boundary = 'boundary茅莽'
|
boundary = 'boundary茅莽'
|
||||||
headers = Headers(
|
result = multipart.decode(f'multipart/form-data; boundary={boundary}', content)
|
||||||
content_type='multipart/form-data; boundary=' + boundary
|
|
||||||
)
|
|
||||||
result = multipart.decode(headers, content)
|
|
||||||
assert result == []
|
assert result == []
|
||||||
|
|
||||||
headers = Headers(
|
assert multipart.decode("", content) == []
|
||||||
content_type=''
|
|
||||||
)
|
|
||||||
assert multipart.decode(headers, content) == []
|
|
||||||
|
|
||||||
|
|
||||||
def test_encode():
|
def test_encode():
|
||||||
|
@ -31,6 +31,6 @@ def test_command_completed():
|
|||||||
class FooCompleted1(events.CommandCompleted):
|
class FooCompleted1(events.CommandCompleted):
|
||||||
command: FooCommand
|
command: FooCommand
|
||||||
|
|
||||||
with pytest.raises(RuntimeError, match="conflicting subclasses"):
|
with pytest.warns(RuntimeWarning, match="conflicting subclasses"):
|
||||||
class FooCompleted2(events.CommandCompleted):
|
class FooCompleted2(events.CommandCompleted):
|
||||||
command: FooCommand
|
command: FooCommand
|
||||||
|
@ -24,7 +24,7 @@ def test_hook():
|
|||||||
assert e.args() == [b"foo"]
|
assert e.args() == [b"foo"]
|
||||||
assert FooHook in hooks.all_hooks.values()
|
assert FooHook in hooks.all_hooks.values()
|
||||||
|
|
||||||
with pytest.raises(RuntimeError, match="Two conflicting event classes"):
|
with pytest.warns(RuntimeWarning, match="Two conflicting event classes"):
|
||||||
@dataclass
|
@dataclass
|
||||||
class FooHook2(hooks.Hook):
|
class FooHook2(hooks.Hook):
|
||||||
name = "foo"
|
name = "foo"
|
||||||
|
@ -83,6 +83,7 @@ def test_is_mostly_bin():
|
|||||||
|
|
||||||
|
|
||||||
def test_is_xml():
|
def test_is_xml():
|
||||||
|
assert not strutils.is_xml(b"")
|
||||||
assert not strutils.is_xml(b"foo")
|
assert not strutils.is_xml(b"foo")
|
||||||
assert strutils.is_xml(b"<foo")
|
assert strutils.is_xml(b"<foo")
|
||||||
assert strutils.is_xml(b" \n<foo")
|
assert strutils.is_xml(b" \n<foo")
|
||||||
|
Loading…
Reference in New Issue
Block a user