# mirror of https://github.com/Grasscutters/mitmproxy.git
# synced 2024-11-30 03:14:22 +00:00
# 306 lines, 8.3 KiB, Python
import os.path
|
|
import threading, Queue
|
|
import shutil, tempfile
|
|
import flask
|
|
from libmproxy.proxy.config import ProxyConfig
|
|
from libmproxy.proxy.server import ProxyServer
|
|
from libmproxy.proxy.primitives import TransparentUpstreamServerResolver
|
|
import libpathod.test, libpathod.pathoc
|
|
from libmproxy import flow, controller
|
|
from libmproxy.cmdline import APP_HOST, APP_PORT
|
|
import tutils
|
|
|
|
# Flask application whose routes are defined below; TestMaster registers it
# under the host name "testapp".
testapp = flask.Flask(__name__)
|
|
|
|
@testapp.route("/")
def hello():
    """Serve the root page of the test application."""
    body = "testapp"
    return body
|
|
|
|
@testapp.route("/error")
def error():
    """Always raise ValueError, giving tests a route whose handler fails."""
    failure = ValueError("An exception...")
    raise failure
|
|
|
|
|
|
def errapp(environ, start_response):
    """
        Application callable that fails unconditionally.

        Both arguments are ignored; a ValueError("errapp") is raised before
        any response is started.
    """
    failure = ValueError("errapp")
    raise failure
|
|
|
|
|
|
class TestMaster(flow.FlowMaster):
    """
        FlowMaster used by the test fixtures.

        It builds its own ProxyServer from the given config, registers the
        two test applications on port 80, answers every request/response
        message right after the base class has processed it, and records log
        messages in ``self.log``.
    """

    def __init__(self, config):
        server = ProxyServer(config, 0)
        flow.FlowMaster.__init__(self, server, flow.State())
        # Register the in-process apps under their host names.
        for application, hostname in ((testapp, "testapp"), (errapp, "errapp")):
            self.apps.add(application, hostname, 80)
        self.clear_log()

    def handle_request(self, m):
        """Let FlowMaster process the request message, then reply to it."""
        flow.FlowMaster.handle_request(self, m)
        m.reply()

    def handle_response(self, m):
        """Let FlowMaster process the response message, then reply to it."""
        flow.FlowMaster.handle_response(self, m)
        m.reply()

    def clear_log(self):
        """Start a fresh, empty list of recorded log messages."""
        self.log = []

    def handle_log(self, l):
        """Record the text of a log message and reply to it."""
        self.log.append(l.msg)
        l.reply()
|
|
|
|
|
|
class ProxyThread(threading.Thread):
    """
        Thread that runs a master (``tmaster``) and exposes its port and log.
    """

    def __init__(self, tmaster):
        super(ProxyThread, self).__init__()
        self.tmaster = tmaster
        address = tmaster.server.address
        self.name = "ProxyThread (%s:%s)" % (address.host, address.port)
        # Reset the module-level exit flag before this thread is started.
        controller.should_exit = False

    @property
    def port(self):
        """Port of the address the master's server reports."""
        return self.tmaster.server.address.port

    @property
    def log(self):
        """The master's list of recorded log messages."""
        return self.tmaster.log

    def run(self):
        """Thread body: run the master."""
        self.tmaster.run()

    def shutdown(self):
        """Ask the master to shut down."""
        self.tmaster.shutdown()
|
|
|
|
|
|
class ProxTestBase(object):
    """
        Shared fixture for the proxy tests.

        Class-level setup starts two pathod daemons and one proxy, the latter
        running ``masterclass`` inside a ProxyThread. Class-level teardown
        stops all three and removes the proxy's configuration directory.
        Subclasses adjust the fixture through the class attributes below and
        by overriding get_proxy_config().
    """
    # Test Configuration
    ssl = None                # passed to the pathod daemons; also selects the scheme
    ssloptions = False        # passed to the pathod daemons
    clientcerts = False       # when true, get_proxy_config() adds a "clientcerts" path
    no_upstream_cert = False  # forwarded to ProxyConfig
    authenticator = None      # forwarded to ProxyConfig
    masterclass = TestMaster  # instantiated with the ProxyConfig built in setupAll()
    externalapp = False       # third argument to the master's start_app()
    certforward = False       # forwarded to ProxyConfig

    @classmethod
    def setupAll(cls):
        """
            Start the pathod daemons and the proxy thread for this class.
        """
        # The daemons must exist before get_proxy_config() runs: subclass
        # overrides read cls.server / cls.server2.
        cls.server = libpathod.test.Daemon(ssl=cls.ssl, ssloptions=cls.ssloptions)
        cls.server2 = libpathod.test.Daemon(ssl=cls.ssl, ssloptions=cls.ssloptions)
        pconf = cls.get_proxy_config()
        # Fixed location under the system temp directory.
        cls.confdir = os.path.join(tempfile.gettempdir(), "mitmproxy")
        config = ProxyConfig(
            no_upstream_cert=cls.no_upstream_cert,
            confdir=cls.confdir,
            authenticator=cls.authenticator,
            certforward=cls.certforward,
            **pconf
        )
        tmaster = cls.masterclass(config)
        tmaster.start_app(APP_HOST, APP_PORT, cls.externalapp)
        cls.proxy = ProxyThread(tmaster)
        cls.proxy.start()

    @classmethod
    def _remove_confdir(cls):
        """
            Delete the configuration directory if it is still present.
        """
        if os.path.exists(cls.confdir):
            shutil.rmtree(cls.confdir)

    @classmethod
    def tearDownAll(cls):
        """
            Remove the configuration directory.

            Kept for callers that use this spelling. It is safe to call even
            when the directory has already been removed.
        """
        cls._remove_confdir()

    @property
    def master(self):
        """The master object run by the proxy thread."""
        return self.proxy.tmaster

    @classmethod
    def teardownAll(cls):
        """
            Stop the proxy and both pathod daemons, then remove the
            configuration directory.
        """
        # NOTE(review): nose is believed to run only the first class-teardown
        # name it finds, looking up `teardownAll` before `tearDownAll`. With
        # the cleanup living only in `tearDownAll`, the directory was never
        # removed, so it is done here too.
        try:
            cls.proxy.shutdown()
            cls.server.shutdown()
            cls.server2.shutdown()
        finally:
            cls._remove_confdir()

    def setUp(self):
        """Clear the master's log and state and both daemons' logs."""
        self.master.clear_log()
        self.master.state.clear()
        self.server.clear_log()
        self.server2.clear_log()

    @property
    def scheme(self):
        """"https" when ``ssl`` is truthy, otherwise "http"."""
        return "https" if self.ssl else "http"

    @property
    def proxies(self):
        """
            The proxy endpoint as a ``(scheme, ("127.0.0.1", port))`` tuple.
        """
        return (self.scheme, ("127.0.0.1", self.proxy.port))

    @classmethod
    def get_proxy_config(cls):
        """
            Return extra keyword arguments for ProxyConfig as a dict.

            The base implementation only adds "clientcerts" when the
            ``clientcerts`` class attribute is truthy.
        """
        d = {}
        if cls.clientcerts:
            d["clientcerts"] = tutils.test_data.path("data/clientcert")
        return d
|
|
|
|
|
|
class HTTPProxTest(ProxTestBase):
    """
        Fixture whose helpers send requests to the proxy with Pathoc clients.
    """

    def pathoc_raw(self):
        """
            Returns an unconnected Pathoc instance aimed at the proxy port.
        """
        proxy_addr = ("127.0.0.1", self.proxy.port)
        return libpathod.pathoc.Pathoc(proxy_addr)

    def pathoc(self, sni=None):
        """
            Returns a connected Pathoc instance.
        """
        client = libpathod.pathoc.Pathoc(("localhost", self.proxy.port), ssl=self.ssl, sni=sni)
        # With SSL, connect() is given the address of the first pathod daemon.
        connect_args = (("127.0.0.1", self.server.port),) if self.ssl else ()
        client.connect(*connect_args)
        return client

    def pathod(self, spec, sni=None):
        """
            Constructs a pathod GET request, with the appropriate base and proxy.
        """
        client = self.pathoc(sni=sni)
        escaped = spec.encode("string_escape")
        # Path-only query with SSL; otherwise prefix the daemon's URL base.
        query = (
            "get:'/p/%s'" % escaped
            if self.ssl
            else "get:'%s/p/%s'" % (self.server.urlbase, escaped)
        )
        return client.request(query)

    def app(self, page):
        """
            Requests ``page`` from the app addressed by APP_HOST / APP_PORT.
        """
        if not self.ssl:
            return self.pathoc().request("get:'http://%s/%s'" % (APP_HOST, page))
        client = libpathod.pathoc.Pathoc(("127.0.0.1", self.proxy.port), True)
        client.connect((APP_HOST, APP_PORT))
        return client.request("get:'/%s'" % page)
|
|
|
|
|
|
class TResolver:
    """
        Resolver stub: every socket's original address is reported as
        127.0.0.1 on the port given at construction time.
    """

    def __init__(self, port):
        self.port = port

    def original_addr(self, sock):
        """Return ("127.0.0.1", port); ``sock`` is not inspected."""
        host = "127.0.0.1"
        return (host, self.port)
|
|
|
|
|
|
class TransparentProxTest(ProxTestBase):
    """
        Fixture that configures the proxy with a
        TransparentUpstreamServerResolver and relative-form HTTP in both
        directions. The resolver class (``resolver``) is instantiated with the
        first pathod daemon's port.
    """
    ssl = None
    resolver = TResolver

    @classmethod
    def get_proxy_config(cls):
        """
            Return the base ProxyConfig keyword arguments extended with the
            upstream resolver and the relative HTTP forms.
        """
        # Use super() so `cls` stays bound to the concrete test class. Calling
        # ProxTestBase.get_proxy_config() directly evaluates the base class's
        # own attributes and ignores overrides such as `clientcerts`.
        d = super(TransparentProxTest, cls).get_proxy_config()
        # presumably the ports the resolver treats as SSL - TODO confirm
        # against TransparentUpstreamServerResolver
        ports = [cls.server.port, cls.server2.port] if cls.ssl else []
        d["get_upstream_server"] = TransparentUpstreamServerResolver(cls.resolver(cls.server.port), ports)
        d["http_form_in"] = "relative"
        d["http_form_out"] = "relative"
        return d

    def pathod(self, spec, sni=None):
        """
            Constructs a pathod GET request, with the appropriate base and proxy.

            ``sni`` is only passed on to pathoc() when ``ssl`` is truthy.
        """
        p = self.pathoc(sni=sni) if self.ssl else self.pathoc()
        return p.request("get:'/p/%s'" % spec)

    def pathoc(self, sni=None):
        """
            Returns a connected Pathoc instance.
        """
        p = libpathod.pathoc.Pathoc(("localhost", self.proxy.port), ssl=self.ssl, sni=sni)
        p.connect()
        return p
|
|
|
|
|
|
class ReverseProxTest(ProxTestBase):
    """
        Fixture that configures the proxy with a fixed upstream server (the
        first pathod daemon on 127.0.0.1) and relative-form HTTP in both
        directions.
    """
    ssl = None

    @classmethod
    def get_proxy_config(cls):
        """
            Return the base ProxyConfig keyword arguments extended with the
            fixed upstream server and the relative HTTP forms.
        """
        # Use super() so `cls` stays bound to the concrete test class. Calling
        # ProxTestBase.get_proxy_config() directly evaluates the base class's
        # own attributes and ignores overrides such as `clientcerts`.
        d = super(ReverseProxTest, cls).get_proxy_config()
        # Evaluated lazily on each call, so cls.ssl and cls.server.port are
        # read when the proxy asks for the upstream server. Both leading
        # flags follow `ssl`.
        d["get_upstream_server"] = lambda c: (
            bool(cls.ssl),
            bool(cls.ssl),
            "127.0.0.1",
            cls.server.port
        )
        d["http_form_in"] = "relative"
        d["http_form_out"] = "relative"
        return d

    def pathoc(self, sni=None):
        """
            Returns a connected Pathoc instance.
        """
        p = libpathod.pathoc.Pathoc(("localhost", self.proxy.port), ssl=self.ssl, sni=sni)
        p.connect()
        return p

    def pathod(self, spec, sni=None):
        """
            Constructs a pathod GET request, with the appropriate base and proxy.

            ``sni`` is only passed on to pathoc() when ``ssl`` is truthy.
        """
        p = self.pathoc(sni=sni) if self.ssl else self.pathoc()
        return p.request("get:'/p/%s'" % spec)
|
|
|
|
|
|
class ChainProxTest(ProxTestBase):
    """
        Chain n instances of mitmproxy in a row - because we can.

        The first extra proxy uses the base fixture's proxy as its upstream
        server; each further one uses the proxy started just before it.
    """
    n = 2
    # One config factory per chained proxy. Each takes the upstream port and
    # returns a ProxyConfig using absolute-form HTTP in both directions.
    chain_config = [lambda port: ProxyConfig(
        get_upstream_server=lambda c: (False, False, "127.0.0.1", port),
        http_form_in="absolute",
        http_form_out="absolute"
    )] * n

    @classmethod
    def setupAll(cls):
        """Run the base setup, then start the ``n`` chained proxies."""
        super(ChainProxTest, cls).setupAll()
        cls.chain = []
        upstream = cls.proxy
        for index in range(cls.n):
            config = cls.chain_config[index](upstream.port)
            tmaster = cls.masterclass(config)
            tmaster.start_app(APP_HOST, APP_PORT, cls.externalapp)
            link = ProxyThread(tmaster)
            cls.chain.append(link)
            link.start()
            # The next proxy in the chain points at the one just started.
            upstream = link

    @classmethod
    def teardownAll(cls):
        """Run the base teardown, then shut down each chained proxy's server."""
        super(ChainProxTest, cls).teardownAll()
        for link in cls.chain:
            link.tmaster.server.shutdown()

    def setUp(self):
        """Run the base setUp, then clear every chained master's log and state."""
        super(ChainProxTest, self).setUp()
        for link in self.chain:
            chained_master = link.tmaster
            chained_master.clear_log()
            chained_master.state.clear()
|
|
|
|
|
|
class HTTPChainProxyTest(ChainProxTest):
    """
        Chain fixture whose client talks to the last proxy in the chain.
    """

    def pathoc(self, sni=None):
        """
            Returns a connected Pathoc instance.
        """
        entry_port = self.chain[-1].port
        client = libpathod.pathoc.Pathoc(("localhost", entry_port), ssl=self.ssl, sni=sni)
        # With SSL, connect() is given the address of the first pathod daemon.
        connect_args = (("127.0.0.1", self.server.port),) if self.ssl else ()
        client.connect(*connect_args)
        return client
|