Merge pull request #1655 from mhils/fix-encoding

Encoding Fixes
This commit is contained in:
Maximilian Hils 2016-10-22 19:10:44 -07:00 committed by GitHub
commit c09cedd0f8
5 changed files with 40 additions and 16 deletions

View File

@@ -3,7 +3,6 @@ This inline script can be used to dump flows as HAR files.
""" """
import pprint
import json import json
import sys import sys
import base64 import base64
@@ -128,19 +127,22 @@ def response(flow):
"timings": timings, "timings": timings,
} }
# Store binay data as base64 # Store binary data as base64
if strutils.is_mostly_bin(flow.response.content): if strutils.is_mostly_bin(flow.response.content):
b64 = base64.b64encode(flow.response.content) entry["response"]["content"]["text"] = base64.b64encode(flow.response.content).decode()
entry["response"]["content"]["text"] = b64.decode('ascii')
entry["response"]["content"]["encoding"] = "base64" entry["response"]["content"]["encoding"] = "base64"
else: else:
entry["response"]["content"]["text"] = flow.response.text entry["response"]["content"]["text"] = flow.response.get_text(strict=False)
if flow.request.method in ["POST", "PUT", "PATCH"]: if flow.request.method in ["POST", "PUT", "PATCH"]:
params = [
{"name": a.decode("utf8", "surrogateescape"), "value": b.decode("utf8", "surrogateescape")}
for a, b in flow.request.urlencoded_form.items(multi=True)
]
entry["request"]["postData"] = { entry["request"]["postData"] = {
"mimeType": flow.request.headers.get("Content-Type", "").split(";")[0], "mimeType": flow.request.headers.get("Content-Type", "").split(";")[0],
"text": flow.request.content, "text": flow.request.get_text(strict=False),
"params": name_value(flow.request.urlencoded_form) "params": params
} }
if flow.server_conn: if flow.server_conn:
@@ -155,16 +157,17 @@ def done():
""" """
dump_file = sys.argv[1] dump_file = sys.argv[1]
json_dump = json.dumps(HAR, indent=2) # type: str
if dump_file == '-': if dump_file == '-':
mitmproxy.ctx.log(pprint.pformat(HAR)) mitmproxy.ctx.log(json_dump)
else: else:
json_dump = json.dumps(HAR, indent=2) raw = json_dump.encode() # type: bytes
if dump_file.endswith('.zhar'): if dump_file.endswith('.zhar'):
json_dump = zlib.compress(json_dump, 9) raw = zlib.compress(raw, 9)
with open(dump_file, "w") as f: with open(dump_file, "wb") as f:
f.write(json_dump) f.write(raw)
mitmproxy.ctx.log("HAR dump finished (wrote %s bytes to file)" % len(json_dump)) mitmproxy.ctx.log("HAR dump finished (wrote %s bytes to file)" % len(json_dump))

View File

@@ -14,7 +14,7 @@ DATATYPES = dict(
punctuation=string.punctuation.encode(), punctuation=string.punctuation.encode(),
whitespace=string.whitespace.encode(), whitespace=string.whitespace.encode(),
ascii=string.printable.encode(), ascii=string.printable.encode(),
bytes=bytes(bytearray(range(256))) bytes=bytes(range(256))
) )

View File

@@ -160,3 +160,24 @@ class TestHARDump:
f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0] f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0]
assert f['expires'] assert f['expires']
def test_binary(self):
f = self.flow()
f.request.method = "POST"
f.request.headers["content-type"] = "application/x-www-form-urlencoded"
f.request.content = b"foo=bar&baz=s%c3%bc%c3%9f"
f.response.headers["random-junk"] = bytes(range(256))
f.response.content = bytes(range(256))
with tutils.tmpdir() as tdir:
path = os.path.join(tdir, "somefile")
m, sc = tscript("har_dump.py", shlex.quote(path))
m.addons.invoke(m, "response", f)
m.addons.remove(sc)
with open(path, "r") as inp:
har = json.load(inp)
assert len(har["log"]["entries"]) == 1

View File

@@ -346,7 +346,7 @@ class TestSerialize:
sio = io.BytesIO() sio = io.BytesIO()
f = tutils.tflow() f = tutils.tflow()
f.marked = True f.marked = True
f.request.content = bytes(bytearray(range(256))) f.request.content = bytes(range(256))
w = mitmproxy.io.FlowWriter(sio) w = mitmproxy.io.FlowWriter(sio)
w.add(f) w.add(f)

View File

@@ -3,7 +3,7 @@ from mitmproxy.test import tutils
def test_always_bytes(): def test_always_bytes():
assert strutils.always_bytes(bytes(bytearray(range(256)))) == bytes(bytearray(range(256))) assert strutils.always_bytes(bytes(range(256))) == bytes(range(256))
assert strutils.always_bytes("foo") == b"foo" assert strutils.always_bytes("foo") == b"foo"
with tutils.raises(ValueError): with tutils.raises(ValueError):
strutils.always_bytes(u"\u2605", "ascii") strutils.always_bytes(u"\u2605", "ascii")