mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
test libmproxy.app, increase coverage
This commit is contained in:
parent
66090f9aea
commit
9f5f2b7071
@ -4,9 +4,11 @@ import os.path
|
||||
mapp = flask.Flask(__name__)
|
||||
mapp.debug = True
|
||||
|
||||
|
||||
def master():
|
||||
return flask.request.environ["mitmproxy.master"]
|
||||
|
||||
|
||||
@mapp.route("/")
|
||||
def index():
|
||||
return flask.render_template("index.html", section="home")
|
||||
@ -16,12 +18,12 @@ def index():
|
||||
def certs_pem():
|
||||
capath = master().server.config.cacert
|
||||
p = os.path.splitext(capath)[0] + "-cert.pem"
|
||||
return flask.Response(open(p).read(), mimetype='application/x-x509-ca-cert')
|
||||
return flask.Response(open(p, "rb").read(), mimetype='application/x-x509-ca-cert')
|
||||
|
||||
|
||||
@mapp.route("/cert/p12")
|
||||
def certs_p12():
|
||||
capath = master().server.config.cacert
|
||||
p = os.path.splitext(capath)[0] + "-cert.p12"
|
||||
return flask.Response(open(p).read(), mimetype='application/x-pkcs12')
|
||||
return flask.Response(open(p, "rb").read(), mimetype='application/x-pkcs12')
|
||||
|
||||
|
@ -2,14 +2,15 @@
|
||||
This module provides more sophisticated flow tracking. These match requests
|
||||
with their responses, and provide filtering and interception facilities.
|
||||
"""
|
||||
import hashlib, Cookie, cookielib, copy, re, urlparse, threading
|
||||
import time, urllib
|
||||
import types
|
||||
import tnetstring, filt, script, utils, encoding
|
||||
from email.utils import parsedate_tz, formatdate, mktime_tz
|
||||
from netlib import odict, http, certutils, wsgi
|
||||
from .proxy import ClientConnection, ServerConnection
|
||||
import controller, version, protocol, stateobject
|
||||
import base64
|
||||
import hashlib, Cookie, cookielib, re, threading
|
||||
import os
|
||||
from flask import request
|
||||
import requests
|
||||
import tnetstring, filt, script
|
||||
from netlib import odict, wsgi
|
||||
from .proxy import ClientConnection, ServerConnection # FIXME: remove circular dependency
|
||||
import controller, version, protocol
|
||||
import app
|
||||
from .protocol import KILL
|
||||
from .protocol.http import HTTPResponse, CONTENT_MISSING
|
||||
@ -19,7 +20,6 @@ ODict = odict.ODict
|
||||
ODictCaseless = odict.ODictCaseless
|
||||
|
||||
|
||||
|
||||
class AppRegistry:
|
||||
def __init__(self):
|
||||
self.apps = {}
|
||||
@ -453,7 +453,28 @@ class FlowMaster(controller.Master):
|
||||
port
|
||||
)
|
||||
else:
|
||||
threading.Thread(target=app.mapp.run,kwargs={
|
||||
@app.mapp.before_request
|
||||
def patch_environ(*args, **kwargs):
|
||||
request.environ["mitmproxy.master"] = self
|
||||
|
||||
# the only absurd way to shut down a flask/werkzeug server.
|
||||
# http://flask.pocoo.org/snippets/67/
|
||||
shutdown_secret = base64.b32encode(os.urandom(30))
|
||||
|
||||
@app.mapp.route('/shutdown/<secret>')
|
||||
def shutdown(secret):
|
||||
if secret == shutdown_secret:
|
||||
request.environ.get('werkzeug.server.shutdown')()
|
||||
|
||||
# Workaround: Monkey-patch shutdown function to stop the app.
|
||||
# Improve this when we switch flask werkzeug for something useful.
|
||||
_shutdown = self.shutdown
|
||||
def _shutdownwrap():
|
||||
_shutdown()
|
||||
requests.get("http://%s:%s/shutdown/%s" % (host, port, shutdown_secret))
|
||||
self.shutdown = _shutdownwrap
|
||||
|
||||
threading.Thread(target=app.mapp.run, kwargs={
|
||||
"use_reloader": False,
|
||||
"host": host,
|
||||
"port": port}).start()
|
||||
|
@ -15,14 +15,14 @@ class ProtocolHandler(object):
|
||||
self.c = c
|
||||
"""@type: libmproxy.proxy.ConnectionHandler"""
|
||||
|
||||
def handle_messages(self):
|
||||
def handle_messages(self): # pragma: nocover
|
||||
"""
|
||||
This method gets called if a client connection has been made. Depending on the proxy settings,
|
||||
a server connection might already exist as well.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def handle_error(self, error):
|
||||
def handle_error(self, error): # pragma: nocover
|
||||
"""
|
||||
This method gets called should there be an uncaught exception during the connection.
|
||||
This might happen outside of handle_messages, e.g. if the initial SSL handshake fails in transparent mode.
|
||||
@ -90,7 +90,7 @@ def _handler(conntype, connection_handler):
|
||||
if conntype in protocols:
|
||||
return protocols[conntype]["handler"](connection_handler)
|
||||
|
||||
raise NotImplementedError
|
||||
raise NotImplementedError # pragma: nocover
|
||||
|
||||
|
||||
def handle_messages(conntype, connection_handler):
|
||||
|
@ -824,7 +824,7 @@ class HttpAuthenticationError(Exception):
|
||||
def __init__(self, auth_headers=None):
|
||||
self.auth_headers = auth_headers
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self): # pragma: nocover
|
||||
return "HttpAuthenticationError"
|
||||
|
||||
|
||||
|
@ -88,12 +88,6 @@ class Flow(stateobject.SimpleStateObject, BackreferenceMixin):
|
||||
d.update(version=version.IVERSION)
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
def _from_state(cls, state):
|
||||
f = cls(None, None, None)
|
||||
f._load_state(state)
|
||||
return f
|
||||
|
||||
def __eq__(self, other):
|
||||
return self is other
|
||||
|
||||
|
@ -1,13 +1,17 @@
|
||||
class StateObject(object):
|
||||
def _get_state(self):
|
||||
def _get_state(self): # pragma: nocover
|
||||
raise NotImplementedError
|
||||
|
||||
def _load_state(self, state):
|
||||
def _load_state(self, state): # pragma: nocover
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def _from_state(cls, state):
|
||||
def _from_state(cls, state): # pragma: nocover
|
||||
raise NotImplementedError
|
||||
# Usually, this function roughly equals to the following code:
|
||||
# f = cls()
|
||||
# f._load_state(state)
|
||||
# return f
|
||||
|
||||
def __eq__(self, other):
|
||||
try:
|
||||
@ -66,10 +70,4 @@ class SimpleStateObject(StateObject):
|
||||
elif hasattr(cls, "_from_state"):
|
||||
setattr(self, attr, cls._from_state(state[attr]))
|
||||
else:
|
||||
setattr(self, attr, cls(state[attr]))
|
||||
|
||||
@classmethod
|
||||
def _from_state(cls, state):
|
||||
f = cls() # the default implementation assumes an empty constructor. Override accordingly.
|
||||
f._load_state(state)
|
||||
return f
|
||||
setattr(self, attr, cls(state[attr]))
|
50
test/test_app.py
Normal file
50
test/test_app.py
Normal file
@ -0,0 +1,50 @@
|
||||
import mock, socket, os, sys
|
||||
from libmproxy import dump
|
||||
from netlib import certutils
|
||||
from libpathod.pathoc import Pathoc
|
||||
import tutils
|
||||
|
||||
def get_free_port():
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.bind(("127.0.0.1", 0))
|
||||
port = s.getsockname()[1]
|
||||
s.close()
|
||||
return port
|
||||
|
||||
|
||||
class AppTestMixin(object):
|
||||
def request(self, spec):
|
||||
p = Pathoc(("127.0.0.1", self.port))
|
||||
p.connect()
|
||||
return p.request(spec)
|
||||
|
||||
def test_basic(self):
|
||||
assert self.request("get:/").status_code == 200
|
||||
assert self.request("get:/").status_code == 200 # Check for connection close
|
||||
assert len(self.m.apps.apps) == 0
|
||||
|
||||
def test_cert(self):
|
||||
with tutils.tmpdir() as d:
|
||||
# Create Certs
|
||||
path = os.path.join(d, "test")
|
||||
assert certutils.dummy_ca(path)
|
||||
self.m.server.config.cacert = path
|
||||
|
||||
for ext in ["pem", "p12"]:
|
||||
resp = self.request("get:/cert/%s" % ext)
|
||||
assert resp.status_code == 200
|
||||
with open(path + "-cert.%s" % ext, "rb") as f:
|
||||
assert resp.content == f.read()
|
||||
|
||||
class TestAppExternal(AppTestMixin):
|
||||
@classmethod
|
||||
def setupAll(cls):
|
||||
cls.port = get_free_port()
|
||||
o = dump.Options(app=True, app_external=True, app_host="127.0.0.1", app_port=cls.port)
|
||||
s = mock.MagicMock()
|
||||
cls.m = dump.DumpMaster(s, o, None)
|
||||
|
||||
|
||||
@classmethod
|
||||
def teardownAll(cls):
|
||||
cls.m.shutdown()
|
Loading…
Reference in New Issue
Block a user