From c622622c5924b291db302aa22f4d82608677ceed Mon Sep 17 00:00:00 2001 From: Ujjwal Verma Date: Sun, 5 Feb 2017 18:59:01 +0530 Subject: [PATCH] Encoding fixes and tests --- examples/complex/har_dump.py | 3 +- mitmproxy/net/http/encoding.py | 31 ++++++++-------- test/mitmproxy/net/http/test_encoding.py | 45 +++++++++++++++++++----- 3 files changed, 56 insertions(+), 23 deletions(-) diff --git a/examples/complex/har_dump.py b/examples/complex/har_dump.py index 62011903f..f7c1e6589 100644 --- a/examples/complex/har_dump.py +++ b/examples/complex/har_dump.py @@ -7,6 +7,7 @@ import json import sys import base64 import zlib +import os from datetime import datetime import pytz @@ -166,7 +167,7 @@ def done(): if dump_file.endswith('.zhar'): raw = zlib.compress(raw, 9) - with open(dump_file, "wb") as f: + with open(os.path.expanduser(dump_file), "wb") as f: f.write(raw) mitmproxy.ctx.log("HAR dump finished (wrote %s bytes to file)" % len(json_dump)) diff --git a/mitmproxy/net/http/encoding.py b/mitmproxy/net/http/encoding.py index e123a0336..5da070995 100644 --- a/mitmproxy/net/http/encoding.py +++ b/mitmproxy/net/http/encoding.py @@ -31,8 +31,8 @@ def decode(encoded: Union[str, bytes], encoding: str, errors: str='strict') -> U Raises: ValueError, if decoding fails. """ - if len(encoded) == 0: - return encoded + if encoded is None: + return None global _cache cached = ( @@ -72,8 +72,8 @@ def encode(decoded: Union[str, bytes], encoding: str, errors: str='strict') -> U Raises: ValueError, if encoding fails. """ - if len(decoded) == 0: - return decoded + if decoded is None: + return None global _cache cached = ( @@ -86,10 +86,7 @@ def encode(decoded: Union[str, bytes], encoding: str, errors: str='strict') -> U return _cache.encoded try: try: - value = decoded - if isinstance(value, str): - value = decoded.encode() - encoded = custom_encode[encoding](value) + encoded = custom_encode[encoding](decoded) except KeyError: encoded = codecs.encode(decoded, encoding, errors) if encoding in ("gzip", "deflate", "br"): @@ -114,12 +111,14 @@ def identity(content): return content -def decode_gzip(content): +def decode_gzip(content: bytes) -> bytes: + if not content: + return b"" gfile = gzip.GzipFile(fileobj=BytesIO(content)) return gfile.read() -def encode_gzip(content): +def encode_gzip(content: bytes) -> bytes: s = BytesIO() gf = gzip.GzipFile(fileobj=s, mode='wb') gf.write(content) @@ -127,15 +126,17 @@ def encode_gzip(content): return s.getvalue() -def decode_brotli(content): +def decode_brotli(content: bytes) -> bytes: + if not content: + return b"" return brotli.decompress(content) -def encode_brotli(content): +def encode_brotli(content: bytes) -> bytes: return brotli.compress(content) -def decode_deflate(content): +def decode_deflate(content: bytes) -> bytes: """ Returns decompressed data for DEFLATE. Some servers may respond with compressed data without a zlib header or checksum. An undocumented @@ -144,13 +145,15 @@ def decode_deflate(content): http://bugs.python.org/issue5784 """ + if not content: + return b"" try: return zlib.decompress(content) except zlib.error: return zlib.decompress(content, -15) -def encode_deflate(content): +def encode_deflate(content: bytes) -> bytes: """ Returns compressed content, always including zlib header and checksum. """ diff --git a/test/mitmproxy/net/http/test_encoding.py b/test/mitmproxy/net/http/test_encoding.py index 11619c443..8dac12cbc 100644 --- a/test/mitmproxy/net/http/test_encoding.py +++ b/test/mitmproxy/net/http/test_encoding.py @@ -21,16 +21,14 @@ def test_identity(encoder): 'deflate', ]) def test_encoders(encoder): - assert "" == encoding.decode("", encoder) + """ + This test is for testing byte->byte encoding/decoding + """ + assert encoding.decode(None, encoder) is None + assert encoding.encode(None, encoder) is None + assert b"" == encoding.decode(b"", encoder) - assert "string" == encoding.decode( - encoding.encode( - "string", - encoder - ), - encoder - ) assert b"string" == encoding.decode( encoding.encode( b"string", @@ -39,10 +37,41 @@ def test_encoders(encoder): encoder ) + with pytest.raises(TypeError): + encoding.encode("string", encoder) + + with pytest.raises(TypeError): + encoding.decode("string", encoder) with pytest.raises(ValueError): encoding.decode(b"foobar", encoder) +@pytest.mark.parametrize("encoder", [ + 'utf8', + 'latin-1' +]) +def test_encoders_strings(encoder): + """ + This test is for testing byte->str decoding + and str->byte encoding + """ + assert "" == encoding.decode(b"", encoder) + + assert "string" == encoding.decode( + encoding.encode( + "string", + encoder + ), + encoder + ) + + with pytest.raises(TypeError): + encoding.encode(b"string", encoder) + + with pytest.raises(TypeError): + encoding.decode("foobar", encoder) + + def test_cache(): decode_gzip = mock.MagicMock() decode_gzip.return_value = b"decoded"