diff --git a/libmproxy/flow.py b/libmproxy/flow.py index 2c4c55133..9238cfbf0 100644 --- a/libmproxy/flow.py +++ b/libmproxy/flow.py @@ -197,6 +197,16 @@ class decoded(object): class HTTPMsg(controller.Msg): + def get_decoded_content(self): + """ + Returns the decoded content based on the current Content-Encoding header. + Doesn't change the message iteself or its headers. + """ + ce = self.headers.get_first("content-encoding") + if not self.content or ce not in encoding.ENCODINGS: + return self.content + return encoding.decode(ce, self.content) + def decode(self): """ Decodes content based on the current Content-Encoding header, then @@ -232,7 +242,15 @@ class HTTPMsg(controller.Msg): else: return hl + def get_content_type(self): + return self.headers.get_first("content-type") + def get_transmitted_size(self): + # FIXME: this is inprecise in case chunking is used + # (we should count the chunking headers) + if not self.content: + return 0 + return len(self.content) class Request(HTTPMsg): """ @@ -459,6 +477,28 @@ class Request(HTTPMsg): self.scheme, self.host, self.port, self.path = parts return True + def get_cookies(self): + cookie_headers = self.headers.get("cookie") + if not cookie_headers: + return None + + cookies = [] + for header in cookie_headers: + pairs = [pair.partition("=") for pair in header.split(';')] + cookies.extend((pair[0],(pair[2],{})) for pair in pairs) + return dict(cookies) + + def get_header_size(self): + FMT = '%s %s HTTP/%s.%s\r\n%s\r\n' + assembled_header = FMT % ( + self.method, + self.path, + self.httpversion[0], + self.httpversion[1], + str(self.headers) + ) + return len(assembled_header) + def _assemble_head(self, proxy=False): FMT = '%s %s HTTP/%s.%s\r\n%s\r\n' FMT_PROXY = '%s %s://%s:%s%s HTTP/%s.%s\r\n%s\r\n' @@ -713,6 +753,25 @@ class Response(HTTPMsg): c += self.headers.replace(pattern, repl, *args, **kwargs) return c + def get_header_size(self): + FMT = '%s\r\n%s\r\n' + proto = "HTTP/%s.%s %s %s"%(self.httpversion[0], self.httpversion[1], self.code, str(self.msg)) + assembled_header = FMT % (proto, str(self.headers)) + return len(assembled_header) + + def get_cookies(self): + cookie_headers = self.headers.get("set-cookie") + if not cookie_headers: + return None + + cookies = [] + for header in cookie_headers: + pairs = [pair.partition("=") for pair in header.split(';')] + cookie_name = pairs[0][0] # the key of the first key/value pairs + cookie_value = pairs[0][2] # the value of the first key/value pairs + cookie_parameters = {key.strip().lower():value.strip() for key,sep,value in pairs[1:]} + cookies.append((cookie_name, (cookie_value, cookie_parameters))) + return dict(cookies) class ClientDisconnect(controller.Msg): """ diff --git a/test/test_flow.py b/test/test_flow.py index 2af702ce4..ed3a988f8 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -892,6 +892,57 @@ class TestRequest: assert not r.headers["content-encoding"] assert r.content == "falafel" + def test_get_cookies_single(self): + h = flow.ODictCaseless() + h["Cookie"] = ["cookiename=cookievalue"] + c = flow.ClientConnect(("addr", 2222)) + r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + result = r.get_cookies() + assert len(result)==1 + assert result['cookiename']==('cookievalue',{}) + + def test_get_cookies_double(self): + h = flow.ODictCaseless() + h["Cookie"] = ["cookiename=cookievalue;othercookiename=othercookievalue"] + c = flow.ClientConnect(("addr", 2222)) + r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + result = r.get_cookies() + assert len(result)==2 + assert result['cookiename']==('cookievalue',{}) + assert result['othercookiename']==('othercookievalue',{}) + + def test_get_cookies_withequalsign(self): + h = flow.ODictCaseless() + h["Cookie"] = ["cookiename=coo=kievalue;othercookiename=othercookievalue"] + c = flow.ClientConnect(("addr", 2222)) + r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + result = r.get_cookies() + assert len(result)==2 + assert result['cookiename']==('coo=kievalue',{}) + assert result['othercookiename']==('othercookievalue',{}) + + def test_get_header_size(self): + h = flow.ODictCaseless() + h["headername"] = ["headervalue"] + c = flow.ClientConnect(("addr", 2222)) + r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + result = r.get_header_size() + assert result==43 + + def test_get_transmitted_size(self): + h = flow.ODictCaseless() + h["headername"] = ["headervalue"] + c = flow.ClientConnect(("addr", 2222)) + r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + result = r.get_transmitted_size() + assert result==len("content") + + def test_get_content_type(self): + h = flow.ODictCaseless() + h["Content-Type"] = ["text/plain"] + c = flow.ClientConnect(("addr", 2222)) + r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + assert r.get_content_type()=="text/plain" class TestResponse: def test_simple(self): @@ -989,6 +1040,68 @@ class TestResponse: assert not r.headers["content-encoding"] assert r.content == "falafel" + def test_get_header_size(self): + r = tutils.tresp() + result = r.get_header_size() + assert result==49 + + def test_get_transmitted_size(self): + r = tutils.tresp() + r.headers["content-encoding"] = ["identity"] + r.content = "falafel" + result = r.get_transmitted_size() + assert result==len("falafel") + + def test_get_cookies_simple(self): + h = flow.ODictCaseless() + h["Set-Cookie"] = ["cookiename=cookievalue"] + resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None) + result = resp.get_cookies() + assert len(result)==1 + assert "cookiename" in result + assert result["cookiename"] == ("cookievalue", {}) + + def test_get_cookies_with_parameters(self): + h = flow.ODictCaseless() + h["Set-Cookie"] = ["cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"] + resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None) + result = resp.get_cookies() + assert len(result)==1 + assert "cookiename" in result + assert result["cookiename"][0] == "cookievalue" + assert len(result["cookiename"][1])==4 + assert result["cookiename"][1]["domain"]=="example.com" + assert result["cookiename"][1]["expires"]=="Wed Oct 21 16:29:41 2015" + assert result["cookiename"][1]["path"]=="/" + assert result["cookiename"][1]["httponly"]=="" + + def test_get_cookies_no_value(self): + h = flow.ODictCaseless() + h["Set-Cookie"] = ["cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"] + resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None) + result = resp.get_cookies() + assert len(result)==1 + assert "cookiename" in result + assert result["cookiename"][0] == "" + assert len(result["cookiename"][1])==2 + + def test_get_cookies_twocookies(self): + h = flow.ODictCaseless() + h["Set-Cookie"] = ["cookiename=cookievalue","othercookie=othervalue"] + resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None) + result = resp.get_cookies() + assert len(result)==2 + assert "cookiename" in result + assert result["cookiename"] == ("cookievalue", {}) + assert "othercookie" in result + assert result["othercookie"] == ("othervalue", {}) + + def test_get_content_type(self): + h = flow.ODictCaseless() + h["Content-Type"] = ["text/plain"] + resp = flow.Response(None, (1, 1), 200, "OK", h, "content", None) + assert resp.get_content_type()=="text/plain" + class TestError: def test_getset_state(self):