mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-22 15:37:45 +00:00
Encoding fixes and tests
This commit is contained in:
parent
2316c0fb74
commit
c622622c59
@ -7,6 +7,7 @@ import json
|
||||
import sys
|
||||
import base64
|
||||
import zlib
|
||||
import os
|
||||
|
||||
from datetime import datetime
|
||||
import pytz
|
||||
@ -166,7 +167,7 @@ def done():
|
||||
if dump_file.endswith('.zhar'):
|
||||
raw = zlib.compress(raw, 9)
|
||||
|
||||
with open(dump_file, "wb") as f:
|
||||
with open(os.path.expanduser(dump_file), "wb") as f:
|
||||
f.write(raw)
|
||||
|
||||
mitmproxy.ctx.log("HAR dump finished (wrote %s bytes to file)" % len(json_dump))
|
||||
|
@ -31,8 +31,8 @@ def decode(encoded: Union[str, bytes], encoding: str, errors: str='strict') -> U
|
||||
Raises:
|
||||
ValueError, if decoding fails.
|
||||
"""
|
||||
if len(encoded) == 0:
|
||||
return encoded
|
||||
if encoded is None:
|
||||
return None
|
||||
|
||||
global _cache
|
||||
cached = (
|
||||
@ -72,8 +72,8 @@ def encode(decoded: Union[str, bytes], encoding: str, errors: str='strict') -> U
|
||||
Raises:
|
||||
ValueError, if encoding fails.
|
||||
"""
|
||||
if len(decoded) == 0:
|
||||
return decoded
|
||||
if decoded is None:
|
||||
return None
|
||||
|
||||
global _cache
|
||||
cached = (
|
||||
@ -86,10 +86,7 @@ def encode(decoded: Union[str, bytes], encoding: str, errors: str='strict') -> U
|
||||
return _cache.encoded
|
||||
try:
|
||||
try:
|
||||
value = decoded
|
||||
if isinstance(value, str):
|
||||
value = decoded.encode()
|
||||
encoded = custom_encode[encoding](value)
|
||||
encoded = custom_encode[encoding](decoded)
|
||||
except KeyError:
|
||||
encoded = codecs.encode(decoded, encoding, errors)
|
||||
if encoding in ("gzip", "deflate", "br"):
|
||||
@ -114,12 +111,14 @@ def identity(content):
|
||||
return content
|
||||
|
||||
|
||||
def decode_gzip(content):
|
||||
def decode_gzip(content: bytes) -> bytes:
|
||||
if not content:
|
||||
return b""
|
||||
gfile = gzip.GzipFile(fileobj=BytesIO(content))
|
||||
return gfile.read()
|
||||
|
||||
|
||||
def encode_gzip(content):
|
||||
def encode_gzip(content: bytes) -> bytes:
|
||||
s = BytesIO()
|
||||
gf = gzip.GzipFile(fileobj=s, mode='wb')
|
||||
gf.write(content)
|
||||
@ -127,15 +126,17 @@ def encode_gzip(content):
|
||||
return s.getvalue()
|
||||
|
||||
|
||||
def decode_brotli(content):
|
||||
def decode_brotli(content: bytes) -> bytes:
|
||||
if not content:
|
||||
return b""
|
||||
return brotli.decompress(content)
|
||||
|
||||
|
||||
def encode_brotli(content):
|
||||
def encode_brotli(content: bytes) -> bytes:
|
||||
return brotli.compress(content)
|
||||
|
||||
|
||||
def decode_deflate(content):
|
||||
def decode_deflate(content: bytes) -> bytes:
|
||||
"""
|
||||
Returns decompressed data for DEFLATE. Some servers may respond with
|
||||
compressed data without a zlib header or checksum. An undocumented
|
||||
@ -144,13 +145,15 @@ def decode_deflate(content):
|
||||
|
||||
http://bugs.python.org/issue5784
|
||||
"""
|
||||
if not content:
|
||||
return b""
|
||||
try:
|
||||
return zlib.decompress(content)
|
||||
except zlib.error:
|
||||
return zlib.decompress(content, -15)
|
||||
|
||||
|
||||
def encode_deflate(content):
|
||||
def encode_deflate(content: bytes) -> bytes:
|
||||
"""
|
||||
Returns compressed content, always including zlib header and checksum.
|
||||
"""
|
||||
|
@ -21,16 +21,14 @@ def test_identity(encoder):
|
||||
'deflate',
|
||||
])
|
||||
def test_encoders(encoder):
|
||||
assert "" == encoding.decode("", encoder)
|
||||
"""
|
||||
This test is for testing byte->byte encoding/decoding
|
||||
"""
|
||||
assert encoding.decode(None, encoder) is None
|
||||
assert encoding.encode(None, encoder) is None
|
||||
|
||||
assert b"" == encoding.decode(b"", encoder)
|
||||
|
||||
assert "string" == encoding.decode(
|
||||
encoding.encode(
|
||||
"string",
|
||||
encoder
|
||||
),
|
||||
encoder
|
||||
)
|
||||
assert b"string" == encoding.decode(
|
||||
encoding.encode(
|
||||
b"string",
|
||||
@ -39,10 +37,41 @@ def test_encoders(encoder):
|
||||
encoder
|
||||
)
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
encoding.encode("string", encoder)
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
encoding.decode("string", encoder)
|
||||
with pytest.raises(ValueError):
|
||||
encoding.decode(b"foobar", encoder)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("encoder", [
|
||||
'utf8',
|
||||
'latin-1'
|
||||
])
|
||||
def test_encoders_strings(encoder):
|
||||
"""
|
||||
This test is for testing byte->str decoding
|
||||
and str->byte encoding
|
||||
"""
|
||||
assert "" == encoding.decode(b"", encoder)
|
||||
|
||||
assert "string" == encoding.decode(
|
||||
encoding.encode(
|
||||
"string",
|
||||
encoder
|
||||
),
|
||||
encoder
|
||||
)
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
encoding.encode(b"string", encoder)
|
||||
|
||||
with pytest.raises(TypeError):
|
||||
encoding.decode("foobar", encoder)
|
||||
|
||||
|
||||
def test_cache():
|
||||
decode_gzip = mock.MagicMock()
|
||||
decode_gzip.return_value = b"decoded"
|
||||
|
Loading…
Reference in New Issue
Block a user