""" Utility functions for decoding response bodies. """ from __future__ import absolute_import import codecs import collections from io import BytesIO import gzip import zlib from typing import Union # noqa # We have a shared single-element cache for encoding and decoding. # This is quite useful in practice, e.g. # flow.request.content = flow.request.content.replace(b"foo", b"bar") # does not require an .encode() call if content does not contain b"foo" CachedDecode = collections.namedtuple("CachedDecode", "encoded encoding errors decoded") _cache = CachedDecode(None, None, None, None) def decode(encoded, encoding, errors='strict'): # type: (Union[str, bytes], str, str) -> Union[str, bytes] """ Decode the given input object Returns: The decoded value Raises: ValueError, if decoding fails. """ global _cache cached = ( isinstance(encoded, bytes) and _cache.encoded == encoded and _cache.encoding == encoding and _cache.errors == errors ) if cached: return _cache.decoded try: try: decoded = custom_decode[encoding](encoded) except KeyError: decoded = codecs.decode(encoded, encoding, errors) if encoding in ("gzip", "deflate"): _cache = CachedDecode(encoded, encoding, errors, decoded) return decoded except Exception as e: raise ValueError("{} when decoding {} with {}".format( type(e).__name__, repr(encoded)[:10], repr(encoding), )) def encode(decoded, encoding, errors='strict'): # type: (Union[str, bytes], str, str) -> Union[str, bytes] """ Encode the given input object Returns: The encoded value Raises: ValueError, if encoding fails. """ global _cache cached = ( isinstance(decoded, bytes) and _cache.decoded == decoded and _cache.encoding == encoding and _cache.errors == errors ) if cached: return _cache.encoded try: try: encoded = custom_encode[encoding](decoded) except KeyError: encoded = codecs.encode(decoded, encoding, errors) if encoding in ("gzip", "deflate"): _cache = CachedDecode(encoded, encoding, errors, decoded) return encoded except Exception as e: raise ValueError("{} when encoding {} with {}".format( type(e).__name__, repr(decoded)[:10], repr(encoding), )) def identity(content): """ Returns content unchanged. Identity is the default value of Accept-Encoding headers. """ return content def decode_gzip(content): gfile = gzip.GzipFile(fileobj=BytesIO(content)) return gfile.read() def encode_gzip(content): s = BytesIO() gf = gzip.GzipFile(fileobj=s, mode='wb') gf.write(content) gf.close() return s.getvalue() def decode_deflate(content): """ Returns decompressed data for DEFLATE. Some servers may respond with compressed data without a zlib header or checksum. An undocumented feature of zlib permits the lenient decompression of data missing both values. http://bugs.python.org/issue5784 """ try: return zlib.decompress(content) except zlib.error: return zlib.decompress(content, -15) def encode_deflate(content): """ Returns compressed content, always including zlib header and checksum. """ return zlib.compress(content) custom_decode = { "identity": identity, "gzip": decode_gzip, "deflate": decode_deflate, } custom_encode = { "identity": identity, "gzip": encode_gzip, "deflate": encode_deflate, } __all__ = ["encode", "decode"]