mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
improve proxyauth tests
This commit is contained in:
parent
f55df034e6
commit
0c5b56f7ce
@ -10,197 +10,242 @@ from mitmproxy.test import tflow
|
|||||||
from mitmproxy.test import tutils
|
from mitmproxy.test import tutils
|
||||||
|
|
||||||
|
|
||||||
def test_parse_http_basic_auth():
|
class TestMkauth:
|
||||||
assert proxyauth.parse_http_basic_auth(
|
def test_mkauth_scheme(self):
|
||||||
proxyauth.mkauth("test", "test")
|
assert proxyauth.mkauth('username', 'password') == 'basic dXNlcm5hbWU6cGFzc3dvcmQ=\n'
|
||||||
) == ("basic", "test", "test")
|
|
||||||
with pytest.raises(ValueError):
|
@pytest.mark.parametrize('scheme, expected', [
|
||||||
proxyauth.parse_http_basic_auth("")
|
('', ' dXNlcm5hbWU6cGFzc3dvcmQ=\n'),
|
||||||
with pytest.raises(ValueError):
|
('basic', 'basic dXNlcm5hbWU6cGFzc3dvcmQ=\n'),
|
||||||
proxyauth.parse_http_basic_auth("foo bar")
|
('foobar', 'foobar dXNlcm5hbWU6cGFzc3dvcmQ=\n'),
|
||||||
with pytest.raises(ValueError):
|
])
|
||||||
proxyauth.parse_http_basic_auth("basic abc")
|
def test_mkauth(self, scheme, expected):
|
||||||
with pytest.raises(ValueError):
|
assert proxyauth.mkauth('username', 'password', scheme) == expected
|
||||||
v = "basic " + binascii.b2a_base64(b"foo").decode("ascii")
|
|
||||||
proxyauth.parse_http_basic_auth(v)
|
|
||||||
|
|
||||||
|
|
||||||
def test_configure():
|
class TestParseHttpBasicAuth:
|
||||||
up = proxyauth.ProxyAuth()
|
@pytest.mark.parametrize('input', [
|
||||||
with taddons.context() as ctx:
|
'',
|
||||||
with pytest.raises(exceptions.OptionsError):
|
'foo bar',
|
||||||
ctx.configure(up, proxyauth="foo")
|
'basic abc',
|
||||||
|
'basic ' + binascii.b2a_base64(b"foo").decode("ascii"),
|
||||||
|
])
|
||||||
|
def test_parse_http_basic_auth_error(self, input):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
proxyauth.parse_http_basic_auth(input)
|
||||||
|
|
||||||
ctx.configure(up, proxyauth="foo:bar")
|
def test_parse_http_basic_auth(self):
|
||||||
assert up.singleuser == ["foo", "bar"]
|
input = proxyauth.mkauth("test", "test")
|
||||||
|
assert proxyauth.parse_http_basic_auth(input) == ("basic", "test", "test")
|
||||||
|
|
||||||
ctx.configure(up, proxyauth=None)
|
|
||||||
assert up.singleuser is None
|
|
||||||
|
|
||||||
ctx.configure(up, proxyauth="any")
|
class TestProxyAuth:
|
||||||
assert up.nonanonymous
|
@pytest.mark.parametrize('mode, expected', [
|
||||||
ctx.configure(up, proxyauth=None)
|
('', False),
|
||||||
assert not up.nonanonymous
|
('foobar', False),
|
||||||
|
('regular', True),
|
||||||
|
('upstream:', True),
|
||||||
|
('upstream:foobar', True),
|
||||||
|
])
|
||||||
|
def test_is_proxy_auth(self, mode, expected):
|
||||||
|
up = proxyauth.ProxyAuth()
|
||||||
|
with taddons.context() as ctx:
|
||||||
|
ctx.options.mode = mode
|
||||||
|
assert up.is_proxy_auth() is expected
|
||||||
|
|
||||||
with mock.patch('ldap3.Server', return_value="ldap://fake_server:389 - cleartext"):
|
@pytest.mark.parametrize('is_proxy_auth, expected', [
|
||||||
with mock.patch('ldap3.Connection', return_value="test"):
|
(True, 'Proxy-Authorization'),
|
||||||
ctx.configure(up, proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com")
|
(False, 'Authorization'),
|
||||||
assert up.ldapserver
|
])
|
||||||
ctx.configure(up, proxyauth="ldaps:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com")
|
def test_which_auth_header(self, is_proxy_auth, expected):
|
||||||
assert up.ldapserver
|
up = proxyauth.ProxyAuth()
|
||||||
|
with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_proxy_auth', return_value=is_proxy_auth):
|
||||||
|
assert up.which_auth_header() == expected
|
||||||
|
|
||||||
with pytest.raises(exceptions.OptionsError):
|
@pytest.mark.parametrize('is_proxy_auth, expected_status_code, expected_header', [
|
||||||
ctx.configure(up, proxyauth="ldap:test:test:test")
|
(True, 407, 'Proxy-Authenticate'),
|
||||||
|
(False, 401, 'WWW-Authenticate'),
|
||||||
|
])
|
||||||
|
def test_auth_required_response(self, is_proxy_auth, expected_status_code, expected_header):
|
||||||
|
up = proxyauth.ProxyAuth()
|
||||||
|
with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_proxy_auth', return_value=is_proxy_auth):
|
||||||
|
resp = up.auth_required_response()
|
||||||
|
assert resp.status_code == expected_status_code
|
||||||
|
assert expected_header in resp.headers.keys()
|
||||||
|
|
||||||
with pytest.raises(IndexError):
|
def test_check(self):
|
||||||
ctx.configure(up, proxyauth="ldap:fake_serveruid=?dc=example,dc=com:person")
|
up = proxyauth.ProxyAuth()
|
||||||
|
with taddons.context() as ctx:
|
||||||
|
ctx.configure(up, proxyauth="any", mode="regular")
|
||||||
|
f = tflow.tflow()
|
||||||
|
assert not up.check(f)
|
||||||
|
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||||
|
"test", "test"
|
||||||
|
)
|
||||||
|
assert up.check(f)
|
||||||
|
|
||||||
with pytest.raises(exceptions.OptionsError):
|
f.request.headers["Proxy-Authorization"] = "invalid"
|
||||||
ctx.configure(up, proxyauth="ldapssssssss:fake_server:dn:password:tree")
|
assert not up.check(f)
|
||||||
|
|
||||||
|
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||||
|
"test", "test", scheme="unknown"
|
||||||
|
)
|
||||||
|
assert not up.check(f)
|
||||||
|
|
||||||
|
ctx.configure(up, proxyauth="test:test")
|
||||||
|
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||||
|
"test", "test"
|
||||||
|
)
|
||||||
|
assert up.check(f)
|
||||||
|
ctx.configure(up, proxyauth="test:foo")
|
||||||
|
assert not up.check(f)
|
||||||
|
|
||||||
with pytest.raises(exceptions.OptionsError):
|
|
||||||
ctx.configure(
|
ctx.configure(
|
||||||
up,
|
up,
|
||||||
proxyauth= "@" + tutils.test_data.path("mitmproxy/net/data/server.crt")
|
proxyauth="@" + tutils.test_data.path(
|
||||||
|
"mitmproxy/net/data/htpasswd"
|
||||||
|
)
|
||||||
)
|
)
|
||||||
with pytest.raises(exceptions.OptionsError):
|
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||||
ctx.configure(up, proxyauth="@nonexistent")
|
"test", "test"
|
||||||
|
|
||||||
ctx.configure(
|
|
||||||
up,
|
|
||||||
proxyauth= "@" + tutils.test_data.path(
|
|
||||||
"mitmproxy/net/data/htpasswd"
|
|
||||||
)
|
)
|
||||||
)
|
assert up.check(f)
|
||||||
assert up.htpasswd
|
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||||
assert up.htpasswd.check_password("test", "test")
|
"test", "foo"
|
||||||
assert not up.htpasswd.check_password("test", "foo")
|
|
||||||
ctx.configure(up, proxyauth=None)
|
|
||||||
assert not up.htpasswd
|
|
||||||
|
|
||||||
with pytest.raises(exceptions.OptionsError):
|
|
||||||
ctx.configure(up, proxyauth="any", mode="transparent")
|
|
||||||
with pytest.raises(exceptions.OptionsError):
|
|
||||||
ctx.configure(up, proxyauth="any", mode="socks5")
|
|
||||||
|
|
||||||
|
|
||||||
def test_check(monkeypatch):
|
|
||||||
up = proxyauth.ProxyAuth()
|
|
||||||
with taddons.context() as ctx:
|
|
||||||
ctx.configure(up, proxyauth="any", mode="regular")
|
|
||||||
f = tflow.tflow()
|
|
||||||
assert not up.check(f)
|
|
||||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
|
||||||
"test", "test"
|
|
||||||
)
|
|
||||||
assert up.check(f)
|
|
||||||
|
|
||||||
f.request.headers["Proxy-Authorization"] = "invalid"
|
|
||||||
assert not up.check(f)
|
|
||||||
|
|
||||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
|
||||||
"test", "test", scheme="unknown"
|
|
||||||
)
|
|
||||||
assert not up.check(f)
|
|
||||||
|
|
||||||
ctx.configure(up, proxyauth="test:test")
|
|
||||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
|
||||||
"test", "test"
|
|
||||||
)
|
|
||||||
assert up.check(f)
|
|
||||||
ctx.configure(up, proxyauth="test:foo")
|
|
||||||
assert not up.check(f)
|
|
||||||
|
|
||||||
ctx.configure(
|
|
||||||
up,
|
|
||||||
proxyauth="@" + tutils.test_data.path(
|
|
||||||
"mitmproxy/net/data/htpasswd"
|
|
||||||
)
|
)
|
||||||
)
|
assert not up.check(f)
|
||||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
|
||||||
"test", "test"
|
|
||||||
)
|
|
||||||
assert up.check(f)
|
|
||||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
|
||||||
"test", "foo"
|
|
||||||
)
|
|
||||||
assert not up.check(f)
|
|
||||||
|
|
||||||
with mock.patch('ldap3.Server', return_value="ldap://fake_server:389 - cleartext"):
|
with mock.patch('ldap3.Server', return_value="ldap://fake_server:389 - cleartext"):
|
||||||
with mock.patch('ldap3.Connection', search="test"):
|
with mock.patch('ldap3.Connection', search="test"):
|
||||||
with mock.patch('ldap3.Connection.search', return_value="test"):
|
with mock.patch('ldap3.Connection.search', return_value="test"):
|
||||||
ctx.configure(
|
ctx.configure(
|
||||||
up,
|
up,
|
||||||
proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com"
|
proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com"
|
||||||
)
|
)
|
||||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||||
"test", "test"
|
"test", "test"
|
||||||
)
|
)
|
||||||
assert up.check(f)
|
assert up.check(f)
|
||||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||||
"", ""
|
"", ""
|
||||||
)
|
)
|
||||||
assert not up.check(f)
|
assert not up.check(f)
|
||||||
|
|
||||||
|
def test_authenticate(self):
|
||||||
|
up = proxyauth.ProxyAuth()
|
||||||
|
with taddons.context() as ctx:
|
||||||
|
ctx.configure(up, proxyauth="any", mode="regular")
|
||||||
|
|
||||||
def test_authenticate():
|
f = tflow.tflow()
|
||||||
up = proxyauth.ProxyAuth()
|
assert not f.response
|
||||||
with taddons.context() as ctx:
|
up.authenticate(f)
|
||||||
ctx.configure(up, proxyauth="any", mode="regular")
|
assert f.response.status_code == 407
|
||||||
|
|
||||||
f = tflow.tflow()
|
f = tflow.tflow()
|
||||||
assert not f.response
|
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||||
up.authenticate(f)
|
"test", "test"
|
||||||
assert f.response.status_code == 407
|
)
|
||||||
|
up.authenticate(f)
|
||||||
|
assert not f.response
|
||||||
|
assert not f.request.headers.get("Proxy-Authorization")
|
||||||
|
|
||||||
f = tflow.tflow()
|
f = tflow.tflow()
|
||||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
ctx.configure(up, mode="reverse")
|
||||||
"test", "test"
|
assert not f.response
|
||||||
)
|
up.authenticate(f)
|
||||||
up.authenticate(f)
|
assert f.response.status_code == 401
|
||||||
assert not f.response
|
|
||||||
assert not f.request.headers.get("Proxy-Authorization")
|
|
||||||
|
|
||||||
f = tflow.tflow()
|
f = tflow.tflow()
|
||||||
ctx.configure(up, mode="reverse")
|
f.request.headers["Authorization"] = proxyauth.mkauth(
|
||||||
assert not f.response
|
"test", "test"
|
||||||
up.authenticate(f)
|
)
|
||||||
assert f.response.status_code == 401
|
up.authenticate(f)
|
||||||
|
assert not f.response
|
||||||
|
assert not f.request.headers.get("Authorization")
|
||||||
|
|
||||||
f = tflow.tflow()
|
def test_configure(self):
|
||||||
f.request.headers["Authorization"] = proxyauth.mkauth(
|
up = proxyauth.ProxyAuth()
|
||||||
"test", "test"
|
with taddons.context() as ctx:
|
||||||
)
|
with pytest.raises(exceptions.OptionsError):
|
||||||
up.authenticate(f)
|
ctx.configure(up, proxyauth="foo")
|
||||||
assert not f.response
|
|
||||||
assert not f.request.headers.get("Authorization")
|
|
||||||
|
|
||||||
|
ctx.configure(up, proxyauth="foo:bar")
|
||||||
|
assert up.singleuser == ["foo", "bar"]
|
||||||
|
|
||||||
def test_handlers():
|
ctx.configure(up, proxyauth=None)
|
||||||
up = proxyauth.ProxyAuth()
|
assert up.singleuser is None
|
||||||
with taddons.context() as ctx:
|
|
||||||
ctx.configure(up, proxyauth="any", mode="regular")
|
|
||||||
|
|
||||||
f = tflow.tflow()
|
ctx.configure(up, proxyauth="any")
|
||||||
assert not f.response
|
assert up.nonanonymous
|
||||||
up.requestheaders(f)
|
ctx.configure(up, proxyauth=None)
|
||||||
assert f.response.status_code == 407
|
assert not up.nonanonymous
|
||||||
|
|
||||||
f = tflow.tflow()
|
with mock.patch('ldap3.Server', return_value="ldap://fake_server:389 - cleartext"):
|
||||||
f.request.method = "CONNECT"
|
with mock.patch('ldap3.Connection', return_value="test"):
|
||||||
assert not f.response
|
ctx.configure(up, proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com")
|
||||||
up.http_connect(f)
|
assert up.ldapserver
|
||||||
assert f.response.status_code == 407
|
ctx.configure(up, proxyauth="ldaps:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com")
|
||||||
|
assert up.ldapserver
|
||||||
|
|
||||||
f = tflow.tflow()
|
with pytest.raises(exceptions.OptionsError):
|
||||||
f.request.method = "CONNECT"
|
ctx.configure(up, proxyauth="ldap:test:test:test")
|
||||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
|
||||||
"test", "test"
|
|
||||||
)
|
|
||||||
up.http_connect(f)
|
|
||||||
assert not f.response
|
|
||||||
|
|
||||||
f2 = tflow.tflow(client_conn=f.client_conn)
|
with pytest.raises(IndexError):
|
||||||
up.requestheaders(f2)
|
ctx.configure(up, proxyauth="ldap:fake_serveruid=?dc=example,dc=com:person")
|
||||||
assert not f2.response
|
|
||||||
assert f2.metadata["proxyauth"] == ('test', 'test')
|
with pytest.raises(exceptions.OptionsError):
|
||||||
|
ctx.configure(up, proxyauth="ldapssssssss:fake_server:dn:password:tree")
|
||||||
|
|
||||||
|
with pytest.raises(exceptions.OptionsError):
|
||||||
|
ctx.configure(
|
||||||
|
up,
|
||||||
|
proxyauth= "@" + tutils.test_data.path("mitmproxy/net/data/server.crt")
|
||||||
|
)
|
||||||
|
with pytest.raises(exceptions.OptionsError):
|
||||||
|
ctx.configure(up, proxyauth="@nonexistent")
|
||||||
|
|
||||||
|
ctx.configure(
|
||||||
|
up,
|
||||||
|
proxyauth= "@" + tutils.test_data.path(
|
||||||
|
"mitmproxy/net/data/htpasswd"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert up.htpasswd
|
||||||
|
assert up.htpasswd.check_password("test", "test")
|
||||||
|
assert not up.htpasswd.check_password("test", "foo")
|
||||||
|
ctx.configure(up, proxyauth=None)
|
||||||
|
assert not up.htpasswd
|
||||||
|
|
||||||
|
with pytest.raises(exceptions.OptionsError):
|
||||||
|
ctx.configure(up, proxyauth="any", mode="transparent")
|
||||||
|
with pytest.raises(exceptions.OptionsError):
|
||||||
|
ctx.configure(up, proxyauth="any", mode="socks5")
|
||||||
|
|
||||||
|
def test_handlers(self):
|
||||||
|
up = proxyauth.ProxyAuth()
|
||||||
|
with taddons.context() as ctx:
|
||||||
|
ctx.configure(up, proxyauth="any", mode="regular")
|
||||||
|
|
||||||
|
f = tflow.tflow()
|
||||||
|
assert not f.response
|
||||||
|
up.requestheaders(f)
|
||||||
|
assert f.response.status_code == 407
|
||||||
|
|
||||||
|
f = tflow.tflow()
|
||||||
|
f.request.method = "CONNECT"
|
||||||
|
assert not f.response
|
||||||
|
up.http_connect(f)
|
||||||
|
assert f.response.status_code == 407
|
||||||
|
|
||||||
|
f = tflow.tflow()
|
||||||
|
f.request.method = "CONNECT"
|
||||||
|
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||||
|
"test", "test"
|
||||||
|
)
|
||||||
|
up.http_connect(f)
|
||||||
|
assert not f.response
|
||||||
|
|
||||||
|
f2 = tflow.tflow(client_conn=f.client_conn)
|
||||||
|
up.requestheaders(f2)
|
||||||
|
assert not f2.response
|
||||||
|
assert f2.metadata["proxyauth"] == ('test', 'test')
|
||||||
|
Loading…
Reference in New Issue
Block a user