mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-30 03:14:22 +00:00
191 lines
5.8 KiB
Python
191 lines
5.8 KiB
Python
import mock
|
|
from OpenSSL import SSL
|
|
|
|
from libmproxy import cmdline
|
|
from libmproxy.proxy import ProxyConfig
|
|
from libmproxy.proxy.config import process_proxy_options
|
|
from libmproxy.models.connections import ServerConnection
|
|
from libmproxy.proxy.server import DummyServer, ProxyServer, ConnectionHandler
|
|
import tutils
|
|
from libpathod import test
|
|
from netlib import http, tcp
|
|
|
|
|
|
class TestServerConnection:
|
|
def setUp(self):
|
|
self.d = test.Daemon()
|
|
|
|
def tearDown(self):
|
|
self.d.shutdown()
|
|
|
|
def test_simple(self):
|
|
sc = ServerConnection((self.d.IFACE, self.d.port))
|
|
sc.connect()
|
|
f = tutils.tflow()
|
|
f.server_conn = sc
|
|
f.request.path = "/p/200:da"
|
|
|
|
# use this protocol just to assemble - not for actual sending
|
|
protocol = http.http1.HTTP1Protocol(rfile=sc.rfile)
|
|
sc.send(protocol.assemble(f.request))
|
|
|
|
protocol = http.http1.HTTP1Protocol(rfile=sc.rfile)
|
|
assert protocol.read_response(f.request.method, 1000)
|
|
assert self.d.last_log()
|
|
|
|
sc.finish()
|
|
|
|
def test_terminate_error(self):
|
|
sc = ServerConnection((self.d.IFACE, self.d.port))
|
|
sc.connect()
|
|
sc.connection = mock.Mock()
|
|
sc.connection.recv = mock.Mock(return_value=False)
|
|
sc.connection.flush = mock.Mock(side_effect=tcp.NetLibDisconnect)
|
|
sc.finish()
|
|
|
|
def test_repr(self):
|
|
sc = tutils.tserver_conn()
|
|
assert "address:22" in repr(sc)
|
|
assert "ssl" not in repr(sc)
|
|
sc.ssl_established = True
|
|
assert "ssl" in repr(sc)
|
|
sc.sni = "foo"
|
|
assert "foo" in repr(sc)
|
|
|
|
|
|
class TestProcessProxyOptions:
|
|
def p(self, *args):
|
|
parser = tutils.MockParser()
|
|
cmdline.common_options(parser)
|
|
opts = parser.parse_args(args=args)
|
|
return parser, process_proxy_options(parser, opts)
|
|
|
|
def assert_err(self, err, *args):
|
|
tutils.raises(err, self.p, *args)
|
|
|
|
def assert_noerr(self, *args):
|
|
m, p = self.p(*args)
|
|
assert p
|
|
return p
|
|
|
|
def test_simple(self):
|
|
assert self.p()
|
|
|
|
def test_cadir(self):
|
|
with tutils.tmpdir() as cadir:
|
|
self.assert_noerr("--cadir", cadir)
|
|
|
|
@mock.patch("libmproxy.platform.resolver", None)
|
|
def test_no_transparent(self):
|
|
self.assert_err("transparent mode not supported", "-T")
|
|
|
|
@mock.patch("libmproxy.platform.resolver")
|
|
def test_modes(self, _):
|
|
self.assert_noerr("-R", "http://localhost")
|
|
self.assert_err("expected one argument", "-R")
|
|
self.assert_err("Invalid server specification", "-R", "reverse")
|
|
|
|
self.assert_noerr("-T")
|
|
|
|
self.assert_noerr("-U", "http://localhost")
|
|
self.assert_err("expected one argument", "-U")
|
|
self.assert_err("Invalid server specification", "-U", "upstream")
|
|
|
|
self.assert_err("not allowed with", "-R", "http://localhost", "-T")
|
|
|
|
def test_socks_auth(self):
|
|
self.assert_err("Proxy Authentication not supported in SOCKS mode.", "--socks", "--nonanonymous")
|
|
|
|
def test_client_certs(self):
|
|
with tutils.tmpdir() as cadir:
|
|
self.assert_noerr("--client-certs", cadir)
|
|
self.assert_err(
|
|
"directory does not exist",
|
|
"--client-certs",
|
|
"nonexistent")
|
|
|
|
def test_certs(self):
|
|
with tutils.tmpdir() as cadir:
|
|
self.assert_noerr(
|
|
"--cert",
|
|
tutils.test_data.path("data/testkey.pem"))
|
|
self.assert_err("does not exist", "--cert", "nonexistent")
|
|
|
|
def test_auth(self):
|
|
p = self.assert_noerr("--nonanonymous")
|
|
assert p.authenticator
|
|
|
|
p = self.assert_noerr(
|
|
"--htpasswd",
|
|
tutils.test_data.path("data/htpasswd"))
|
|
assert p.authenticator
|
|
self.assert_err(
|
|
"malformed htpasswd file",
|
|
"--htpasswd",
|
|
tutils.test_data.path("data/htpasswd.invalid"))
|
|
|
|
p = self.assert_noerr("--singleuser", "test:test")
|
|
assert p.authenticator
|
|
self.assert_err(
|
|
"invalid single-user specification",
|
|
"--singleuser",
|
|
"test")
|
|
|
|
def test_verify_upstream_cert(self):
|
|
p = self.assert_noerr("--verify-upstream-cert")
|
|
assert p.openssl_verification_mode_server == SSL.VERIFY_PEER
|
|
|
|
def test_upstream_trusted_cadir(self):
|
|
expected_dir = "/path/to/a/ca/dir"
|
|
p = self.assert_noerr("--upstream-trusted-cadir", expected_dir)
|
|
assert p.openssl_trusted_cadir_server == expected_dir
|
|
|
|
def test_upstream_trusted_ca(self):
|
|
expected_file = "/path/to/a/cert/file"
|
|
p = self.assert_noerr("--upstream-trusted-ca", expected_file)
|
|
assert p.openssl_trusted_ca_server == expected_file
|
|
|
|
|
|
class TestProxyServer:
|
|
# binding to 0.0.0.0:1 works without special permissions on Windows
|
|
@tutils.SkipWindows
|
|
def test_err(self):
|
|
conf = ProxyConfig(
|
|
port=1
|
|
)
|
|
tutils.raises("error starting proxy server", ProxyServer, conf)
|
|
|
|
def test_err_2(self):
|
|
conf = ProxyConfig(
|
|
host="invalidhost"
|
|
)
|
|
tutils.raises("error starting proxy server", ProxyServer, conf)
|
|
|
|
|
|
class TestDummyServer:
|
|
def test_simple(self):
|
|
d = DummyServer(None)
|
|
d.start_slave()
|
|
d.shutdown()
|
|
|
|
|
|
class TestConnectionHandler:
|
|
def test_fatal_error(self):
|
|
config = mock.Mock()
|
|
root_layer = mock.Mock()
|
|
root_layer.side_effect = RuntimeError
|
|
config.mode.return_value = root_layer
|
|
channel = mock.Mock()
|
|
|
|
def ask(_, x):
|
|
return x
|
|
channel.ask = ask
|
|
c = ConnectionHandler(
|
|
mock.MagicMock(),
|
|
("127.0.0.1", 8080),
|
|
config,
|
|
channel
|
|
)
|
|
with tutils.capture_stderr(c.handle) as output:
|
|
assert "mitmproxy has crashed" in output
|