diff --git a/mitmproxy/addons/proxyauth.py b/mitmproxy/addons/proxyauth.py
index 31005f20b..8dd6a1786 100644
--- a/mitmproxy/addons/proxyauth.py
+++ b/mitmproxy/addons/proxyauth.py
@@ -1,5 +1,8 @@
+from __future__ import annotations
+
import binascii
import weakref
+from abc import ABC, abstractmethod
from typing import MutableMapping
from typing import Optional
from typing import Tuple
@@ -7,14 +10,136 @@ from typing import Tuple
import ldap3
import passlib.apache
-from mitmproxy import ctx, connection
+from mitmproxy import connection, ctx
from mitmproxy import exceptions
from mitmproxy import http
from mitmproxy.net.http import status_codes
+from mitmproxy.proxy.layers import modes
REALM = "mitmproxy"
+class ProxyAuth:
+ validator: Optional[Validator] = None
+
+ def __init__(self):
+ self.authenticated: MutableMapping[connection.Client, Tuple[str, str]] = weakref.WeakKeyDictionary()
+ """Contains all connections that are permanently authenticated after an HTTP CONNECT"""
+
+ def load(self, loader):
+ loader.add_option(
+ "proxyauth", Optional[str], None,
+ """
+ Require proxy authentication. Format:
+ "username:pass",
+ "any" to accept any user/pass combination,
+ "@path" to use an Apache htpasswd file,
+ or "ldap[s]:url_server_ldap:dn_auth:password:dn_subtree" for LDAP authentication.
+ """
+ )
+
+ def configure(self, updated):
+ if "proxyauth" not in updated:
+ return
+ auth = ctx.options.proxyauth
+ if auth:
+ if ctx.options.mode == "transparent":
+ raise exceptions.OptionsError("Proxy Authentication not supported in transparent mode.")
+
+ if auth == "any":
+ self.validator = AcceptAll()
+ elif auth.startswith("@"):
+ self.validator = Htpasswd(auth)
+ elif ctx.options.proxyauth.startswith("ldap"):
+ self.validator = Ldap(auth)
+ elif ":" in ctx.options.proxyauth:
+ self.validator = SingleUser(auth)
+ else:
+ raise exceptions.OptionsError("Invalid proxyauth specification.")
+ else:
+ self.validator = None
+
+ def socks5_auth(self, data: modes.Socks5AuthData) -> None:
+ if self.validator and self.validator(data.username, data.password):
+ data.valid = True
+
+ def http_connect(self, f: http.HTTPFlow) -> None:
+ if self.validator:
+ if self.authenticate_http(f):
+ # Make a note that all further requests over this connection are ok.
+ self.authenticated[f.client_conn] = f.metadata["proxyauth"]
+
+ def requestheaders(self, f: http.HTTPFlow) -> None:
+ if self.validator:
+ # Is this connection authenticated by a previous HTTP CONNECT?
+ if f.client_conn in self.authenticated:
+ f.metadata["proxyauth"] = self.authenticated[f.client_conn]
+ else:
+ self.authenticate_http(f)
+
+ def authenticate_http(self, f: http.HTTPFlow) -> bool:
+ """
+ Authenticate an HTTP request, returns if authentication was successful.
+
+ If valid credentials are found, the matching authentication header is removed.
+ In no or invalid credentials are found, flow.response is set to an error page.
+ """
+ assert self.validator
+ username = None
+ password = None
+ is_valid = False
+ try:
+ auth_value = f.request.headers.get(self.http_auth_header, "")
+ scheme, username, password = parse_http_basic_auth(auth_value)
+ is_valid = self.validator(username, password)
+ except Exception:
+ pass
+
+ if is_valid:
+ f.metadata["proxyauth"] = (username, password)
+ del f.request.headers[self.http_auth_header]
+ return True
+ else:
+ f.response = self.make_auth_required_response()
+ return False
+
+ def make_auth_required_response(self) -> http.Response:
+ if self.is_http_proxy:
+ status_code = status_codes.PROXY_AUTH_REQUIRED
+ headers = {"Proxy-Authenticate": f'Basic realm="{REALM}"'}
+ else:
+ status_code = status_codes.UNAUTHORIZED
+ headers = {"WWW-Authenticate": f'Basic realm="{REALM}"'}
+
+ reason = http.status_codes.RESPONSES[status_code]
+ return http.Response.make(
+ status_code,
+ (
+ f""
+ f"
{status_code} {reason}"
+ f"{status_code} {reason}
"
+ f""
+ ),
+ headers
+ )
+
+ @property
+ def http_auth_header(self) -> str:
+ if self.is_http_proxy:
+ return "Proxy-Authorization"
+ else:
+ return "Authorization"
+
+ @property
+ def is_http_proxy(self) -> bool:
+ """
+ Returns:
+ - True, if authentication is done as if mitmproxy is a proxy
+ - False, if authentication is done as if mitmproxy is an HTTP server
+ """
+ return ctx.options.mode == "regular" or ctx.options.mode.startswith("upstream:")
+
+
def mkauth(username: str, password: str, scheme: str = "basic") -> str:
"""
Craft a basic auth string
@@ -40,177 +165,78 @@ def parse_http_basic_auth(s: str) -> Tuple[str, str, str]:
return scheme, user, password
-class ProxyAuth:
- def __init__(self):
- self.nonanonymous = False
- self.htpasswd = None
- self.singleuser = None
- self.ldapconn = None
- self.ldapserver = None
- self.authenticated: MutableMapping[connection.Client, Tuple[str, str]] = weakref.WeakKeyDictionary()
- """Contains all connections that are permanently authenticated after an HTTP CONNECT"""
+class Validator(ABC):
+ """Base class for all username/password validators."""
- def load(self, loader):
- loader.add_option(
- "proxyauth", Optional[str], None,
- """
- Require proxy authentication. Format:
- "username:pass",
- "any" to accept any user/pass combination,
- "@path" to use an Apache htpasswd file,
- or "ldap[s]:url_server_ldap:dn_auth:password:dn_subtree" for LDAP authentication.
- """
- )
+ @abstractmethod
+ def __call__(self, username: str, password: str) -> bool:
+ raise NotImplementedError
- def enabled(self) -> bool:
- return any([self.nonanonymous, self.htpasswd, self.singleuser, self.ldapconn, self.ldapserver])
- def is_proxy_auth(self) -> bool:
- """
- Returns:
- - True, if authentication is done as if mitmproxy is a proxy
- - False, if authentication is done as if mitmproxy is a HTTP server
- """
- return ctx.options.mode == "regular" or ctx.options.mode.startswith("upstream:")
+class AcceptAll(Validator):
+ def __call__(self, username: str, password: str) -> bool:
+ return True
- def which_auth_header(self) -> str:
- if self.is_proxy_auth():
- return 'Proxy-Authorization'
- else:
- return 'Authorization'
- def auth_required_response(self) -> http.Response:
- if self.is_proxy_auth():
- status_code = status_codes.PROXY_AUTH_REQUIRED
- headers = {"Proxy-Authenticate": f'Basic realm="{REALM}"'}
- else:
- status_code = status_codes.UNAUTHORIZED
- headers = {"WWW-Authenticate": f'Basic realm="{REALM}"'}
-
- reason = http.status_codes.RESPONSES[status_code]
- return http.Response.make(
- status_code,
- (
- f""
- f"{status_code} {reason}"
- f"{status_code} {reason}
"
- f""
- ),
- headers
- )
-
- def check(self, f: http.HTTPFlow) -> Optional[Tuple[str, str]]:
- """
- Check if a request is correctly authenticated.
- Returns:
- - a (username, password) tuple if successful,
- - None, otherwise.
- """
- auth_value = f.request.headers.get(self.which_auth_header(), "")
+class SingleUser(Validator):
+ def __init__(self, proxyauth: str):
try:
- scheme, username, password = parse_http_basic_auth(auth_value)
+ self.username, self.password = proxyauth.split(':')
except ValueError:
- return None
+ raise exceptions.OptionsError("Invalid single-user auth specification.")
- if self.nonanonymous:
- return username, password
- elif self.singleuser:
- if self.singleuser == [username, password]:
- return username, password
- elif self.htpasswd:
- if self.htpasswd.check_password(username, password):
- return username, password
- elif self.ldapconn:
- if not username or not password:
- return None
- self.ldapconn.search(ctx.options.proxyauth.split(':')[4], '(cn=' + username + ')')
- if self.ldapconn.response:
- conn = ldap3.Connection(
- self.ldapserver,
- self.ldapconn.response[0]['dn'],
- password,
- auto_bind=True)
- if conn:
- return username, password
- return None
+ def __call__(self, username: str, password: str) -> bool:
+ return self.username == username and self.password == password
- def authenticate(self, f: http.HTTPFlow) -> bool:
- valid_credentials = self.check(f)
- if valid_credentials:
- f.metadata["proxyauth"] = valid_credentials
- del f.request.headers[self.which_auth_header()]
- return True
+
+class Htpasswd(Validator):
+ def __init__(self, proxyauth: str):
+ path = proxyauth[1:]
+ try:
+ self.htpasswd = passlib.apache.HtpasswdFile(path)
+ except (ValueError, OSError):
+ raise exceptions.OptionsError(f"Could not open htpasswd file: {path}")
+
+ def __call__(self, username: str, password: str) -> bool:
+ return self.htpasswd.check_password(username, password)
+
+
+class Ldap(Validator):
+ conn: ldap3.Connection
+ server: ldap3.Server
+ dn_subtree: str
+
+ def __init__(self, proxyauth: str):
+ try:
+ security, url, ldap_user, ldap_pass, self.dn_subtree = proxyauth.split(":")
+ except ValueError:
+ raise exceptions.OptionsError("Invalid ldap specification")
+ if security == "ldaps":
+ server = ldap3.Server(url, use_ssl=True)
+ elif security == "ldap":
+ server = ldap3.Server(url)
else:
- f.response = self.auth_required_response()
+ raise exceptions.OptionsError("Invalid ldap specification on the first part")
+ conn = ldap3.Connection(
+ server,
+ ldap_user,
+ ldap_pass,
+ auto_bind=True
+ )
+ self.conn = conn
+ self.server = server
+
+ def __call__(self, username: str, password: str) -> bool:
+ if not username or not password:
return False
-
- # Handlers
- def configure(self, updated):
- if "proxyauth" in updated:
- self.nonanonymous = False
- self.singleuser = None
- self.htpasswd = None
- self.ldapserver = None
- if ctx.options.proxyauth:
- if ctx.options.proxyauth == "any":
- self.nonanonymous = True
- elif ctx.options.proxyauth.startswith("@"):
- p = ctx.options.proxyauth[1:]
- try:
- self.htpasswd = passlib.apache.HtpasswdFile(p)
- except (ValueError, OSError):
- raise exceptions.OptionsError(
- "Could not open htpasswd file: %s" % p
- )
- elif ctx.options.proxyauth.startswith("ldap"):
- parts = ctx.options.proxyauth.split(':')
- if len(parts) != 5:
- raise exceptions.OptionsError(
- "Invalid ldap specification"
- )
- security = parts[0]
- ldap_server = parts[1]
- dn_baseauth = parts[2]
- password_baseauth = parts[3]
- if security == "ldaps":
- server = ldap3.Server(ldap_server, use_ssl=True)
- elif security == "ldap":
- server = ldap3.Server(ldap_server)
- else:
- raise exceptions.OptionsError(
- "Invalid ldap specification on the first part"
- )
- conn = ldap3.Connection(
- server,
- dn_baseauth,
- password_baseauth,
- auto_bind=True)
- self.ldapconn = conn
- self.ldapserver = server
- elif ":" in ctx.options.proxyauth:
- parts = ctx.options.proxyauth.split(':')
- if len(parts) != 2:
- raise exceptions.OptionsError(
- "Invalid single-user auth specification."
- )
- self.singleuser = parts
- else:
- raise exceptions.OptionsError("Invalid proxyauth specification.")
- if self.enabled():
- if ctx.options.mode == "transparent":
- raise exceptions.OptionsError(
- "Proxy Authentication not supported in transparent mode."
- )
-
- def http_connect(self, f: http.HTTPFlow) -> None:
- if self.enabled():
- if self.authenticate(f):
- self.authenticated[f.client_conn] = f.metadata["proxyauth"]
-
- def requestheaders(self, f: http.HTTPFlow) -> None:
- if self.enabled():
- # Is this connection authenticated by a previous HTTP CONNECT?
- if f.client_conn in self.authenticated:
- f.metadata["proxyauth"] = self.authenticated[f.client_conn]
- return
- self.authenticate(f)
+ self.conn.search(self.dn_subtree, f"(cn={username})")
+ if self.conn.response:
+ c = ldap3.Connection(
+ self.server,
+ self.conn.response[0]["dn"],
+ password,
+ auto_bind=True
+ )
+ if c:
+ return True
+ return False
diff --git a/test/mitmproxy/addons/test_proxyauth.py b/test/mitmproxy/addons/test_proxyauth.py
index 2e6cde4d0..eaa489509 100644
--- a/test/mitmproxy/addons/test_proxyauth.py
+++ b/test/mitmproxy/addons/test_proxyauth.py
@@ -6,6 +6,7 @@ import pytest
from mitmproxy import exceptions
from mitmproxy.addons import proxyauth
+from mitmproxy.proxy.layers import modes
from mitmproxy.test import taddons
from mitmproxy.test import tflow
@@ -47,89 +48,42 @@ class TestProxyAuth:
('upstream:', True),
('upstream:foobar', True),
])
- def test_is_proxy_auth(self, mode, expected):
+ def test_is_http_proxy(self, mode, expected):
up = proxyauth.ProxyAuth()
with taddons.context(up, loadcore=False) as ctx:
ctx.options.mode = mode
- assert up.is_proxy_auth() is expected
+ assert up.is_http_proxy is expected
- @pytest.mark.parametrize('is_proxy_auth, expected', [
+ @pytest.mark.parametrize('is_http_proxy, expected', [
(True, 'Proxy-Authorization'),
(False, 'Authorization'),
])
- def test_which_auth_header(self, is_proxy_auth, expected):
+ def test_which_auth_header(self, is_http_proxy, expected):
up = proxyauth.ProxyAuth()
- with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_proxy_auth', return_value=is_proxy_auth):
- assert up.which_auth_header() == expected
+ with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_http_proxy', new=is_http_proxy):
+ assert up.http_auth_header == expected
- @pytest.mark.parametrize('is_proxy_auth, expected_status_code, expected_header', [
+ @pytest.mark.parametrize('is_http_proxy, expected_status_code, expected_header', [
(True, 407, 'Proxy-Authenticate'),
(False, 401, 'WWW-Authenticate'),
])
- def test_auth_required_response(self, is_proxy_auth, expected_status_code, expected_header):
+ def test_auth_required_response(self, is_http_proxy, expected_status_code, expected_header):
up = proxyauth.ProxyAuth()
- with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_proxy_auth', return_value=is_proxy_auth):
- resp = up.auth_required_response()
+ with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_http_proxy', new=is_http_proxy):
+ resp = up.make_auth_required_response()
assert resp.status_code == expected_status_code
assert expected_header in resp.headers.keys()
- def test_check(self, tdata):
- up = proxyauth.ProxyAuth()
- with taddons.context(up) as ctx:
- ctx.configure(up, proxyauth="any", mode="regular")
- f = tflow.tflow()
- assert not up.check(f)
- f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
- "test", "test"
- )
- assert up.check(f)
-
- f.request.headers["Proxy-Authorization"] = "invalid"
- assert not up.check(f)
-
- f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
- "test", "test", scheme="unknown"
- )
- assert not up.check(f)
-
- ctx.configure(up, proxyauth="test:test")
- f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
- "test", "test"
- )
- assert up.check(f)
- ctx.configure(up, proxyauth="test:foo")
- assert not up.check(f)
-
- ctx.configure(
- up,
- proxyauth="@" + tdata.path(
- "mitmproxy/net/data/htpasswd"
- )
- )
- f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
- "test", "test"
- )
- assert up.check(f)
- f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
- "test", "foo"
- )
- assert not up.check(f)
-
- with mock.patch('ldap3.Server', return_value="ldap://fake_server:389 - cleartext"):
- with mock.patch('ldap3.Connection', search="test"):
- with mock.patch('ldap3.Connection.search', return_value="test"):
- ctx.configure(
- up,
- proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com"
- )
- f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
- "test", "test"
- )
- assert up.check(f)
- f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
- "", ""
- )
- assert not up.check(f)
+ def test_socks5(self):
+ pa = proxyauth.ProxyAuth()
+ with taddons.context(pa, loadcore=False) as ctx:
+ ctx.configure(pa, proxyauth="foo:bar", mode="regular")
+ data = modes.Socks5AuthData("foo", "baz")
+ pa.socks5_auth(data)
+ assert not data.valid
+ data.password = "bar"
+ pa.socks5_auth(data)
+ assert data.valid
def test_authenticate(self):
up = proxyauth.ProxyAuth()
@@ -138,64 +92,60 @@ class TestProxyAuth:
f = tflow.tflow()
assert not f.response
- up.authenticate(f)
+ up.authenticate_http(f)
assert f.response.status_code == 407
f = tflow.tflow()
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
"test", "test"
)
- up.authenticate(f)
+ up.authenticate_http(f)
assert not f.response
assert not f.request.headers.get("Proxy-Authorization")
f = tflow.tflow()
ctx.configure(up, mode="reverse")
assert not f.response
- up.authenticate(f)
+ up.authenticate_http(f)
assert f.response.status_code == 401
f = tflow.tflow()
f.request.headers["Authorization"] = proxyauth.mkauth(
"test", "test"
)
- up.authenticate(f)
+ up.authenticate_http(f)
assert not f.response
assert not f.request.headers.get("Authorization")
def test_configure(self, monkeypatch, tdata):
- monkeypatch.setattr(ldap3, "Server", lambda *_, **__: True)
- monkeypatch.setattr(ldap3, "Connection", lambda *_, **__: True)
+ monkeypatch.setattr(ldap3, "Server", mock.MagicMock())
+ monkeypatch.setattr(ldap3, "Connection", mock.MagicMock())
pa = proxyauth.ProxyAuth()
with taddons.context(pa) as ctx:
with pytest.raises(exceptions.OptionsError, match="Invalid proxyauth specification"):
ctx.configure(pa, proxyauth="foo")
+ ctx.configure(pa, proxyauth="foo:bar")
+ assert isinstance(pa.validator, proxyauth.SingleUser)
+ assert pa.validator("foo", "bar")
+ assert not pa.validator("foo", "baz")
+
with pytest.raises(exceptions.OptionsError, match="Invalid single-user auth specification."):
ctx.configure(pa, proxyauth="foo:bar:baz")
- ctx.configure(pa, proxyauth="foo:bar")
- assert pa.singleuser == ["foo", "bar"]
-
- ctx.configure(pa, proxyauth=None)
- assert pa.singleuser is None
-
ctx.configure(pa, proxyauth="any")
- assert pa.nonanonymous
+ assert isinstance(pa.validator, proxyauth.AcceptAll)
+ assert pa.validator("foo", "bar")
+
ctx.configure(pa, proxyauth=None)
- assert not pa.nonanonymous
+ assert pa.validator is None
ctx.configure(
pa,
proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com"
)
- assert pa.ldapserver
- ctx.configure(
- pa,
- proxyauth="ldaps:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com"
- )
- assert pa.ldapserver
+ assert isinstance(pa.validator, proxyauth.Ldap)
with pytest.raises(exceptions.OptionsError, match="Invalid ldap specification"):
ctx.configure(pa, proxyauth="ldap:test:test:test")
@@ -207,30 +157,18 @@ class TestProxyAuth:
ctx.configure(pa, proxyauth="ldapssssssss:fake_server:dn:password:tree")
with pytest.raises(exceptions.OptionsError, match="Could not open htpasswd file"):
- ctx.configure(
- pa,
- proxyauth="@" + tdata.path("mitmproxy/net/data/server.crt")
- )
+ ctx.configure(pa, proxyauth="@" + tdata.path("mitmproxy/net/data/server.crt"))
with pytest.raises(exceptions.OptionsError, match="Could not open htpasswd file"):
ctx.configure(pa, proxyauth="@nonexistent")
- ctx.configure(
- pa,
- proxyauth="@" + tdata.path(
- "mitmproxy/net/data/htpasswd"
- )
- )
- assert pa.htpasswd
- assert pa.htpasswd.check_password("test", "test")
- assert not pa.htpasswd.check_password("test", "foo")
- ctx.configure(pa, proxyauth=None)
- assert not pa.htpasswd
+ ctx.configure(pa, proxyauth="@" + tdata.path("mitmproxy/net/data/htpasswd"))
+ assert isinstance(pa.validator, proxyauth.Htpasswd)
+ assert pa.validator("test", "test")
+ assert not pa.validator("test", "foo")
with pytest.raises(exceptions.OptionsError,
match="Proxy Authentication not supported in transparent mode."):
ctx.configure(pa, proxyauth="any", mode="transparent")
- with pytest.raises(exceptions.OptionsError, match="Proxy Authentication not supported in SOCKS mode."):
- ctx.configure(pa, proxyauth="any", mode="socks5")
def test_handlers(self):
up = proxyauth.ProxyAuth()
@@ -260,3 +198,14 @@ class TestProxyAuth:
up.requestheaders(f2)
assert not f2.response
assert f2.metadata["proxyauth"] == ('test', 'test')
+
+
+def test_ldap(monkeypatch):
+ monkeypatch.setattr(ldap3, "Server", mock.MagicMock())
+ monkeypatch.setattr(ldap3, "Connection", mock.MagicMock())
+
+ validator = proxyauth.Ldap("ldaps:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com")
+ assert not validator("", "")
+ assert validator("foo", "bar")
+ validator.conn.response = False
+ assert not validator("foo", "bar")