import threading
import Queue
import os
import shutil
import tempfile
from contextlib import contextmanager

from libmproxy import proxy, flow, controller, utils
from netlib import certutils
import human_curl as hurl
import libpathod.test
import libpathod.pathoc


def treq(conn=None):
    """
        Build a canned flow.Request for use in tests.

        :conn The client connection to attach. If falsy, a fresh
        flow.ClientConnect for ("address", 22) is created.
    """
    if not conn:
        conn = flow.ClientConnect(("address", 22))
    headers = flow.ODictCaseless()
    headers["header"] = ["qvalue"]
    return flow.Request(
        conn, (1, 1), "host", 80, "http", "GET", "/path", headers, "content"
    )


def tresp(req=None):
    """
        Build a canned flow.Response for use in tests.

        :req The request the response belongs to. If falsy, one is created
        with treq().

        The certificate attached to the response is loaded from the
        "data/dercert" file located through test_data.
    """
    if not req:
        req = treq()
    headers = flow.ODictCaseless()
    headers["header_response"] = ["svalue"]
    # DER is a binary encoding: read in binary mode, and close the handle
    # deterministically instead of relying on garbage collection.
    with open(test_data.path("data/dercert"), "rb") as certfile:
        cert = certutils.SSLCert.from_der(certfile.read())
    return flow.Response(
        req, (1, 1), 200, "message", headers, "content_response", cert
    )


def tflow():
    """
        Return a flow.Flow wrapping a canned request, with no response or
        error set by this helper.
    """
    r = treq()
    return flow.Flow(r)


def tflow_full():
    """
        Return a flow.Flow wrapping a canned request together with a canned
        response for that request.
    """
    r = treq()
    f = flow.Flow(r)
    f.response = tresp(r)
    return f


def tflow_err():
    """
        Return a flow.Flow wrapping a canned request whose error attribute
        is set to flow.Error(request, "error").
    """
    r = treq()
    f = flow.Flow(r)
    f.error = flow.Error(r, "error")
    return f


class TestMaster(flow.FlowMaster):
    """
        A FlowMaster driving a ProxyServer created with port argument 0,
        which acknowledges every message it handles.
    """
    def __init__(self, testq, config):
        """
            :testq Queue object stored on the instance as self.testq.
            :config Proxy configuration handed to proxy.ProxyServer.
        """
        s = proxy.ProxyServer(config, 0)
        state = flow.State()
        flow.FlowMaster.__init__(self, s, state)
        self.testq = testq

    def handle(self, m):
        """
            Dispatch the message through FlowMaster.handle, then call its
            _ack() method.
        """
        flow.FlowMaster.handle(self, m)
        m._ack()


class ProxyThread(threading.Thread):
    """
        Thread that runs a TestMaster.
    """
    def __init__(self, testq, config):
        """
            :testq Queue object passed through to TestMaster.
            :config Proxy configuration passed through to TestMaster.
        """
        self.tmaster = TestMaster(testq, config)
        # Reset the module-level exit flag on controller before this thread
        # is started.
        controller.should_exit = False
        threading.Thread.__init__(self)

    @property
    def port(self):
        """
            The port attribute of the master's server.
        """
        return self.tmaster.server.port

    def run(self):
        """
            Thread body: run the master.
        """
        self.tmaster.run()

    def shutdown(self):
        """
            Shut the master down.
        """
        self.tmaster.shutdown()


class ProxTestBase:
    """
        Base class for proxy tests. Subclasses provide an "ssl" class
        attribute and a get_proxy_config() classmethod returning a dict of
        extra keyword arguments for proxy.ProxyConfig.
    """
    @classmethod
    def setupAll(cls):
        """
            Start a pathod daemon and a proxy thread, storing them on the
            class as cls.server and cls.proxy.
        """
        cls.tqueue = Queue.Queue()
        cls.server = libpathod.test.Daemon(ssl=cls.ssl)
        pconf = cls.get_proxy_config()
        config = proxy.ProxyConfig(
            certfile=test_data.path("data/testkey.pem"),
            **pconf
        )
        cls.proxy = ProxyThread(cls.tqueue, config)
        cls.proxy.start()

    @property
    def master(self):
        """
            The TestMaster owned by the proxy thread.
        """
        return self.proxy.tmaster

    @classmethod
    def teardownAll(cls):
        """
            Shut down the proxy thread's master and the pathod daemon.
        """
        cls.proxy.shutdown()
        cls.server.shutdown()

    def setUp(self):
        """
            Clear the master's state before each test.
        """
        self.master.state.clear()

    @property
    def scheme(self):
        """
            "https" if the class's ssl attribute is truthy, else "http".
        """
        return "https" if self.ssl else "http"

    @property
    def proxies(self):
        """
            The proxy specification for the proxy under test, as a
            (scheme, ("127.0.0.1", port)) tuple.
        """
        return (
            (self.scheme, ("127.0.0.1", self.proxy.port))
        )

    @property
    def urlbase(self):
        """
            The URL base for the server instance.
        """
        return self.server.urlbase

    def last_log(self):
        """
            Return the result of the server instance's last_log().
        """
        return self.server.last_log()


class HTTPProxTest(ProxTestBase):
    """
        Tests that reach the pathod server by passing the proxy under test
        to the HTTP client as its proxy.
    """
    ssl = None
    clientcerts = False

    @classmethod
    def get_proxy_config(cls):
        """
            Return the extra ProxyConfig keyword arguments: a "clientcerts"
            path when cls.clientcerts is truthy, otherwise an empty dict.
        """
        d = dict()
        if cls.clientcerts:
            d["clientcerts"] = test_data.path("data/clientcert")
        return d

    def pathoc(self, connect_to = None):
        """
            Return a Pathoc client for "localhost" on the proxy's port,
            after calling its connect() with connect_to.
        """
        p = libpathod.pathoc.Pathoc("localhost", self.proxy.port)
        p.connect(connect_to)
        return p

    def pathod(self, spec):
        """
            Constructs a pathod request, with the appropriate base and proxy.

            :spec The pathod specification appended after "/p/".
        """
        return hurl.get(
            self.urlbase + "/p/" + spec,
            proxy=self.proxies,
            validate_cert=False,
            #debug=hurl.utils.stdout_debug
        )


class TResolver:
    """
        Resolver stub that reports ("127.0.0.1", port) as the original
        address for every socket.
    """
    def __init__(self, port):
        """
            :port The port returned by original_addr.
        """
        self.port = port

    def original_addr(self, sock):
        """
            Return ("127.0.0.1", self.port); the sock argument is ignored.
        """
        return ("127.0.0.1", self.port)


class TransparentProxTest(ProxTestBase):
    """
        Tests that configure the proxy with a "transparent_proxy" setting
        whose resolver points at the pathod server's port.
    """
    ssl = None

    @classmethod
    def get_proxy_config(cls):
        """
            Return the ProxyConfig keyword arguments: a transparent_proxy
            dict holding a TResolver for the server's port and an empty
            sslports list.
        """
        return dict(
            transparent_proxy = dict(
                resolver = TResolver(cls.server.port),
                sslports = []
            )
        )

    def pathod(self, spec):
        """
            Constructs a pathod request sent directly to the proxy's port on
            127.0.0.1, with no proxy setting on the client.

            :spec The pathod specification appended after "/p/".
        """
        r = hurl.get(
            "http://127.0.0.1:%s"%self.proxy.port + "/p/" + spec,
            validate_cert=False,
            #debug=hurl.utils.stdout_debug
        )
        return r


class ReverseProxTest(ProxTestBase):
    """
        Tests that configure the proxy with a "reverse_proxy" setting
        pointing at the pathod server.
    """
    ssl = None

    @classmethod
    def get_proxy_config(cls):
        """
            Return the ProxyConfig keyword arguments: a reverse_proxy tuple
            of (scheme, "127.0.0.1", server port), where scheme is "https"
            if cls.ssl is truthy and "http" otherwise.
        """
        return dict(
            reverse_proxy = (
                "https" if cls.ssl else "http",
                "127.0.0.1",
                cls.server.port
            )
        )

    def pathod(self, spec):
        """
            Constructs a pathod request sent directly to the proxy's port on
            127.0.0.1, with no proxy setting on the client.

            :spec The pathod specification appended after "/p/".
        """
        r = hurl.get(
            "http://127.0.0.1:%s"%self.proxy.port + "/p/" + spec,
            validate_cert=False,
            #debug=hurl.utils.stdout_debug
        )
        return r


@contextmanager
def tmpdir(*args, **kwargs):
    """
        Context manager that creates a temporary directory, makes it the
        current working directory, and yields its path.

        On exit - including when the body raises - the previous working
        directory is restored and the temporary directory is removed.

        :args Positional arguments passed to tempfile.mkdtemp.

        :kwargs Keyword arguments passed to tempfile.mkdtemp.
    """
    orig_workdir = os.getcwd()
    temp_workdir = tempfile.mkdtemp(*args, **kwargs)
    try:
        os.chdir(temp_workdir)
        yield temp_workdir
    finally:
        # Leave the directory before deleting it, and do both even if the
        # body of the with-statement raised.
        os.chdir(orig_workdir)
        shutil.rmtree(temp_workdir)


def raises(exc, obj, *args, **kwargs):
    """
        Assert that a callable raises a specified exception.

        :exc An exception class or a string. If a class, assert that an
        exception of this type is raised. If a string, assert that the string
        occurs in the string representation of the exception, based on a
        case-insensitive match.

        :obj A callable object.

        :args Arguments to be passed to the callable.

        :kwargs Arguments to be passed to the callable.

        Raises AssertionError if no exception is raised, or if the exception
        raised does not match exc.
    """
    try:
        obj(*args, **kwargs)
    except Exception as v:
        if isinstance(exc, basestring):
            if exc.lower() in str(v).lower():
                return
            raise AssertionError(
                "Expected %s, but caught %s"%(
                    repr(str(exc)), v
                )
            )
        if isinstance(v, exc):
            return
        raise AssertionError(
            "Expected %s, but caught %s %s"%(
                exc.__name__, v.__class__.__name__, str(v)
            )
        )
    raise AssertionError("No exception raised.")


# Data locator for this module; the helpers above use it to find files under
# "data/". Only referenced at call time, so defining it last is safe.
test_data = utils.Data(__name__)