mitmproxy/test/tutils.py

185 lines
4.7 KiB
Python
Raw Normal View History

2015-05-30 00:03:28 +00:00
import os
import shutil
import tempfile
import argparse
2014-09-06 11:09:57 +00:00
import sys
2015-08-01 08:40:19 +00:00
from cStringIO import StringIO
from contextlib import contextmanager
2015-08-30 13:27:29 +00:00
2015-08-01 08:40:19 +00:00
from nose.plugins.skip import SkipTest
from mock import Mock
import netlib.tutils
2015-08-30 13:27:29 +00:00
from libmproxy import utils, controller
from libmproxy.models import (
ClientConnection, ServerConnection, Error, HTTPRequest, HTTPResponse, HTTPFlow
)
from libmproxy.console.flowview import FlowView
from libmproxy.console import ConsoleState
2015-05-30 00:03:28 +00:00
def _SkipWindows(*args, **kwargs):
    """
    Stand-in for a test that cannot run on Windows.

    Any positional or keyword arguments are accepted and ignored, so this
    function can replace plain test functions as well as test methods
    (which are called with ``self``) or tests that take parameters.

    Raises SkipTest unconditionally.
    """
    raise SkipTest("Skipped on Windows.")
2015-05-30 00:03:28 +00:00
def SkipWindows(fn):
    """
    Decorator that disables a test on Windows.

    When os.name is "nt" the decorated callable is replaced by
    _SkipWindows; on every other platform fn is returned untouched.
    """
    return _SkipWindows if os.name == "nt" else fn
def tflow(client_conn=True, server_conn=True, req=True, resp=None, err=None):
    """
    Assemble an HTTPFlow for use in tests.

    For every argument, the literal value True means "create a default
    test object"; any other value is used as passed in. A truthy request
    or response is additionally passed through HTTPRequest.wrap /
    HTTPResponse.wrap; falsy values are stored on the flow unchanged.
    The flow's reply is always set to a controller.DummyReply().

    @type client_conn: bool | None | ClientConnection
    @type server_conn: bool | None | ServerConnection
    @type req:         bool | None | request object accepted by HTTPRequest.wrap
    @type resp:        bool | None | response object accepted by HTTPResponse.wrap
    @type err:         bool | None | Error
    @return:           HTTPFlow
    """
    # Resolve the "True means default" placeholders, in argument order.
    client = tclient_conn() if client_conn is True else client_conn
    server = tserver_conn() if server_conn is True else server_conn
    request = netlib.tutils.treq() if req is True else req
    response = netlib.tutils.tresp() if resp is True else resp
    error = terr() if err is True else err

    # Only truthy messages are wrapped; None/False pass through as-is.
    request = HTTPRequest.wrap(request) if request else request
    response = HTTPResponse.wrap(response) if response else response

    flow = HTTPFlow(client, server)
    flow.request = request
    flow.response = response
    flow.error = error
    flow.reply = controller.DummyReply()
    return flow
def tclient_conn():
    """
    Create a ClientConnection for tests from a fixed state dict.

    The connection's reply is set to a controller.DummyReply().

    @return: ClientConnection
    """
    state = {
        "address": {"address": ("address", 22), "use_ipv6": True},
        "clientcert": None,
    }
    conn = ClientConnection.from_state(state)
    conn.reply = controller.DummyReply()
    return conn
def tserver_conn():
    """
    Create a ServerConnection for tests from a fixed state dict.

    The connection's reply is set to a controller.DummyReply().

    @return: ServerConnection
    """
    # Two separate dicts on purpose: the original passes distinct objects
    # for the address and the source address.
    state = {
        "address": {"address": ("address", 22), "use_ipv6": True},
        "state": [],
        "source_address": {"address": ("address", 22), "use_ipv6": True},
        "cert": None,
    }
    conn = ServerConnection.from_state(state)
    conn.reply = controller.DummyReply()
    return conn
def terr(content="error"):
    """
    Create an Error for tests, constructed from the given content.

    @return: Error
    """
    return Error(content)
2015-05-30 00:03:28 +00:00
def tflowview(request_contents=None):
    """
    Create a FlowView for tests, backed by a Mock and a fresh ConsoleState.

    If request_contents is None the view shows a default tflow();
    otherwise the flow's request is netlib.tutils.treq(request_contents).
    """
    master = Mock()
    state = ConsoleState()

    if request_contents is not None:
        flow = tflow(req=netlib.tutils.treq(request_contents))
    else:
        flow = tflow()

    return FlowView(master, state, flow)
2015-05-30 00:03:28 +00:00
def get_body_line(last_displayed_body, line_nb):
    """
    Return entry line_nb + 2 of last_displayed_body.contents().

    NOTE(review): the offset of 2 presumably skips two leading non-body
    rows of the displayed widget -- confirm against the caller.
    """
    rows = last_displayed_body.contents()
    return rows[line_nb + 2]
2015-05-30 00:03:28 +00:00
@contextmanager
def tmpdir(*args, **kwargs):
    """
    Context manager that runs its body inside a fresh temporary directory.

    All arguments are forwarded to tempfile.mkdtemp(). The new directory
    becomes the current working directory and its path is yielded. On
    exit -- including when the body raises -- the previous working
    directory is restored and the temporary directory is removed.
    """
    orig_workdir = os.getcwd()
    temp_workdir = tempfile.mkdtemp(*args, **kwargs)
    try:
        os.chdir(temp_workdir)
        yield temp_workdir
    finally:
        # Leave the directory before deleting it; removing the current
        # working directory fails on some platforms.
        os.chdir(orig_workdir)
        shutil.rmtree(temp_workdir)
class MockParser(argparse.ArgumentParser):
    """
    ArgumentParser variant for tests.

    The stock argparse.ArgumentParser reports errors by calling
    sys.exit(); this subclass raises an Exception carrying the error
    message instead, so failures can be asserted on.
    """

    def error(self, message):
        # Surface the parser's error message to the caller as an exception.
        raise Exception(message)
2012-06-08 22:57:00 +00:00
def raises(exc, obj, *args, **kwargs):
    """
    Assert that a callable raises a specified exception.

    :exc An exception class or a string. If a class, assert that an
    exception of this type is raised. If a string, assert that the string
    occurs in the string representation of the exception, based on a
    case-insensitive match.

    :obj A callable object.

    :args Arguments to be passed to the callable.

    :kwargs Arguments to be passed to the callable.

    Returns None when the expected exception was raised; raises
    AssertionError when a different exception, or none at all, was raised.
    """
    try:
        string_types = basestring  # Python 2: covers both str and unicode
    except NameError:
        string_types = str  # Python 3 has no basestring

    try:
        obj(*args, **kwargs)
    except Exception as v:
        if isinstance(exc, string_types):
            if exc.lower() in str(v).lower():
                return
            raise AssertionError(
                "Expected %s, but caught %s" % (
                    repr(str(exc)), v
                )
            )
        if isinstance(v, exc):
            return
        raise AssertionError(
            "Expected %s, but caught %s %s" % (
                exc.__name__, v.__class__.__name__, str(v)
            )
        )
    # Reached only when the callable returned without raising.
    raise AssertionError("No exception raised.")
2014-09-06 11:09:57 +00:00
@contextmanager
def capture_stderr(command, *args, **kwargs):
    """
    Context manager that runs a command and yields what it wrote to stderr.

    On entry sys.stderr is replaced by a StringIO buffer and
    command(*args, **kwargs) is called; the text captured up to that
    point is yielded. sys.stderr stays redirected while the with-body
    runs and is restored on exit -- including when the command or the
    with-body raises.
    """
    out, sys.stderr = sys.stderr, StringIO()
    try:
        command(*args, **kwargs)
        yield sys.stderr.getvalue()
    finally:
        # Always restore the real stream so a failure cannot leave
        # sys.stderr pointing at the capture buffer.
        sys.stderr = out
2015-08-30 13:27:29 +00:00
# Module-level utils.Data instance built from this module's name.
# NOTE(review): presumably used to locate test data files relative to this
# module -- confirm against libmproxy.utils.Data.
test_data = utils.Data(__name__)