Adds support for content encoding, namely gzip and deflate

This commit is contained in:
alts 2011-07-16 02:47:06 -07:00
parent 94ae720a22
commit 6dc0f105cc
7 changed files with 122 additions and 26 deletions

View File

@ -111,7 +111,7 @@ def common_options(parser):
)
parser.add_option(
"-z",
action="store_false", dest="anticomp", default=True,
action="store_false", dest="anticomp", default=False,
help="Try to convince servers to send us un-compressed data."
)

View File

@ -18,7 +18,7 @@ import os.path, sys
import cStringIO
import urwid.raw_display
import urwid
import controller, utils, filt, proxy, flow
import controller, utils, filt, proxy, flow, encoding
VIEW_CUTOFF = 1024*100
@ -97,6 +97,11 @@ def format_flow(f, focus, extended=False, padding=2):
if t:
t = t[0].split(";")[0]
txt.append(("text", " %s"%t))
e = f.response.headers["content-encoding"]
if e:
e = e[0]
else:
e = "identity"
if f.response.content:
txt.append(", %s"%utils.pretty_size(len(f.response.content)))
elif f.error:
@ -295,8 +300,13 @@ class ConnectionView(WWrap):
def _conn_text(self, conn, viewmode):
if conn:
e = conn.headers["content-encoding"]
if e:
e = e[0]
else:
e = "identity"
return self.master._cached_conn_text(
conn.content,
encoding.decode(e, conn.content),
tuple([tuple(i) for i in conn.headers.lst]),
viewmode
)

43
libmproxy/encoding.py Normal file
View File

@ -0,0 +1,43 @@
"""
Utility functions for decoding response bodies.
"""
import cStringIO
import gzip, zlib
# NOTE(review): Python only honours the lowercase name ``__all__`` for
# ``from module import *``; this uppercase ``__ALL__`` is just an ordinary
# module attribute. Confirm whether the lowercase spelling was intended.
__ALL__ = ["ENCODINGS"]
# Content-coding names for which decode() has a dedicated handler.
ENCODINGS = set(["identity", "gzip", "deflate"])
def decode(encoding, content):
    """
    Decodes content according to a Content-Encoding coding name.

    encoding: coding name such as "identity", "gzip" or "deflate". The
        name is matched ignoring case and surrounding whitespace, because
        HTTP content-coding values are case-insensitive. Non-string values
        (for example None) and unknown names are treated as identity.
    content: the raw (possibly compressed) body.

    Returns the decoded body. Unknown codings return content unchanged.
    Errors raised by the selected decoder are not caught here.
    """
    encoding_map = {
        "identity": decode_identity,
        "gzip": decode_gzip,
        "deflate": decode_deflate,
    }
    try:
        key = encoding.strip().lower()
    except AttributeError:
        # Not a string: keep the value as-is so the lookup below falls
        # through to identity exactly as before.
        key = encoding
    return encoding_map.get(key, decode_identity)(content)
def decode_identity(content):
    """
    No-op decoder for the "identity" coding.

    Identity means the body was not transformed, so the very same object
    that was passed in is handed back.
    """
    unchanged = content
    return unchanged
def decode_gzip(content):
    """
    Returns the decompressed payload of a gzip-encoded body.

    content: the complete gzip stream as a byte string.

    Exceptions raised by the gzip module for malformed input are not
    caught here and propagate to the caller.
    """
    # Local import: io.BytesIO exists on Python 2.6+ and 3.x, whereas
    # cStringIO is Python 2 only.
    import io
    gfile = gzip.GzipFile(fileobj=io.BytesIO(content))
    try:
        return gfile.read()
    finally:
        # Release the GzipFile even when read() raises.
        gfile.close()
def decode_deflate(content):
    """
    Returns decompressed data for DEFLATE.

    The body is first treated as a regular zlib stream. Some servers send
    a bare deflate stream with neither the zlib header nor the checksum;
    when the first attempt fails, zlib is asked again with a negative
    window size, which makes it accept such headerless data.
    http://bugs.python.org/issue5784
    """
    try:
        inflated = zlib.decompress(content)
    except zlib.error:
        # Negative wbits selects raw deflate (no header, no checksum).
        inflated = zlib.decompress(content, -zlib.MAX_WBITS)
    return inflated

View File

@ -568,6 +568,9 @@ class FlowMaster(controller.Master):
f.request.anticache()
if self.anticomp:
f.request.anticomp()
else:
f.request.constrain_encoding()
if self.server_playback:
pb = self.do_server_playback(f)
if not pb:

View File

@ -9,7 +9,7 @@ import sys, os, string, socket, urlparse, re, select, copy, base64, time, Cookie
from email.utils import parsedate_tz, formatdate, mktime_tz
import shutil, tempfile
import optparse, SocketServer, ssl
import utils, controller
import utils, controller, encoding
NAME = "mitmproxy"
@ -156,11 +156,21 @@ class Request(controller.Msg):
def anticomp(self):
"""
Modifies this request to remove headers that will compress the
resource's data: the Accept-Encoding header is overwritten with the
single value "identity".
"""
self.headers["accept-encoding"] = ["identity"]
def constrain_encoding(self):
    """
    Limits the permissible Accept-Encoding values, based on what we can
    decode appropriately.

    Every comma-separated element of every Accept-Encoding value is
    examined. An element is kept, parameters such as ";q=0.5" included,
    only when its coding name is one of encoding.ENCODINGS. Names are
    compared as whole tokens and case-insensitively, so "x-gzip" no
    longer counts as "gzip". The client's original ordering is preserved.

    Does nothing when the request carries no Accept-Encoding header.
    """
    accepted = self.headers["accept-encoding"]
    if not accepted:
        return
    kept = []
    for value in accepted:
        for element in value.split(","):
            element = element.strip()
            # The coding name is whatever precedes the first ";".
            coding = element.split(";", 1)[0].strip().lower()
            if coding in encoding.ENCODINGS:
                kept.append(element)
    self.headers["accept-encoding"] = [", ".join(kept)]
def set_replay(self):
self.client_conn = None
@ -381,7 +391,6 @@ class Response(controller.Msg):
modifications to make sure interception works properly.
"""
headers = self.headers.copy()
utils.try_del(headers, 'accept-encoding')
utils.try_del(headers, 'proxy-connection')
utils.try_del(headers, 'connection')
utils.try_del(headers, 'keep-alive')

31
test/test_encoding.py Normal file
View File

@ -0,0 +1,31 @@
from libmproxy import encoding
import libpry
import cStringIO
import gzip, zlib
class udecode_identity(libpry.AutoTree):
    """Checks that bodies needing no decoding come back untouched."""

    def test_decode(self):
        # An explicit "identity" coding returns the body as-is.
        body = 'string'
        assert encoding.decode('identity', body) == 'string'

    def test_fallthrough(self):
        # A coding name without a handler is expected to act like identity.
        body = 'string'
        assert encoding.decode('nonexistent encoding', body) == 'string'
class udecode_gzip(libpry.AutoTree):
    """Round-trips a short string through gzip and encoding.decode."""

    def test_simple(self):
        # Build a gzip stream in memory, then check it decodes back.
        buf = cStringIO.StringIO()
        writer = gzip.GzipFile(fileobj=buf, mode='wb')
        writer.write('string')
        writer.close()
        compressed = buf.getvalue()
        assert encoding.decode('gzip', compressed) == 'string'
class udecode_deflate(libpry.AutoTree):
    """Checks deflate decoding with and without the zlib wrapper."""

    def test_simple(self):
        wrapped = zlib.compress('string')
        # Dropping the first 2 and last 4 bytes removes the zlib header and
        # trailing checksum, leaving only the raw deflate stream.
        bare = wrapped[2:-4]
        assert encoding.decode('deflate', wrapped) == 'string'
        assert encoding.decode('deflate', bare) == 'string'
# Instances of every test tree defined in this module.
# NOTE(review): presumably this is the module-level list the libpry runner
# collects -- confirm against libpry's conventions.
tests = [
udecode_identity(),
udecode_gzip(),
udecode_deflate()
]