Merge pull request #99 from rouli/master

Adding helper functions to make HAR export easier
This commit is contained in:
Aldo Cortesi 2013-01-28 13:28:49 -08:00
commit 6f157d936f
2 changed files with 172 additions and 0 deletions

View File

@ -197,6 +197,16 @@ class decoded(object):
class HTTPMsg(controller.Msg):
def get_decoded_content(self):
"""
Returns the decoded content based on the current Content-Encoding header.
Doesn't change the message iteself or its headers.
"""
ce = self.headers.get_first("content-encoding")
if not self.content or ce not in encoding.ENCODINGS:
return self.content
return encoding.decode(ce, self.content)
def decode(self):
"""
Decodes content based on the current Content-Encoding header, then
@ -232,7 +242,15 @@ class HTTPMsg(controller.Msg):
else:
return hl
def get_content_type(self):
return self.headers.get_first("content-type")
def get_transmitted_size(self):
# FIXME: this is inprecise in case chunking is used
# (we should count the chunking headers)
if not self.content:
return 0
return len(self.content)
class Request(HTTPMsg):
"""
@ -459,6 +477,28 @@ class Request(HTTPMsg):
self.scheme, self.host, self.port, self.path = parts
return True
def get_cookies(self):
cookie_headers = self.headers.get("cookie")
if not cookie_headers:
return None
cookies = []
for header in cookie_headers:
pairs = [pair.partition("=") for pair in header.split(';')]
cookies.extend((pair[0],(pair[2],{})) for pair in pairs)
return dict(cookies)
def get_header_size(self):
FMT = '%s %s HTTP/%s.%s\r\n%s\r\n'
assembled_header = FMT % (
self.method,
self.path,
self.httpversion[0],
self.httpversion[1],
str(self.headers)
)
return len(assembled_header)
def _assemble_head(self, proxy=False):
FMT = '%s %s HTTP/%s.%s\r\n%s\r\n'
FMT_PROXY = '%s %s://%s:%s%s HTTP/%s.%s\r\n%s\r\n'
@ -713,6 +753,25 @@ class Response(HTTPMsg):
c += self.headers.replace(pattern, repl, *args, **kwargs)
return c
def get_header_size(self):
FMT = '%s\r\n%s\r\n'
proto = "HTTP/%s.%s %s %s"%(self.httpversion[0], self.httpversion[1], self.code, str(self.msg))
assembled_header = FMT % (proto, str(self.headers))
return len(assembled_header)
def get_cookies(self):
cookie_headers = self.headers.get("set-cookie")
if not cookie_headers:
return None
cookies = []
for header in cookie_headers:
pairs = [pair.partition("=") for pair in header.split(';')]
cookie_name = pairs[0][0] # the key of the first key/value pairs
cookie_value = pairs[0][2] # the value of the first key/value pairs
cookie_parameters = {key.strip().lower():value.strip() for key,sep,value in pairs[1:]}
cookies.append((cookie_name, (cookie_value, cookie_parameters)))
return dict(cookies)
class ClientDisconnect(controller.Msg):
"""

View File

@ -892,6 +892,57 @@ class TestRequest:
assert not r.headers["content-encoding"]
assert r.content == "falafel"
def test_get_cookies_single(self):
h = flow.ODictCaseless()
h["Cookie"] = ["cookiename=cookievalue"]
c = flow.ClientConnect(("addr", 2222))
r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content")
result = r.get_cookies()
assert len(result)==1
assert result['cookiename']==('cookievalue',{})
def test_get_cookies_double(self):
h = flow.ODictCaseless()
h["Cookie"] = ["cookiename=cookievalue;othercookiename=othercookievalue"]
c = flow.ClientConnect(("addr", 2222))
r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content")
result = r.get_cookies()
assert len(result)==2
assert result['cookiename']==('cookievalue',{})
assert result['othercookiename']==('othercookievalue',{})
def test_get_cookies_withequalsign(self):
h = flow.ODictCaseless()
h["Cookie"] = ["cookiename=coo=kievalue;othercookiename=othercookievalue"]
c = flow.ClientConnect(("addr", 2222))
r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content")
result = r.get_cookies()
assert len(result)==2
assert result['cookiename']==('coo=kievalue',{})
assert result['othercookiename']==('othercookievalue',{})
def test_get_header_size(self):
h = flow.ODictCaseless()
h["headername"] = ["headervalue"]
c = flow.ClientConnect(("addr", 2222))
r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content")
result = r.get_header_size()
assert result==43
def test_get_transmitted_size(self):
h = flow.ODictCaseless()
h["headername"] = ["headervalue"]
c = flow.ClientConnect(("addr", 2222))
r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content")
result = r.get_transmitted_size()
assert result==len("content")
def test_get_content_type(self):
h = flow.ODictCaseless()
h["Content-Type"] = ["text/plain"]
c = flow.ClientConnect(("addr", 2222))
r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content")
assert r.get_content_type()=="text/plain"
class TestResponse:
def test_simple(self):
@ -989,6 +1040,68 @@ class TestResponse:
assert not r.headers["content-encoding"]
assert r.content == "falafel"
def test_get_header_size(self):
r = tutils.tresp()
result = r.get_header_size()
assert result==49
def test_get_transmitted_size(self):
r = tutils.tresp()
r.headers["content-encoding"] = ["identity"]
r.content = "falafel"
result = r.get_transmitted_size()
assert result==len("falafel")
def test_get_cookies_simple(self):
h = flow.ODictCaseless()
h["Set-Cookie"] = ["cookiename=cookievalue"]
resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None)
result = resp.get_cookies()
assert len(result)==1
assert "cookiename" in result
assert result["cookiename"] == ("cookievalue", {})
def test_get_cookies_with_parameters(self):
h = flow.ODictCaseless()
h["Set-Cookie"] = ["cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None)
result = resp.get_cookies()
assert len(result)==1
assert "cookiename" in result
assert result["cookiename"][0] == "cookievalue"
assert len(result["cookiename"][1])==4
assert result["cookiename"][1]["domain"]=="example.com"
assert result["cookiename"][1]["expires"]=="Wed Oct 21 16:29:41 2015"
assert result["cookiename"][1]["path"]=="/"
assert result["cookiename"][1]["httponly"]==""
def test_get_cookies_no_value(self):
h = flow.ODictCaseless()
h["Set-Cookie"] = ["cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"]
resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None)
result = resp.get_cookies()
assert len(result)==1
assert "cookiename" in result
assert result["cookiename"][0] == ""
assert len(result["cookiename"][1])==2
def test_get_cookies_twocookies(self):
h = flow.ODictCaseless()
h["Set-Cookie"] = ["cookiename=cookievalue","othercookie=othervalue"]
resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None)
result = resp.get_cookies()
assert len(result)==2
assert "cookiename" in result
assert result["cookiename"] == ("cookievalue", {})
assert "othercookie" in result
assert result["othercookie"] == ("othervalue", {})
def test_get_content_type(self):
h = flow.ODictCaseless()
h["Content-Type"] = ["text/plain"]
resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None)
assert resp.get_content_type()=="text/plain"
class TestError:
def test_getset_state(self):