Merge pull request #4096 from tasn/msgpack

Add a msgpack content viewer.
This commit is contained in:
Maximilian Hils 2020-07-22 18:32:19 +02:00 committed by GitHub
commit a5e8b5516c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 101 additions and 1 deletions

View File

@ -2,6 +2,8 @@ Unreleased: mitmproxy next
** Full Changelog **
* Add MsgPack content viewer (@tasn)
* --- TODO: add new PRs above this line ---
* ... and various other fixes, documentation improvements, dependency version bumps, etc.

View File

@ -22,7 +22,7 @@ from mitmproxy.net import http
from mitmproxy.utils import strutils
from . import (
auto, raw, hex, json, xml_html, wbxml, javascript, css,
urlencoded, multipart, image, query, protobuf
urlencoded, multipart, image, query, protobuf, msgpack
)
from .base import View, KEY_MAX, format_text, format_dict, TViewResult
@ -175,6 +175,7 @@ add(multipart.ViewMultipart())
add(image.ViewImage())
add(query.ViewQuery())
add(protobuf.ViewProtobuf())
add(msgpack.ViewMsgPack())
__all__ = [
"View", "KEY_MAX", "format_text", "format_dict", "TViewResult",

View File

@ -0,0 +1,50 @@
import typing
import msgpack
from mitmproxy.contentviews import base
PARSE_ERROR = object()
def parse_msgpack(s: bytes) -> typing.Any:
try:
return msgpack.unpackb(s, raw=False)
except (ValueError, msgpack.ExtraData, msgpack.FormatError, msgpack.StackError):
return PARSE_ERROR
def pretty(value, htchar=" ", lfchar="\n", indent=0):
nlch = lfchar + htchar * (indent + 1)
if type(value) is dict:
items = [
nlch + repr(key) + ": " + pretty(value[key], htchar, lfchar, indent + 1)
for key in value
]
return "{%s}" % (",".join(items) + lfchar + htchar * indent)
elif type(value) is list:
items = [
nlch + pretty(item, htchar, lfchar, indent + 1)
for item in value
]
return "[%s]" % (",".join(items) + lfchar + htchar * indent)
else:
return repr(value)
def format_msgpack(data):
return base.format_text(pretty(data))
class ViewMsgPack(base.View):
name = "MsgPack"
content_types = [
"application/msgpack",
"application/x-msgpack",
]
def __call__(self, data, **metadata):
data = parse_msgpack(data)
if data is not PARSE_ERROR:
return "MsgPack", format_msgpack(data)

View File

@ -71,6 +71,7 @@ setup(
"hyperframe>=5.1.0,<6",
"kaitaistruct>=0.7,<0.9",
"ldap3>=2.6.1,<2.8",
"msgpack>=1.0.0, <1.1.0",
"passlib>=1.6.5, <1.8",
"protobuf>=3.6.0, <3.12",
"pyasn1>=0.3.1,<0.5",

View File

@ -0,0 +1,46 @@
from hypothesis import given
from hypothesis.strategies import binary
from msgpack import packb
from mitmproxy.contentviews import msgpack
from . import full_eval
def msgpack_encode(content):
return packb(content, use_bin_type=True)
def test_parse_msgpack():
assert msgpack.parse_msgpack(msgpack_encode({"foo": 1}))
assert msgpack.parse_msgpack(b"aoesuteoahu") is msgpack.PARSE_ERROR
assert msgpack.parse_msgpack(msgpack_encode({"foo": "\xe4\xb8\x96\xe7\x95\x8c"}))
def test_format_msgpack():
assert list(msgpack.format_msgpack({
"data": [
"str",
42,
True,
False,
None,
{},
[]
]
}))
def test_view_msgpack():
v = full_eval(msgpack.ViewMsgPack())
assert v(msgpack_encode({}))
assert not v(b"aoesuteoahu")
assert v(msgpack_encode([1, 2, 3, 4, 5]))
assert v(msgpack_encode({"foo": 3}))
assert v(msgpack_encode({"foo": True, "nullvalue": None}))
@given(binary())
def test_view_msgpack_doesnt_crash(data):
v = full_eval(msgpack.ViewMsgPack())
v(data)