Add accessor method for SSLCert object on Response.

This commit is contained in:
Aldo Cortesi 2012-04-02 17:01:43 +12:00
parent ab1d8fa350
commit 4979a22d3e
2 changed files with 20 additions and 6 deletions

View File

@ -21,7 +21,7 @@ import hashlib, Cookie, cookielib, copy, re, urlparse
import time import time
import tnetstring, filt, script, utils, encoding, proxy import tnetstring, filt, script, utils, encoding, proxy
from email.utils import parsedate_tz, formatdate, mktime_tz from email.utils import parsedate_tz, formatdate, mktime_tz
import controller, version import controller, version, certutils
HDR_FORM_URLENCODED = "application/x-www-form-urlencoded" HDR_FORM_URLENCODED = "application/x-www-form-urlencoded"
@ -566,12 +566,12 @@ class Response(HTTPMsg):
content: Response content content: Response content
timestamp: Seconds since the epoch timestamp: Seconds since the epoch
""" """
def __init__(self, request, code, msg, headers, content, peercert, timestamp=None): def __init__(self, request, code, msg, headers, content, der_cert, timestamp=None):
assert isinstance(headers, ODictCaseless) assert isinstance(headers, ODictCaseless)
self.request = request self.request = request
self.code, self.msg = code, msg self.code, self.msg = code, msg
self.headers, self.content = headers, content self.headers, self.content = headers, content
self.peercert = peercert self.der_cert = der_cert
self.timestamp = timestamp or utils.timestamp() self.timestamp = timestamp or utils.timestamp()
controller.Msg.__init__(self) controller.Msg.__init__(self)
self.replay = False self.replay = False
@ -641,7 +641,14 @@ class Response(HTTPMsg):
self.headers = ODictCaseless._from_state(state["headers"]) self.headers = ODictCaseless._from_state(state["headers"])
self.content = state["content"] self.content = state["content"]
self.timestamp = state["timestamp"] self.timestamp = state["timestamp"]
self.peercert = state["peercert"] self.der_cert = state["der_cert"]
def get_cert(self):
"""
Returns a certutils.SSLCert object, or None.
"""
if self.der_cert:
return certutils.SSLCert.from_der(self.der_cert)
def _get_state(self): def _get_state(self):
return dict( return dict(
@ -649,7 +656,7 @@ class Response(HTTPMsg):
msg = self.msg, msg = self.msg,
headers = self.headers._get_state(), headers = self.headers._get_state(),
timestamp = self.timestamp, timestamp = self.timestamp,
peercert = self.peercert, der_cert = self.der_cert,
content = self.content content = self.content
) )
@ -661,7 +668,7 @@ class Response(HTTPMsg):
str(state["msg"]), str(state["msg"]),
ODictCaseless._from_state(state["headers"]), ODictCaseless._from_state(state["headers"]),
state["content"], state["content"],
state.get("peercert"), state.get("der_cert"),
state["timestamp"], state["timestamp"],
) )

View File

@ -835,6 +835,13 @@ class uResponse(libpry.AutoTree):
c = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure" c = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
assert "00:21:38" in r._refresh_cookie(c, 60) assert "00:21:38" in r._refresh_cookie(c, 60)
def test_get_cert(self):
req = tutils.treq()
resp = flow.Response(req, 200, "msg", flow.ODictCaseless(), "content", file("data/dercert").read())
assert resp.get_cert()
resp = tutils.tresp()
assert not resp.get_cert()
def test_getset_state(self): def test_getset_state(self):
h = flow.ODictCaseless() h = flow.ODictCaseless()