import os import shutil import tempfile import argparse import sys from cStringIO import StringIO from contextlib import contextmanager from nose.plugins.skip import SkipTest from mock import Mock import netlib.tutils from libmproxy import utils, controller from libmproxy.models import ( ClientConnection, ServerConnection, Error, HTTPRequest, HTTPResponse, HTTPFlow ) from libmproxy.console.flowview import FlowView from libmproxy.console import ConsoleState def _SkipWindows(): raise SkipTest("Skipped on Windows.") def SkipWindows(fn): if os.name == "nt": return _SkipWindows else: return fn def tflow(client_conn=True, server_conn=True, req=True, resp=None, err=None): """ @type client_conn: bool | None | libmproxy.proxy.connection.ClientConnection @type server_conn: bool | None | libmproxy.proxy.connection.ServerConnection @type req: bool | None | libmproxy.protocol.http.HTTPRequest @type resp: bool | None | libmproxy.protocol.http.HTTPResponse @type err: bool | None | libmproxy.protocol.primitives.Error @return: bool | None | libmproxy.protocol.http.HTTPFlow """ if client_conn is True: client_conn = tclient_conn() if server_conn is True: server_conn = tserver_conn() if req is True: req = netlib.tutils.treq() if resp is True: resp = netlib.tutils.tresp() if err is True: err = terr() if req: req = HTTPRequest.wrap(req) if resp: resp = HTTPResponse.wrap(resp) f = HTTPFlow(client_conn, server_conn) f.request = req f.response = resp f.error = err f.reply = controller.DummyReply() return f def tclient_conn(): """ @return: libmproxy.proxy.connection.ClientConnection """ c = ClientConnection.from_state(dict( address=dict(address=("address", 22), use_ipv6=True), clientcert=None )) c.reply = controller.DummyReply() return c def tserver_conn(): """ @return: libmproxy.proxy.connection.ServerConnection """ c = ServerConnection.from_state(dict( address=dict(address=("address", 22), use_ipv6=True), state=[], source_address=dict(address=("address", 22), use_ipv6=True), cert=None )) c.reply = controller.DummyReply() return c def terr(content="error"): """ @return: libmproxy.protocol.primitives.Error """ err = Error(content) return err def tflowview(request_contents=None): m = Mock() cs = ConsoleState() if request_contents is None: flow = tflow() else: flow = tflow(req=netlib.tutils.treq(body=request_contents)) fv = FlowView(m, cs, flow) return fv def get_body_line(last_displayed_body, line_nb): return last_displayed_body.contents()[line_nb + 2] @contextmanager def tmpdir(*args, **kwargs): orig_workdir = os.getcwd() temp_workdir = tempfile.mkdtemp(*args, **kwargs) os.chdir(temp_workdir) yield temp_workdir os.chdir(orig_workdir) shutil.rmtree(temp_workdir) class MockParser(argparse.ArgumentParser): """ argparse.ArgumentParser sys.exits() by default. Make it more testable by throwing an exception instead. """ def error(self, message): raise Exception(message) def raises(exc, obj, *args, **kwargs): """ Assert that a callable raises a specified exception. :exc An exception class or a string. If a class, assert that an exception of this type is raised. If a string, assert that the string occurs in the string representation of the exception, based on a case-insenstivie match. :obj A callable object. :args Arguments to be passsed to the callable. :kwargs Arguments to be passed to the callable. """ try: obj(*args, **kwargs) except Exception as v: if isinstance(exc, basestring): if exc.lower() in str(v).lower(): return else: raise AssertionError( "Expected %s, but caught %s" % ( repr(str(exc)), v ) ) else: if isinstance(v, exc): return else: raise AssertionError( "Expected %s, but caught %s %s" % ( exc.__name__, v.__class__.__name__, str(v) ) ) raise AssertionError("No exception raised.") @contextmanager def capture_stderr(command, *args, **kwargs): out, sys.stderr = sys.stderr, StringIO() command(*args, **kwargs) yield sys.stderr.getvalue() sys.stderr = out test_data = utils.Data(__name__)