Refactor pathoc

We're getting ready for websockets. All the output specifiers are now on the
Pathoc object itself - we can't assume that all input and output happens in
response to a method call any more. This has the upside that we can unify the
request/print_request methods.
This commit is contained in:
Aldo Cortesi 2015-04-19 18:04:27 +12:00
parent f8469a283b
commit 3891fe5638
3 changed files with 153 additions and 124 deletions

View File

@ -379,7 +379,6 @@ def args_pathod(argv, stdout=sys.stdout, stderr=sys.stderr):
if os.path.isfile(spec):
data = open(spec).read()
spec = data
try:
req = language.parse_response(spec)
except language.ParseException, v:

View File

@ -21,6 +21,40 @@ class SSLInfo:
def __init__(self, certchain, cipher):
self.certchain, self.cipher = certchain, cipher
def __str__(self):
parts = [
"Cipher: %s, %s bit, %s"%self.cipher,
"SSL certificate chain:"
]
for i in self.certchain:
parts.append("\tSubject: ")
for cn in i.get_subject().get_components():
parts.append("\t\t%s=%s"%cn)
parts.append("\tIssuer: ")
for cn in i.get_issuer().get_components():
parts.append("\t\t%s=%s"%cn)
parts.extend(
[
"\tVersion: %s"%i.get_version(),
"\tValidity: %s - %s"%(
i.get_notBefore(), i.get_notAfter()
),
"\tSerial: %s"%i.get_serial_number(),
"\tAlgorithm: %s"%i.get_signature_algorithm()
]
)
pk = i.get_pubkey()
types = {
OpenSSL.crypto.TYPE_RSA: "RSA",
OpenSSL.crypto.TYPE_DSA: "DSA"
}
t = types.get(pk.type(), "Uknown")
parts.append("\tPubkey: %s bit %s"%(pk.bits(), t))
s = certutils.SSLCert(i)
if s.altnames:
parts.append("\tSANs: %s"%" ".join(s.altnames))
return "\n".join(parts)
class Response:
def __init__(
@ -45,11 +79,34 @@ class Pathoc(tcp.TCPClient):
def __init__(
self,
address,
# SSL
ssl=None,
sni=None,
sslversion=4,
clientcert=None,
ciphers=None):
ciphers=None,
# Output control
showreq = False,
showresp = False,
explain = False,
hexdump = False,
ignorecodes = False,
ignoretimeout = False,
showsummary = False,
fp = sys.stderr
):
"""
spec: A request specification
showreq: Print requests
showresp: Print responses
explain: Print request explanation
showssl: Print info on SSL connection
hexdump: When printing requests or responses, use hex dump output
showsummary: Show a summary of requests
ignorecodes: Sequence of return codes to ignore
"""
tcp.TCPClient.__init__(self, address)
self.settings = dict(
staticdir = os.getcwd(),
@ -60,6 +117,15 @@ class Pathoc(tcp.TCPClient):
self.sslversion = utils.SSLVERSIONS[sslversion]
self.ciphers = ciphers
self.showreq = showreq
self.showresp = showresp
self.explain = explain
self.hexdump = hexdump
self.ignorecodes = ignorecodes
self.ignoretimeout = ignoretimeout
self.showsummary = showsummary
self.fp = fp
def http_connect(self, connect_to):
self.wfile.write(
'CONNECT %s:%s HTTP/1.1\r\n'%tuple(connect_to) +
@ -98,25 +164,11 @@ class Pathoc(tcp.TCPClient):
self.get_current_cipher()
)
if showssl:
self.print_sslinfo(self.sslinfo, fp)
print >> fp, str(self.sslinfo)
def request(self, spec):
"""
Return a Response object.
May raise language.ParseException, netlib.http.HttpError or
language.FileAccessDenied.
"""
r = language.parse_requests(spec)[0]
language.serve(r, self.wfile, self.settings, self.address.host)
self.wfile.flush()
ret = list(http.read_response(self.rfile, r.method.string(), None))
ret.append(self.sslinfo)
return Response(*ret)
def _show_summary(self, fp, httpversion, code, msg, headers, content):
def _show_summary(self, fp, resp):
print >> fp, "<< %s %s: %s bytes"%(
code, utils.xrepr(msg), len(content)
resp.status_code, utils.xrepr(resp.msg), len(resp.content)
)
def _show(self, fp, header, data, hexdump):
@ -128,65 +180,20 @@ class Pathoc(tcp.TCPClient):
print >> fp, "%s (unprintables escaped):"%header
print >> fp, netlib.utils.cleanBin(data)
def print_sslinfo(self, sslinfo, fp):
print >> fp, "Cipher: %s, %s bit, %s"%self.sslinfo.cipher
print >> fp, "SSL certificate chain:\n"
for i in self.sslinfo.certchain:
print >> fp, "\tSubject: ",
for cn in i.get_subject().get_components():
print >> fp, "%s=%s"%cn,
print >> fp
print >> fp, "\tIssuer: ",
for cn in i.get_issuer().get_components():
print >> fp, "%s=%s"%cn,
print >> fp
print >> fp, "\tVersion: %s"%i.get_version()
print >> fp, "\tValidity: %s - %s"%(
i.get_notBefore(), i.get_notAfter()
)
print >> fp, "\tSerial: %s"%i.get_serial_number()
print >> fp, "\tAlgorithm: %s"%i.get_signature_algorithm()
pk = i.get_pubkey()
types = {
OpenSSL.crypto.TYPE_RSA: "RSA",
OpenSSL.crypto.TYPE_DSA: "DSA"
}
t = types.get(pk.type(), "Uknown")
print >> fp, "\tPubkey: %s bit %s"%(pk.bits(), t)
s = certutils.SSLCert(i)
if s.altnames:
print >> fp, "\tSANs:", " ".join(s.altnames)
print >> fp
def print_request(
self,
r,
showreq,
showresp,
explain,
hexdump,
ignorecodes,
ignoretimeout,
fp=sys.stdout
):
def request(self, r):
"""
Performs a series of requests, and prints results to the specified
file descriptor.
Performs a single request.
spec: A request specification
showreq: Print requests
showresp: Print responses
explain: Print request explanation
showssl: Print info on SSL connection
hexdump: When printing requests or responses, use hex dump output
ignorecodes: Sequence of return codes to ignore
r: A language.Request object, or a string representing one request.
Returns True if we have a non-ignored response.
"""
if isinstance(r, basestring):
r = language.parse_requests(r)[0]
resp, req = None, None
if showreq:
if self.showreq:
self.wfile.start_log()
if showresp:
if self.showresp:
self.rfile.start_log()
try:
req = language.serve(
@ -196,32 +203,44 @@ class Pathoc(tcp.TCPClient):
self.address.host
)
self.wfile.flush()
resp = http.read_response(self.rfile, r.method.string(), None)
resp = list(http.read_response(self.rfile, r.method.string(), None))
resp.append(self.sslinfo)
resp = Response(*resp)
except http.HttpError, v:
print >> fp, "<< HTTP Error:", v.message
if self.showsummary:
print >> self.fp, "<< HTTP Error:", v.message
raise
except tcp.NetLibTimeout:
if ignoretimeout:
return
print >> fp, "<<", "Timeout"
if self.ignoretimeout:
return None
if self.showsummary:
print >> self.fp, "<<", "Timeout"
raise
except tcp.NetLibDisconnect: # pragma: nocover
print >> fp, "<<", "Disconnect"
if self.showsummary:
print >> self.fp, "<<", "Disconnect"
raise
finally:
if req:
if self.ignorecodes and resp and resp.status_code in self.ignorecodes:
return None
if req:
if ignorecodes and resp and resp[1] in ignorecodes:
return
if self.explain:
print >> self.fp, ">> Spec:", r.spec()
if explain:
print >> fp, ">> Spec:", r.spec()
if self.showreq:
self._show(
self.fp, ">> Request", self.wfile.get_log(), self.hexdump
)
if showreq:
self._show(fp, ">> Request", self.wfile.get_log(), hexdump)
if showresp:
self._show(fp, "<< Response", self.rfile.get_log(), hexdump)
else:
if resp:
self._show_summary(fp, *resp)
return True
if self.showsummary and resp:
self._show_summary(self.fp, resp)
if self.showresp:
self._show(
self.fp, "<< Response", self.rfile.get_log(), self.hexdump
)
return resp
def main(args): # pragma: nocover
@ -245,11 +264,18 @@ def main(args): # pragma: nocover
playlist = args.requests
p = Pathoc(
(args.host, args.port),
ssl=args.ssl,
sni=args.sni,
sslversion=args.sslversion,
clientcert=args.clientcert,
ciphers=args.ciphers
ssl = args.ssl,
sni = args.sni,
sslversion = args.sslversion,
clientcert = args.clientcert,
ciphers = args.ciphers,
showreq = args.showreq,
showresp = args.showresp,
explain = args.explain,
hexdump = args.hexdump,
ignorecodes = args.ignorecodes,
ignoretimeout = args.ignoretimeout,
showsummary = True
)
if args.explain or args.memo:
playlist = [
@ -279,17 +305,12 @@ def main(args): # pragma: nocover
if args.timeout:
p.settimeout(args.timeout)
for spec in playlist:
ret = p.print_request(
spec,
showreq=args.showreq,
showresp=args.showresp,
explain=args.explain,
hexdump=args.hexdump,
ignorecodes=args.ignorecodes,
ignoretimeout=args.ignoretimeout
)
sys.stdout.flush()
if ret and args.oneshot:
sys.exit(0)
try:
ret = p.request(spec)
sys.stdout.flush()
if ret and args.oneshot:
return
except (http.HttpError, tcp.NetlibError), v:
pass
except KeyboardInterrupt:
pass

View File

@ -2,6 +2,7 @@ import json
import cStringIO
import re
from netlib import tcp, http
from libpathod import pathoc, test, version, pathod, language
import tutils
@ -38,8 +39,8 @@ class _TestDaemon:
ssl = self.ssl
)
c.connect()
r = c.request("get:/api/info")
assert tuple(json.loads(r.content)["version"]) == version.IVERSION
resp = c.request("get:/api/info")
assert tuple(json.loads(resp.content)["version"]) == version.IVERSION
def tval(
self,
@ -51,10 +52,22 @@ class _TestDaemon:
hexdump=False,
timeout=None,
ignorecodes=None,
ignoretimeout=None
ignoretimeout=None,
showsummary=True
):
s = cStringIO.StringIO()
c = pathoc.Pathoc(("127.0.0.1", self.d.port), ssl=self.ssl)
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
ssl = self.ssl,
showreq = showreq,
showresp = showresp,
explain = explain,
hexdump = hexdump,
ignorecodes = ignorecodes,
ignoretimeout = ignoretimeout,
showsummary = showsummary,
fp = s
)
c.connect(showssl=showssl, fp=s)
if timeout:
c.settimeout(timeout)
@ -62,16 +75,10 @@ class _TestDaemon:
r = language.parse_requests(i)[0]
if explain:
r = r.freeze({})
c.print_request(
r,
showreq = showreq,
showresp = showresp,
explain = explain,
hexdump = hexdump,
ignorecodes = ignorecodes,
ignoretimeout = ignoretimeout,
fp = s
)
try:
c.request(r)
except (http.HttpError, tcp.NetLibError), v:
pass
return s.getvalue()
@ -121,6 +128,8 @@ class TestDaemon(_TestDaemon):
assert not "certificate chain" in self.tval(["get:/p/200"], showssl=True)
def test_ignorecodes(self):
assert "200" in self.tval(["get:'/p/200:b@1'"])
assert "200" in self.tval(["get:'/p/200:b@1'"])
assert "200" in self.tval(["get:'/p/200:b@1'"])
assert "200" not in self.tval(["get:'/p/200:b@1'"], ignorecodes=[200])
assert "200" not in self.tval(["get:'/p/200:b@1'"], ignorecodes=[200, 201])
@ -129,7 +138,7 @@ class TestDaemon(_TestDaemon):
def test_timeout(self):
assert "Timeout" in self.tval(["get:'/p/200:p0,10'"], timeout=0.01)
assert "HTTP" in self.tval(["get:'/p/200:p5,10'"], showresp=True, timeout=0.01)
assert not "HTTP" in self.tval(["get:'/p/200:p5,10'"], showresp=True, timeout=0.01, ignoretimeout=True)
assert not "HTTP" in self.tval(["get:'/p/200:p3,10'"], showresp=True, timeout=0.01, ignoretimeout=True)
def test_showresp(self):
reqs = ["get:/api/info:p0,0", "get:/api/info:p0,0"]
@ -138,7 +147,7 @@ class TestDaemon(_TestDaemon):
assert self.tval(reqs, showresp=True, hexdump=True).count("hex dump") == 2
def test_showresp_httperr(self):
v = self.tval(["get:'/p/200:d20'"], showresp=True)
v = self.tval(["get:'/p/200:d20'"], showresp=True, showsummary=True)
assert "Invalid headers" in v
assert "HTTP/" in v