From 73d809a4c729814de75c962fa385dead69c4a1ff Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Wed, 25 Aug 2021 17:18:31 +0200 Subject: [PATCH] refactor proxyauth addon the previous version was difficult to read, this is hopefully better now. --- mitmproxy/addons/proxyauth.py | 354 +++++++++++++----------- test/mitmproxy/addons/test_proxyauth.py | 157 ++++------- 2 files changed, 243 insertions(+), 268 deletions(-) diff --git a/mitmproxy/addons/proxyauth.py b/mitmproxy/addons/proxyauth.py index 31005f20b..8dd6a1786 100644 --- a/mitmproxy/addons/proxyauth.py +++ b/mitmproxy/addons/proxyauth.py @@ -1,5 +1,8 @@ +from __future__ import annotations + import binascii import weakref +from abc import ABC, abstractmethod from typing import MutableMapping from typing import Optional from typing import Tuple @@ -7,14 +10,136 @@ from typing import Tuple import ldap3 import passlib.apache -from mitmproxy import ctx, connection +from mitmproxy import connection, ctx from mitmproxy import exceptions from mitmproxy import http from mitmproxy.net.http import status_codes +from mitmproxy.proxy.layers import modes REALM = "mitmproxy" +class ProxyAuth: + validator: Optional[Validator] = None + + def __init__(self): + self.authenticated: MutableMapping[connection.Client, Tuple[str, str]] = weakref.WeakKeyDictionary() + """Contains all connections that are permanently authenticated after an HTTP CONNECT""" + + def load(self, loader): + loader.add_option( + "proxyauth", Optional[str], None, + """ + Require proxy authentication. Format: + "username:pass", + "any" to accept any user/pass combination, + "@path" to use an Apache htpasswd file, + or "ldap[s]:url_server_ldap:dn_auth:password:dn_subtree" for LDAP authentication. + """ + ) + + def configure(self, updated): + if "proxyauth" not in updated: + return + auth = ctx.options.proxyauth + if auth: + if ctx.options.mode == "transparent": + raise exceptions.OptionsError("Proxy Authentication not supported in transparent mode.") + + if auth == "any": + self.validator = AcceptAll() + elif auth.startswith("@"): + self.validator = Htpasswd(auth) + elif ctx.options.proxyauth.startswith("ldap"): + self.validator = Ldap(auth) + elif ":" in ctx.options.proxyauth: + self.validator = SingleUser(auth) + else: + raise exceptions.OptionsError("Invalid proxyauth specification.") + else: + self.validator = None + + def socks5_auth(self, data: modes.Socks5AuthData) -> None: + if self.validator and self.validator(data.username, data.password): + data.valid = True + + def http_connect(self, f: http.HTTPFlow) -> None: + if self.validator: + if self.authenticate_http(f): + # Make a note that all further requests over this connection are ok. + self.authenticated[f.client_conn] = f.metadata["proxyauth"] + + def requestheaders(self, f: http.HTTPFlow) -> None: + if self.validator: + # Is this connection authenticated by a previous HTTP CONNECT? + if f.client_conn in self.authenticated: + f.metadata["proxyauth"] = self.authenticated[f.client_conn] + else: + self.authenticate_http(f) + + def authenticate_http(self, f: http.HTTPFlow) -> bool: + """ + Authenticate an HTTP request, returns if authentication was successful. + + If valid credentials are found, the matching authentication header is removed. + In no or invalid credentials are found, flow.response is set to an error page. + """ + assert self.validator + username = None + password = None + is_valid = False + try: + auth_value = f.request.headers.get(self.http_auth_header, "") + scheme, username, password = parse_http_basic_auth(auth_value) + is_valid = self.validator(username, password) + except Exception: + pass + + if is_valid: + f.metadata["proxyauth"] = (username, password) + del f.request.headers[self.http_auth_header] + return True + else: + f.response = self.make_auth_required_response() + return False + + def make_auth_required_response(self) -> http.Response: + if self.is_http_proxy: + status_code = status_codes.PROXY_AUTH_REQUIRED + headers = {"Proxy-Authenticate": f'Basic realm="{REALM}"'} + else: + status_code = status_codes.UNAUTHORIZED + headers = {"WWW-Authenticate": f'Basic realm="{REALM}"'} + + reason = http.status_codes.RESPONSES[status_code] + return http.Response.make( + status_code, + ( + f"" + f"{status_code} {reason}" + f"

{status_code} {reason}

" + f"" + ), + headers + ) + + @property + def http_auth_header(self) -> str: + if self.is_http_proxy: + return "Proxy-Authorization" + else: + return "Authorization" + + @property + def is_http_proxy(self) -> bool: + """ + Returns: + - True, if authentication is done as if mitmproxy is a proxy + - False, if authentication is done as if mitmproxy is an HTTP server + """ + return ctx.options.mode == "regular" or ctx.options.mode.startswith("upstream:") + + def mkauth(username: str, password: str, scheme: str = "basic") -> str: """ Craft a basic auth string @@ -40,177 +165,78 @@ def parse_http_basic_auth(s: str) -> Tuple[str, str, str]: return scheme, user, password -class ProxyAuth: - def __init__(self): - self.nonanonymous = False - self.htpasswd = None - self.singleuser = None - self.ldapconn = None - self.ldapserver = None - self.authenticated: MutableMapping[connection.Client, Tuple[str, str]] = weakref.WeakKeyDictionary() - """Contains all connections that are permanently authenticated after an HTTP CONNECT""" +class Validator(ABC): + """Base class for all username/password validators.""" - def load(self, loader): - loader.add_option( - "proxyauth", Optional[str], None, - """ - Require proxy authentication. Format: - "username:pass", - "any" to accept any user/pass combination, - "@path" to use an Apache htpasswd file, - or "ldap[s]:url_server_ldap:dn_auth:password:dn_subtree" for LDAP authentication. - """ - ) + @abstractmethod + def __call__(self, username: str, password: str) -> bool: + raise NotImplementedError - def enabled(self) -> bool: - return any([self.nonanonymous, self.htpasswd, self.singleuser, self.ldapconn, self.ldapserver]) - def is_proxy_auth(self) -> bool: - """ - Returns: - - True, if authentication is done as if mitmproxy is a proxy - - False, if authentication is done as if mitmproxy is a HTTP server - """ - return ctx.options.mode == "regular" or ctx.options.mode.startswith("upstream:") +class AcceptAll(Validator): + def __call__(self, username: str, password: str) -> bool: + return True - def which_auth_header(self) -> str: - if self.is_proxy_auth(): - return 'Proxy-Authorization' - else: - return 'Authorization' - def auth_required_response(self) -> http.Response: - if self.is_proxy_auth(): - status_code = status_codes.PROXY_AUTH_REQUIRED - headers = {"Proxy-Authenticate": f'Basic realm="{REALM}"'} - else: - status_code = status_codes.UNAUTHORIZED - headers = {"WWW-Authenticate": f'Basic realm="{REALM}"'} - - reason = http.status_codes.RESPONSES[status_code] - return http.Response.make( - status_code, - ( - f"" - f"{status_code} {reason}" - f"

{status_code} {reason}

" - f"" - ), - headers - ) - - def check(self, f: http.HTTPFlow) -> Optional[Tuple[str, str]]: - """ - Check if a request is correctly authenticated. - Returns: - - a (username, password) tuple if successful, - - None, otherwise. - """ - auth_value = f.request.headers.get(self.which_auth_header(), "") +class SingleUser(Validator): + def __init__(self, proxyauth: str): try: - scheme, username, password = parse_http_basic_auth(auth_value) + self.username, self.password = proxyauth.split(':') except ValueError: - return None + raise exceptions.OptionsError("Invalid single-user auth specification.") - if self.nonanonymous: - return username, password - elif self.singleuser: - if self.singleuser == [username, password]: - return username, password - elif self.htpasswd: - if self.htpasswd.check_password(username, password): - return username, password - elif self.ldapconn: - if not username or not password: - return None - self.ldapconn.search(ctx.options.proxyauth.split(':')[4], '(cn=' + username + ')') - if self.ldapconn.response: - conn = ldap3.Connection( - self.ldapserver, - self.ldapconn.response[0]['dn'], - password, - auto_bind=True) - if conn: - return username, password - return None + def __call__(self, username: str, password: str) -> bool: + return self.username == username and self.password == password - def authenticate(self, f: http.HTTPFlow) -> bool: - valid_credentials = self.check(f) - if valid_credentials: - f.metadata["proxyauth"] = valid_credentials - del f.request.headers[self.which_auth_header()] - return True + +class Htpasswd(Validator): + def __init__(self, proxyauth: str): + path = proxyauth[1:] + try: + self.htpasswd = passlib.apache.HtpasswdFile(path) + except (ValueError, OSError): + raise exceptions.OptionsError(f"Could not open htpasswd file: {path}") + + def __call__(self, username: str, password: str) -> bool: + return self.htpasswd.check_password(username, password) + + +class Ldap(Validator): + conn: ldap3.Connection + server: ldap3.Server + dn_subtree: str + + def __init__(self, proxyauth: str): + try: + security, url, ldap_user, ldap_pass, self.dn_subtree = proxyauth.split(":") + except ValueError: + raise exceptions.OptionsError("Invalid ldap specification") + if security == "ldaps": + server = ldap3.Server(url, use_ssl=True) + elif security == "ldap": + server = ldap3.Server(url) else: - f.response = self.auth_required_response() + raise exceptions.OptionsError("Invalid ldap specification on the first part") + conn = ldap3.Connection( + server, + ldap_user, + ldap_pass, + auto_bind=True + ) + self.conn = conn + self.server = server + + def __call__(self, username: str, password: str) -> bool: + if not username or not password: return False - - # Handlers - def configure(self, updated): - if "proxyauth" in updated: - self.nonanonymous = False - self.singleuser = None - self.htpasswd = None - self.ldapserver = None - if ctx.options.proxyauth: - if ctx.options.proxyauth == "any": - self.nonanonymous = True - elif ctx.options.proxyauth.startswith("@"): - p = ctx.options.proxyauth[1:] - try: - self.htpasswd = passlib.apache.HtpasswdFile(p) - except (ValueError, OSError): - raise exceptions.OptionsError( - "Could not open htpasswd file: %s" % p - ) - elif ctx.options.proxyauth.startswith("ldap"): - parts = ctx.options.proxyauth.split(':') - if len(parts) != 5: - raise exceptions.OptionsError( - "Invalid ldap specification" - ) - security = parts[0] - ldap_server = parts[1] - dn_baseauth = parts[2] - password_baseauth = parts[3] - if security == "ldaps": - server = ldap3.Server(ldap_server, use_ssl=True) - elif security == "ldap": - server = ldap3.Server(ldap_server) - else: - raise exceptions.OptionsError( - "Invalid ldap specification on the first part" - ) - conn = ldap3.Connection( - server, - dn_baseauth, - password_baseauth, - auto_bind=True) - self.ldapconn = conn - self.ldapserver = server - elif ":" in ctx.options.proxyauth: - parts = ctx.options.proxyauth.split(':') - if len(parts) != 2: - raise exceptions.OptionsError( - "Invalid single-user auth specification." - ) - self.singleuser = parts - else: - raise exceptions.OptionsError("Invalid proxyauth specification.") - if self.enabled(): - if ctx.options.mode == "transparent": - raise exceptions.OptionsError( - "Proxy Authentication not supported in transparent mode." - ) - - def http_connect(self, f: http.HTTPFlow) -> None: - if self.enabled(): - if self.authenticate(f): - self.authenticated[f.client_conn] = f.metadata["proxyauth"] - - def requestheaders(self, f: http.HTTPFlow) -> None: - if self.enabled(): - # Is this connection authenticated by a previous HTTP CONNECT? - if f.client_conn in self.authenticated: - f.metadata["proxyauth"] = self.authenticated[f.client_conn] - return - self.authenticate(f) + self.conn.search(self.dn_subtree, f"(cn={username})") + if self.conn.response: + c = ldap3.Connection( + self.server, + self.conn.response[0]["dn"], + password, + auto_bind=True + ) + if c: + return True + return False diff --git a/test/mitmproxy/addons/test_proxyauth.py b/test/mitmproxy/addons/test_proxyauth.py index 2e6cde4d0..eaa489509 100644 --- a/test/mitmproxy/addons/test_proxyauth.py +++ b/test/mitmproxy/addons/test_proxyauth.py @@ -6,6 +6,7 @@ import pytest from mitmproxy import exceptions from mitmproxy.addons import proxyauth +from mitmproxy.proxy.layers import modes from mitmproxy.test import taddons from mitmproxy.test import tflow @@ -47,89 +48,42 @@ class TestProxyAuth: ('upstream:', True), ('upstream:foobar', True), ]) - def test_is_proxy_auth(self, mode, expected): + def test_is_http_proxy(self, mode, expected): up = proxyauth.ProxyAuth() with taddons.context(up, loadcore=False) as ctx: ctx.options.mode = mode - assert up.is_proxy_auth() is expected + assert up.is_http_proxy is expected - @pytest.mark.parametrize('is_proxy_auth, expected', [ + @pytest.mark.parametrize('is_http_proxy, expected', [ (True, 'Proxy-Authorization'), (False, 'Authorization'), ]) - def test_which_auth_header(self, is_proxy_auth, expected): + def test_which_auth_header(self, is_http_proxy, expected): up = proxyauth.ProxyAuth() - with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_proxy_auth', return_value=is_proxy_auth): - assert up.which_auth_header() == expected + with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_http_proxy', new=is_http_proxy): + assert up.http_auth_header == expected - @pytest.mark.parametrize('is_proxy_auth, expected_status_code, expected_header', [ + @pytest.mark.parametrize('is_http_proxy, expected_status_code, expected_header', [ (True, 407, 'Proxy-Authenticate'), (False, 401, 'WWW-Authenticate'), ]) - def test_auth_required_response(self, is_proxy_auth, expected_status_code, expected_header): + def test_auth_required_response(self, is_http_proxy, expected_status_code, expected_header): up = proxyauth.ProxyAuth() - with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_proxy_auth', return_value=is_proxy_auth): - resp = up.auth_required_response() + with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_http_proxy', new=is_http_proxy): + resp = up.make_auth_required_response() assert resp.status_code == expected_status_code assert expected_header in resp.headers.keys() - def test_check(self, tdata): - up = proxyauth.ProxyAuth() - with taddons.context(up) as ctx: - ctx.configure(up, proxyauth="any", mode="regular") - f = tflow.tflow() - assert not up.check(f) - f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( - "test", "test" - ) - assert up.check(f) - - f.request.headers["Proxy-Authorization"] = "invalid" - assert not up.check(f) - - f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( - "test", "test", scheme="unknown" - ) - assert not up.check(f) - - ctx.configure(up, proxyauth="test:test") - f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( - "test", "test" - ) - assert up.check(f) - ctx.configure(up, proxyauth="test:foo") - assert not up.check(f) - - ctx.configure( - up, - proxyauth="@" + tdata.path( - "mitmproxy/net/data/htpasswd" - ) - ) - f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( - "test", "test" - ) - assert up.check(f) - f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( - "test", "foo" - ) - assert not up.check(f) - - with mock.patch('ldap3.Server', return_value="ldap://fake_server:389 - cleartext"): - with mock.patch('ldap3.Connection', search="test"): - with mock.patch('ldap3.Connection.search', return_value="test"): - ctx.configure( - up, - proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com" - ) - f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( - "test", "test" - ) - assert up.check(f) - f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( - "", "" - ) - assert not up.check(f) + def test_socks5(self): + pa = proxyauth.ProxyAuth() + with taddons.context(pa, loadcore=False) as ctx: + ctx.configure(pa, proxyauth="foo:bar", mode="regular") + data = modes.Socks5AuthData("foo", "baz") + pa.socks5_auth(data) + assert not data.valid + data.password = "bar" + pa.socks5_auth(data) + assert data.valid def test_authenticate(self): up = proxyauth.ProxyAuth() @@ -138,64 +92,60 @@ class TestProxyAuth: f = tflow.tflow() assert not f.response - up.authenticate(f) + up.authenticate_http(f) assert f.response.status_code == 407 f = tflow.tflow() f.request.headers["Proxy-Authorization"] = proxyauth.mkauth( "test", "test" ) - up.authenticate(f) + up.authenticate_http(f) assert not f.response assert not f.request.headers.get("Proxy-Authorization") f = tflow.tflow() ctx.configure(up, mode="reverse") assert not f.response - up.authenticate(f) + up.authenticate_http(f) assert f.response.status_code == 401 f = tflow.tflow() f.request.headers["Authorization"] = proxyauth.mkauth( "test", "test" ) - up.authenticate(f) + up.authenticate_http(f) assert not f.response assert not f.request.headers.get("Authorization") def test_configure(self, monkeypatch, tdata): - monkeypatch.setattr(ldap3, "Server", lambda *_, **__: True) - monkeypatch.setattr(ldap3, "Connection", lambda *_, **__: True) + monkeypatch.setattr(ldap3, "Server", mock.MagicMock()) + monkeypatch.setattr(ldap3, "Connection", mock.MagicMock()) pa = proxyauth.ProxyAuth() with taddons.context(pa) as ctx: with pytest.raises(exceptions.OptionsError, match="Invalid proxyauth specification"): ctx.configure(pa, proxyauth="foo") + ctx.configure(pa, proxyauth="foo:bar") + assert isinstance(pa.validator, proxyauth.SingleUser) + assert pa.validator("foo", "bar") + assert not pa.validator("foo", "baz") + with pytest.raises(exceptions.OptionsError, match="Invalid single-user auth specification."): ctx.configure(pa, proxyauth="foo:bar:baz") - ctx.configure(pa, proxyauth="foo:bar") - assert pa.singleuser == ["foo", "bar"] - - ctx.configure(pa, proxyauth=None) - assert pa.singleuser is None - ctx.configure(pa, proxyauth="any") - assert pa.nonanonymous + assert isinstance(pa.validator, proxyauth.AcceptAll) + assert pa.validator("foo", "bar") + ctx.configure(pa, proxyauth=None) - assert not pa.nonanonymous + assert pa.validator is None ctx.configure( pa, proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com" ) - assert pa.ldapserver - ctx.configure( - pa, - proxyauth="ldaps:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com" - ) - assert pa.ldapserver + assert isinstance(pa.validator, proxyauth.Ldap) with pytest.raises(exceptions.OptionsError, match="Invalid ldap specification"): ctx.configure(pa, proxyauth="ldap:test:test:test") @@ -207,30 +157,18 @@ class TestProxyAuth: ctx.configure(pa, proxyauth="ldapssssssss:fake_server:dn:password:tree") with pytest.raises(exceptions.OptionsError, match="Could not open htpasswd file"): - ctx.configure( - pa, - proxyauth="@" + tdata.path("mitmproxy/net/data/server.crt") - ) + ctx.configure(pa, proxyauth="@" + tdata.path("mitmproxy/net/data/server.crt")) with pytest.raises(exceptions.OptionsError, match="Could not open htpasswd file"): ctx.configure(pa, proxyauth="@nonexistent") - ctx.configure( - pa, - proxyauth="@" + tdata.path( - "mitmproxy/net/data/htpasswd" - ) - ) - assert pa.htpasswd - assert pa.htpasswd.check_password("test", "test") - assert not pa.htpasswd.check_password("test", "foo") - ctx.configure(pa, proxyauth=None) - assert not pa.htpasswd + ctx.configure(pa, proxyauth="@" + tdata.path("mitmproxy/net/data/htpasswd")) + assert isinstance(pa.validator, proxyauth.Htpasswd) + assert pa.validator("test", "test") + assert not pa.validator("test", "foo") with pytest.raises(exceptions.OptionsError, match="Proxy Authentication not supported in transparent mode."): ctx.configure(pa, proxyauth="any", mode="transparent") - with pytest.raises(exceptions.OptionsError, match="Proxy Authentication not supported in SOCKS mode."): - ctx.configure(pa, proxyauth="any", mode="socks5") def test_handlers(self): up = proxyauth.ProxyAuth() @@ -260,3 +198,14 @@ class TestProxyAuth: up.requestheaders(f2) assert not f2.response assert f2.metadata["proxyauth"] == ('test', 'test') + + +def test_ldap(monkeypatch): + monkeypatch.setattr(ldap3, "Server", mock.MagicMock()) + monkeypatch.setattr(ldap3, "Connection", mock.MagicMock()) + + validator = proxyauth.Ldap("ldaps:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com") + assert not validator("", "") + assert validator("foo", "bar") + validator.conn.response = False + assert not validator("foo", "bar")