mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-25 09:37:37 +00:00
refactor proxyauth addon
the previous version was difficult to read, this is hopefully better now.
This commit is contained in:
parent
a3eca0b859
commit
73d809a4c7
@ -1,5 +1,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import binascii
|
||||
import weakref
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import MutableMapping
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
@ -7,14 +10,136 @@ from typing import Tuple
|
||||
import ldap3
|
||||
import passlib.apache
|
||||
|
||||
from mitmproxy import ctx, connection
|
||||
from mitmproxy import connection, ctx
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import http
|
||||
from mitmproxy.net.http import status_codes
|
||||
from mitmproxy.proxy.layers import modes
|
||||
|
||||
REALM = "mitmproxy"
|
||||
|
||||
|
||||
class ProxyAuth:
|
||||
validator: Optional[Validator] = None
|
||||
|
||||
def __init__(self):
|
||||
self.authenticated: MutableMapping[connection.Client, Tuple[str, str]] = weakref.WeakKeyDictionary()
|
||||
"""Contains all connections that are permanently authenticated after an HTTP CONNECT"""
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"proxyauth", Optional[str], None,
|
||||
"""
|
||||
Require proxy authentication. Format:
|
||||
"username:pass",
|
||||
"any" to accept any user/pass combination,
|
||||
"@path" to use an Apache htpasswd file,
|
||||
or "ldap[s]:url_server_ldap:dn_auth:password:dn_subtree" for LDAP authentication.
|
||||
"""
|
||||
)
|
||||
|
||||
def configure(self, updated):
|
||||
if "proxyauth" not in updated:
|
||||
return
|
||||
auth = ctx.options.proxyauth
|
||||
if auth:
|
||||
if ctx.options.mode == "transparent":
|
||||
raise exceptions.OptionsError("Proxy Authentication not supported in transparent mode.")
|
||||
|
||||
if auth == "any":
|
||||
self.validator = AcceptAll()
|
||||
elif auth.startswith("@"):
|
||||
self.validator = Htpasswd(auth)
|
||||
elif ctx.options.proxyauth.startswith("ldap"):
|
||||
self.validator = Ldap(auth)
|
||||
elif ":" in ctx.options.proxyauth:
|
||||
self.validator = SingleUser(auth)
|
||||
else:
|
||||
raise exceptions.OptionsError("Invalid proxyauth specification.")
|
||||
else:
|
||||
self.validator = None
|
||||
|
||||
def socks5_auth(self, data: modes.Socks5AuthData) -> None:
|
||||
if self.validator and self.validator(data.username, data.password):
|
||||
data.valid = True
|
||||
|
||||
def http_connect(self, f: http.HTTPFlow) -> None:
|
||||
if self.validator:
|
||||
if self.authenticate_http(f):
|
||||
# Make a note that all further requests over this connection are ok.
|
||||
self.authenticated[f.client_conn] = f.metadata["proxyauth"]
|
||||
|
||||
def requestheaders(self, f: http.HTTPFlow) -> None:
|
||||
if self.validator:
|
||||
# Is this connection authenticated by a previous HTTP CONNECT?
|
||||
if f.client_conn in self.authenticated:
|
||||
f.metadata["proxyauth"] = self.authenticated[f.client_conn]
|
||||
else:
|
||||
self.authenticate_http(f)
|
||||
|
||||
def authenticate_http(self, f: http.HTTPFlow) -> bool:
|
||||
"""
|
||||
Authenticate an HTTP request, returns if authentication was successful.
|
||||
|
||||
If valid credentials are found, the matching authentication header is removed.
|
||||
In no or invalid credentials are found, flow.response is set to an error page.
|
||||
"""
|
||||
assert self.validator
|
||||
username = None
|
||||
password = None
|
||||
is_valid = False
|
||||
try:
|
||||
auth_value = f.request.headers.get(self.http_auth_header, "")
|
||||
scheme, username, password = parse_http_basic_auth(auth_value)
|
||||
is_valid = self.validator(username, password)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if is_valid:
|
||||
f.metadata["proxyauth"] = (username, password)
|
||||
del f.request.headers[self.http_auth_header]
|
||||
return True
|
||||
else:
|
||||
f.response = self.make_auth_required_response()
|
||||
return False
|
||||
|
||||
def make_auth_required_response(self) -> http.Response:
|
||||
if self.is_http_proxy:
|
||||
status_code = status_codes.PROXY_AUTH_REQUIRED
|
||||
headers = {"Proxy-Authenticate": f'Basic realm="{REALM}"'}
|
||||
else:
|
||||
status_code = status_codes.UNAUTHORIZED
|
||||
headers = {"WWW-Authenticate": f'Basic realm="{REALM}"'}
|
||||
|
||||
reason = http.status_codes.RESPONSES[status_code]
|
||||
return http.Response.make(
|
||||
status_code,
|
||||
(
|
||||
f"<html>"
|
||||
f"<head><title>{status_code} {reason}</title></head>"
|
||||
f"<body><h1>{status_code} {reason}</h1></body>"
|
||||
f"</html>"
|
||||
),
|
||||
headers
|
||||
)
|
||||
|
||||
@property
|
||||
def http_auth_header(self) -> str:
|
||||
if self.is_http_proxy:
|
||||
return "Proxy-Authorization"
|
||||
else:
|
||||
return "Authorization"
|
||||
|
||||
@property
|
||||
def is_http_proxy(self) -> bool:
|
||||
"""
|
||||
Returns:
|
||||
- True, if authentication is done as if mitmproxy is a proxy
|
||||
- False, if authentication is done as if mitmproxy is an HTTP server
|
||||
"""
|
||||
return ctx.options.mode == "regular" or ctx.options.mode.startswith("upstream:")
|
||||
|
||||
|
||||
def mkauth(username: str, password: str, scheme: str = "basic") -> str:
|
||||
"""
|
||||
Craft a basic auth string
|
||||
@ -40,177 +165,78 @@ def parse_http_basic_auth(s: str) -> Tuple[str, str, str]:
|
||||
return scheme, user, password
|
||||
|
||||
|
||||
class ProxyAuth:
|
||||
def __init__(self):
|
||||
self.nonanonymous = False
|
||||
self.htpasswd = None
|
||||
self.singleuser = None
|
||||
self.ldapconn = None
|
||||
self.ldapserver = None
|
||||
self.authenticated: MutableMapping[connection.Client, Tuple[str, str]] = weakref.WeakKeyDictionary()
|
||||
"""Contains all connections that are permanently authenticated after an HTTP CONNECT"""
|
||||
class Validator(ABC):
|
||||
"""Base class for all username/password validators."""
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"proxyauth", Optional[str], None,
|
||||
"""
|
||||
Require proxy authentication. Format:
|
||||
"username:pass",
|
||||
"any" to accept any user/pass combination,
|
||||
"@path" to use an Apache htpasswd file,
|
||||
or "ldap[s]:url_server_ldap:dn_auth:password:dn_subtree" for LDAP authentication.
|
||||
"""
|
||||
)
|
||||
@abstractmethod
|
||||
def __call__(self, username: str, password: str) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
def enabled(self) -> bool:
|
||||
return any([self.nonanonymous, self.htpasswd, self.singleuser, self.ldapconn, self.ldapserver])
|
||||
|
||||
def is_proxy_auth(self) -> bool:
|
||||
"""
|
||||
Returns:
|
||||
- True, if authentication is done as if mitmproxy is a proxy
|
||||
- False, if authentication is done as if mitmproxy is a HTTP server
|
||||
"""
|
||||
return ctx.options.mode == "regular" or ctx.options.mode.startswith("upstream:")
|
||||
class AcceptAll(Validator):
|
||||
def __call__(self, username: str, password: str) -> bool:
|
||||
return True
|
||||
|
||||
def which_auth_header(self) -> str:
|
||||
if self.is_proxy_auth():
|
||||
return 'Proxy-Authorization'
|
||||
else:
|
||||
return 'Authorization'
|
||||
|
||||
def auth_required_response(self) -> http.Response:
|
||||
if self.is_proxy_auth():
|
||||
status_code = status_codes.PROXY_AUTH_REQUIRED
|
||||
headers = {"Proxy-Authenticate": f'Basic realm="{REALM}"'}
|
||||
else:
|
||||
status_code = status_codes.UNAUTHORIZED
|
||||
headers = {"WWW-Authenticate": f'Basic realm="{REALM}"'}
|
||||
|
||||
reason = http.status_codes.RESPONSES[status_code]
|
||||
return http.Response.make(
|
||||
status_code,
|
||||
(
|
||||
f"<html>"
|
||||
f"<head><title>{status_code} {reason}</title></head>"
|
||||
f"<body><h1>{status_code} {reason}</h1></body>"
|
||||
f"</html>"
|
||||
),
|
||||
headers
|
||||
)
|
||||
|
||||
def check(self, f: http.HTTPFlow) -> Optional[Tuple[str, str]]:
|
||||
"""
|
||||
Check if a request is correctly authenticated.
|
||||
Returns:
|
||||
- a (username, password) tuple if successful,
|
||||
- None, otherwise.
|
||||
"""
|
||||
auth_value = f.request.headers.get(self.which_auth_header(), "")
|
||||
class SingleUser(Validator):
|
||||
def __init__(self, proxyauth: str):
|
||||
try:
|
||||
scheme, username, password = parse_http_basic_auth(auth_value)
|
||||
self.username, self.password = proxyauth.split(':')
|
||||
except ValueError:
|
||||
return None
|
||||
raise exceptions.OptionsError("Invalid single-user auth specification.")
|
||||
|
||||
if self.nonanonymous:
|
||||
return username, password
|
||||
elif self.singleuser:
|
||||
if self.singleuser == [username, password]:
|
||||
return username, password
|
||||
elif self.htpasswd:
|
||||
if self.htpasswd.check_password(username, password):
|
||||
return username, password
|
||||
elif self.ldapconn:
|
||||
if not username or not password:
|
||||
return None
|
||||
self.ldapconn.search(ctx.options.proxyauth.split(':')[4], '(cn=' + username + ')')
|
||||
if self.ldapconn.response:
|
||||
conn = ldap3.Connection(
|
||||
self.ldapserver,
|
||||
self.ldapconn.response[0]['dn'],
|
||||
password,
|
||||
auto_bind=True)
|
||||
if conn:
|
||||
return username, password
|
||||
return None
|
||||
def __call__(self, username: str, password: str) -> bool:
|
||||
return self.username == username and self.password == password
|
||||
|
||||
def authenticate(self, f: http.HTTPFlow) -> bool:
|
||||
valid_credentials = self.check(f)
|
||||
if valid_credentials:
|
||||
f.metadata["proxyauth"] = valid_credentials
|
||||
del f.request.headers[self.which_auth_header()]
|
||||
return True
|
||||
|
||||
class Htpasswd(Validator):
|
||||
def __init__(self, proxyauth: str):
|
||||
path = proxyauth[1:]
|
||||
try:
|
||||
self.htpasswd = passlib.apache.HtpasswdFile(path)
|
||||
except (ValueError, OSError):
|
||||
raise exceptions.OptionsError(f"Could not open htpasswd file: {path}")
|
||||
|
||||
def __call__(self, username: str, password: str) -> bool:
|
||||
return self.htpasswd.check_password(username, password)
|
||||
|
||||
|
||||
class Ldap(Validator):
|
||||
conn: ldap3.Connection
|
||||
server: ldap3.Server
|
||||
dn_subtree: str
|
||||
|
||||
def __init__(self, proxyauth: str):
|
||||
try:
|
||||
security, url, ldap_user, ldap_pass, self.dn_subtree = proxyauth.split(":")
|
||||
except ValueError:
|
||||
raise exceptions.OptionsError("Invalid ldap specification")
|
||||
if security == "ldaps":
|
||||
server = ldap3.Server(url, use_ssl=True)
|
||||
elif security == "ldap":
|
||||
server = ldap3.Server(url)
|
||||
else:
|
||||
f.response = self.auth_required_response()
|
||||
raise exceptions.OptionsError("Invalid ldap specification on the first part")
|
||||
conn = ldap3.Connection(
|
||||
server,
|
||||
ldap_user,
|
||||
ldap_pass,
|
||||
auto_bind=True
|
||||
)
|
||||
self.conn = conn
|
||||
self.server = server
|
||||
|
||||
def __call__(self, username: str, password: str) -> bool:
|
||||
if not username or not password:
|
||||
return False
|
||||
|
||||
# Handlers
|
||||
def configure(self, updated):
|
||||
if "proxyauth" in updated:
|
||||
self.nonanonymous = False
|
||||
self.singleuser = None
|
||||
self.htpasswd = None
|
||||
self.ldapserver = None
|
||||
if ctx.options.proxyauth:
|
||||
if ctx.options.proxyauth == "any":
|
||||
self.nonanonymous = True
|
||||
elif ctx.options.proxyauth.startswith("@"):
|
||||
p = ctx.options.proxyauth[1:]
|
||||
try:
|
||||
self.htpasswd = passlib.apache.HtpasswdFile(p)
|
||||
except (ValueError, OSError):
|
||||
raise exceptions.OptionsError(
|
||||
"Could not open htpasswd file: %s" % p
|
||||
)
|
||||
elif ctx.options.proxyauth.startswith("ldap"):
|
||||
parts = ctx.options.proxyauth.split(':')
|
||||
if len(parts) != 5:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid ldap specification"
|
||||
)
|
||||
security = parts[0]
|
||||
ldap_server = parts[1]
|
||||
dn_baseauth = parts[2]
|
||||
password_baseauth = parts[3]
|
||||
if security == "ldaps":
|
||||
server = ldap3.Server(ldap_server, use_ssl=True)
|
||||
elif security == "ldap":
|
||||
server = ldap3.Server(ldap_server)
|
||||
else:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid ldap specification on the first part"
|
||||
)
|
||||
conn = ldap3.Connection(
|
||||
server,
|
||||
dn_baseauth,
|
||||
password_baseauth,
|
||||
auto_bind=True)
|
||||
self.ldapconn = conn
|
||||
self.ldapserver = server
|
||||
elif ":" in ctx.options.proxyauth:
|
||||
parts = ctx.options.proxyauth.split(':')
|
||||
if len(parts) != 2:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid single-user auth specification."
|
||||
)
|
||||
self.singleuser = parts
|
||||
else:
|
||||
raise exceptions.OptionsError("Invalid proxyauth specification.")
|
||||
if self.enabled():
|
||||
if ctx.options.mode == "transparent":
|
||||
raise exceptions.OptionsError(
|
||||
"Proxy Authentication not supported in transparent mode."
|
||||
)
|
||||
|
||||
def http_connect(self, f: http.HTTPFlow) -> None:
|
||||
if self.enabled():
|
||||
if self.authenticate(f):
|
||||
self.authenticated[f.client_conn] = f.metadata["proxyauth"]
|
||||
|
||||
def requestheaders(self, f: http.HTTPFlow) -> None:
|
||||
if self.enabled():
|
||||
# Is this connection authenticated by a previous HTTP CONNECT?
|
||||
if f.client_conn in self.authenticated:
|
||||
f.metadata["proxyauth"] = self.authenticated[f.client_conn]
|
||||
return
|
||||
self.authenticate(f)
|
||||
self.conn.search(self.dn_subtree, f"(cn={username})")
|
||||
if self.conn.response:
|
||||
c = ldap3.Connection(
|
||||
self.server,
|
||||
self.conn.response[0]["dn"],
|
||||
password,
|
||||
auto_bind=True
|
||||
)
|
||||
if c:
|
||||
return True
|
||||
return False
|
||||
|
@ -6,6 +6,7 @@ import pytest
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.addons import proxyauth
|
||||
from mitmproxy.proxy.layers import modes
|
||||
from mitmproxy.test import taddons
|
||||
from mitmproxy.test import tflow
|
||||
|
||||
@ -47,89 +48,42 @@ class TestProxyAuth:
|
||||
('upstream:', True),
|
||||
('upstream:foobar', True),
|
||||
])
|
||||
def test_is_proxy_auth(self, mode, expected):
|
||||
def test_is_http_proxy(self, mode, expected):
|
||||
up = proxyauth.ProxyAuth()
|
||||
with taddons.context(up, loadcore=False) as ctx:
|
||||
ctx.options.mode = mode
|
||||
assert up.is_proxy_auth() is expected
|
||||
assert up.is_http_proxy is expected
|
||||
|
||||
@pytest.mark.parametrize('is_proxy_auth, expected', [
|
||||
@pytest.mark.parametrize('is_http_proxy, expected', [
|
||||
(True, 'Proxy-Authorization'),
|
||||
(False, 'Authorization'),
|
||||
])
|
||||
def test_which_auth_header(self, is_proxy_auth, expected):
|
||||
def test_which_auth_header(self, is_http_proxy, expected):
|
||||
up = proxyauth.ProxyAuth()
|
||||
with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_proxy_auth', return_value=is_proxy_auth):
|
||||
assert up.which_auth_header() == expected
|
||||
with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_http_proxy', new=is_http_proxy):
|
||||
assert up.http_auth_header == expected
|
||||
|
||||
@pytest.mark.parametrize('is_proxy_auth, expected_status_code, expected_header', [
|
||||
@pytest.mark.parametrize('is_http_proxy, expected_status_code, expected_header', [
|
||||
(True, 407, 'Proxy-Authenticate'),
|
||||
(False, 401, 'WWW-Authenticate'),
|
||||
])
|
||||
def test_auth_required_response(self, is_proxy_auth, expected_status_code, expected_header):
|
||||
def test_auth_required_response(self, is_http_proxy, expected_status_code, expected_header):
|
||||
up = proxyauth.ProxyAuth()
|
||||
with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_proxy_auth', return_value=is_proxy_auth):
|
||||
resp = up.auth_required_response()
|
||||
with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_http_proxy', new=is_http_proxy):
|
||||
resp = up.make_auth_required_response()
|
||||
assert resp.status_code == expected_status_code
|
||||
assert expected_header in resp.headers.keys()
|
||||
|
||||
def test_check(self, tdata):
|
||||
up = proxyauth.ProxyAuth()
|
||||
with taddons.context(up) as ctx:
|
||||
ctx.configure(up, proxyauth="any", mode="regular")
|
||||
f = tflow.tflow()
|
||||
assert not up.check(f)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
assert up.check(f)
|
||||
|
||||
f.request.headers["Proxy-Authorization"] = "invalid"
|
||||
assert not up.check(f)
|
||||
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test", scheme="unknown"
|
||||
)
|
||||
assert not up.check(f)
|
||||
|
||||
ctx.configure(up, proxyauth="test:test")
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
assert up.check(f)
|
||||
ctx.configure(up, proxyauth="test:foo")
|
||||
assert not up.check(f)
|
||||
|
||||
ctx.configure(
|
||||
up,
|
||||
proxyauth="@" + tdata.path(
|
||||
"mitmproxy/net/data/htpasswd"
|
||||
)
|
||||
)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
assert up.check(f)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "foo"
|
||||
)
|
||||
assert not up.check(f)
|
||||
|
||||
with mock.patch('ldap3.Server', return_value="ldap://fake_server:389 - cleartext"):
|
||||
with mock.patch('ldap3.Connection', search="test"):
|
||||
with mock.patch('ldap3.Connection.search', return_value="test"):
|
||||
ctx.configure(
|
||||
up,
|
||||
proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com"
|
||||
)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
assert up.check(f)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"", ""
|
||||
)
|
||||
assert not up.check(f)
|
||||
def test_socks5(self):
|
||||
pa = proxyauth.ProxyAuth()
|
||||
with taddons.context(pa, loadcore=False) as ctx:
|
||||
ctx.configure(pa, proxyauth="foo:bar", mode="regular")
|
||||
data = modes.Socks5AuthData("foo", "baz")
|
||||
pa.socks5_auth(data)
|
||||
assert not data.valid
|
||||
data.password = "bar"
|
||||
pa.socks5_auth(data)
|
||||
assert data.valid
|
||||
|
||||
def test_authenticate(self):
|
||||
up = proxyauth.ProxyAuth()
|
||||
@ -138,64 +92,60 @@ class TestProxyAuth:
|
||||
|
||||
f = tflow.tflow()
|
||||
assert not f.response
|
||||
up.authenticate(f)
|
||||
up.authenticate_http(f)
|
||||
assert f.response.status_code == 407
|
||||
|
||||
f = tflow.tflow()
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
up.authenticate(f)
|
||||
up.authenticate_http(f)
|
||||
assert not f.response
|
||||
assert not f.request.headers.get("Proxy-Authorization")
|
||||
|
||||
f = tflow.tflow()
|
||||
ctx.configure(up, mode="reverse")
|
||||
assert not f.response
|
||||
up.authenticate(f)
|
||||
up.authenticate_http(f)
|
||||
assert f.response.status_code == 401
|
||||
|
||||
f = tflow.tflow()
|
||||
f.request.headers["Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
up.authenticate(f)
|
||||
up.authenticate_http(f)
|
||||
assert not f.response
|
||||
assert not f.request.headers.get("Authorization")
|
||||
|
||||
def test_configure(self, monkeypatch, tdata):
|
||||
monkeypatch.setattr(ldap3, "Server", lambda *_, **__: True)
|
||||
monkeypatch.setattr(ldap3, "Connection", lambda *_, **__: True)
|
||||
monkeypatch.setattr(ldap3, "Server", mock.MagicMock())
|
||||
monkeypatch.setattr(ldap3, "Connection", mock.MagicMock())
|
||||
|
||||
pa = proxyauth.ProxyAuth()
|
||||
with taddons.context(pa) as ctx:
|
||||
with pytest.raises(exceptions.OptionsError, match="Invalid proxyauth specification"):
|
||||
ctx.configure(pa, proxyauth="foo")
|
||||
|
||||
ctx.configure(pa, proxyauth="foo:bar")
|
||||
assert isinstance(pa.validator, proxyauth.SingleUser)
|
||||
assert pa.validator("foo", "bar")
|
||||
assert not pa.validator("foo", "baz")
|
||||
|
||||
with pytest.raises(exceptions.OptionsError, match="Invalid single-user auth specification."):
|
||||
ctx.configure(pa, proxyauth="foo:bar:baz")
|
||||
|
||||
ctx.configure(pa, proxyauth="foo:bar")
|
||||
assert pa.singleuser == ["foo", "bar"]
|
||||
|
||||
ctx.configure(pa, proxyauth=None)
|
||||
assert pa.singleuser is None
|
||||
|
||||
ctx.configure(pa, proxyauth="any")
|
||||
assert pa.nonanonymous
|
||||
assert isinstance(pa.validator, proxyauth.AcceptAll)
|
||||
assert pa.validator("foo", "bar")
|
||||
|
||||
ctx.configure(pa, proxyauth=None)
|
||||
assert not pa.nonanonymous
|
||||
assert pa.validator is None
|
||||
|
||||
ctx.configure(
|
||||
pa,
|
||||
proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com"
|
||||
)
|
||||
assert pa.ldapserver
|
||||
ctx.configure(
|
||||
pa,
|
||||
proxyauth="ldaps:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com"
|
||||
)
|
||||
assert pa.ldapserver
|
||||
assert isinstance(pa.validator, proxyauth.Ldap)
|
||||
|
||||
with pytest.raises(exceptions.OptionsError, match="Invalid ldap specification"):
|
||||
ctx.configure(pa, proxyauth="ldap:test:test:test")
|
||||
@ -207,30 +157,18 @@ class TestProxyAuth:
|
||||
ctx.configure(pa, proxyauth="ldapssssssss:fake_server:dn:password:tree")
|
||||
|
||||
with pytest.raises(exceptions.OptionsError, match="Could not open htpasswd file"):
|
||||
ctx.configure(
|
||||
pa,
|
||||
proxyauth="@" + tdata.path("mitmproxy/net/data/server.crt")
|
||||
)
|
||||
ctx.configure(pa, proxyauth="@" + tdata.path("mitmproxy/net/data/server.crt"))
|
||||
with pytest.raises(exceptions.OptionsError, match="Could not open htpasswd file"):
|
||||
ctx.configure(pa, proxyauth="@nonexistent")
|
||||
|
||||
ctx.configure(
|
||||
pa,
|
||||
proxyauth="@" + tdata.path(
|
||||
"mitmproxy/net/data/htpasswd"
|
||||
)
|
||||
)
|
||||
assert pa.htpasswd
|
||||
assert pa.htpasswd.check_password("test", "test")
|
||||
assert not pa.htpasswd.check_password("test", "foo")
|
||||
ctx.configure(pa, proxyauth=None)
|
||||
assert not pa.htpasswd
|
||||
ctx.configure(pa, proxyauth="@" + tdata.path("mitmproxy/net/data/htpasswd"))
|
||||
assert isinstance(pa.validator, proxyauth.Htpasswd)
|
||||
assert pa.validator("test", "test")
|
||||
assert not pa.validator("test", "foo")
|
||||
|
||||
with pytest.raises(exceptions.OptionsError,
|
||||
match="Proxy Authentication not supported in transparent mode."):
|
||||
ctx.configure(pa, proxyauth="any", mode="transparent")
|
||||
with pytest.raises(exceptions.OptionsError, match="Proxy Authentication not supported in SOCKS mode."):
|
||||
ctx.configure(pa, proxyauth="any", mode="socks5")
|
||||
|
||||
def test_handlers(self):
|
||||
up = proxyauth.ProxyAuth()
|
||||
@ -260,3 +198,14 @@ class TestProxyAuth:
|
||||
up.requestheaders(f2)
|
||||
assert not f2.response
|
||||
assert f2.metadata["proxyauth"] == ('test', 'test')
|
||||
|
||||
|
||||
def test_ldap(monkeypatch):
|
||||
monkeypatch.setattr(ldap3, "Server", mock.MagicMock())
|
||||
monkeypatch.setattr(ldap3, "Connection", mock.MagicMock())
|
||||
|
||||
validator = proxyauth.Ldap("ldaps:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com")
|
||||
assert not validator("", "")
|
||||
assert validator("foo", "bar")
|
||||
validator.conn.response = False
|
||||
assert not validator("foo", "bar")
|
||||
|
Loading…
Reference in New Issue
Block a user