Substantially rewrite AMF decoding.

This is tricky, but we should now handle a lot more corner-cases.
This commit is contained in:
Aldo Cortesi 2012-11-26 13:25:07 +13:00
parent 64bf97bfb0
commit 8c976ac7f0
5 changed files with 88 additions and 31 deletions

View File

@ -15,11 +15,10 @@ from ..contrib import jsbeautifier, html2text
try: try:
import pyamf import pyamf
from pyamf import remoting from pyamf import remoting, flex
except ImportError: # pragma nocover except ImportError: # pragma nocover
pyamf = None pyamf = None
VIEW_CUTOFF = 1024*50 VIEW_CUTOFF = 1024*50
@ -236,30 +235,75 @@ class ViewMultipart:
return "Multipart form", r return "Multipart form", r
class ViewAMF: if pyamf:
name = "AMF" class DummyObject(dict):
prompt = ("amf", "f") def __init__(self, alias):
content_types = ["application/x-amf"] dict.__init__(self)
def __call__(self, hdrs, content, limit):
envelope = remoting.decode(content)
if not envelope:
return None
data = {} def __readamf__(self, input):
data['amfVersion'] = envelope.amfVersion data = input.readObject()
for target, message in iter(envelope): self["data"] = data
one_message = {}
if hasattr(message, 'status'): def pyamf_class_loader(s):
one_message['status'] = message.status for i in pyamf.CLASS_LOADERS:
if i != pyamf_class_loader:
v = i(s)
if v:
return v
return DummyObject
if hasattr(message, 'target'): pyamf.register_class_loader(pyamf_class_loader)
one_message['target'] = message.target
one_message['body'] = message.body class ViewAMF:
data[target] = one_message name = "AMF"
s = json.dumps(data, indent=4) prompt = ("amf", "f")
return "AMF", _view_text(s[:limit], len(s), limit) content_types = ["application/x-amf"]
def unpack(self, b, seen=set([])):
if hasattr(b, "body"):
return self.unpack(b.body, seen)
if isinstance(b, DummyObject):
if id(b) in seen:
return "<recursion>"
else:
seen.add(id(b))
for k, v in b.items():
b[k] = self.unpack(v, seen)
return b
elif isinstance(b, dict):
for k, v in b.items():
b[k] = self.unpack(v, seen)
return b
elif isinstance(b, list):
return [self.unpack(i) for i in b]
elif isinstance(b, flex.ArrayCollection):
return [self.unpack(i, seen) for i in b]
else:
return b
def __call__(self, hdrs, content, limit):
envelope = remoting.decode(content, strict=False)
if not envelope:
return None
txt = []
for target, message in iter(envelope):
if isinstance(message, pyamf.remoting.Request):
txt.append(urwid.Text([
("header", "Request: "),
("text", str(target)),
]))
else:
txt.append(urwid.Text([
("header", "Response: "),
("text", "%s, code %s"%(target, message.status)),
]))
s = json.dumps(self.unpack(message), indent=4)
txt.extend(_view_text(s[:limit], len(s), limit))
return "AMF v%s"%envelope.amfVersion, txt
class ViewJavaScript: class ViewJavaScript:

BIN
test/data/amf02 Normal file

Binary file not shown.

BIN
test/data/amf03 Normal file

Binary file not shown.

View File

@ -3,6 +3,12 @@ import libmproxy.console.contentview as cv
from libmproxy import utils, flow, encoding from libmproxy import utils, flow, encoding
import tutils import tutils
try:
import pyamf
except ImportError:
pyamf = None
class TestContentView: class TestContentView:
def test_trailer(self): def test_trailer(self):
txt = [] txt = []
@ -119,15 +125,6 @@ class TestContentView:
assert not v([], "flibble", sys.maxint) assert not v([], "flibble", sys.maxint)
def test_view_amf(self):
try:
import pyamf
v = cv.ViewAMF()
p = tutils.test_data.path("data/test.amf")
assert v([], file(p).read(), sys.maxint)
except ImportError:
pass
def test_view_multipart(self): def test_view_multipart(self):
view = cv.ViewMultipart() view = cv.ViewMultipart()
v = """ v = """
@ -220,5 +217,21 @@ Larry
assert "Raw" in r[0] assert "Raw" in r[0]
if pyamf:
def test_view_amf_request():
v = cv.ViewAMF()
p = tutils.test_data.path("data/amf01")
assert v([], file(p).read(), sys.maxint)
p = tutils.test_data.path("data/amf02")
assert v([], file(p).read(), sys.maxint)
def test_view_amf_response():
v = cv.ViewAMF()
p = tutils.test_data.path("data/amf03")
assert v([], file(p).read(), sys.maxint)
def test_get_by_shortcut(): def test_get_by_shortcut():
assert cv.get_by_shortcut("h") assert cv.get_by_shortcut("h")