From dc88b7d1102e0bf2d0634fe22682ce4e66ebf772 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Sun, 13 Nov 2016 18:14:23 +1300 Subject: [PATCH] addons.proxyauth: complete and test --- mitmproxy/addons/proxyauth.py | 99 ++++++++++++++++--- test/mitmproxy/addons/test_proxyauth.py | 122 +++++++++++++++++++++++- 2 files changed, 205 insertions(+), 16 deletions(-) diff --git a/mitmproxy/addons/proxyauth.py b/mitmproxy/addons/proxyauth.py index fc68de71e..aeeb04f3c 100644 --- a/mitmproxy/addons/proxyauth.py +++ b/mitmproxy/addons/proxyauth.py @@ -3,6 +3,11 @@ import binascii import passlib.apache from mitmproxy import exceptions +from mitmproxy import http +import mitmproxy.net.http + + +REALM = "mitmproxy" def parse_http_basic_auth(s): @@ -20,19 +25,74 @@ def parse_http_basic_auth(s): return scheme, parts[0], parts[1] -def assemble_http_basic_auth(scheme, username, password): - v = binascii.b2a_base64( - (username + ":" + password).encode("utf8") - ).decode("ascii") - return scheme + " " + v - - class ProxyAuth: def __init__(self): self.nonanonymous = False self.htpasswd = None self.singleuser = None + def enabled(self): + return any([self.nonanonymous, self.htpasswd, self.singleuser]) + + def which_auth_header(self, f): + if f.mode == "regular": + return 'Proxy-Authorization' + else: + return 'Authorization' + + def auth_required_response(self, f): + if f.mode == "regular": + hdrname = 'Proxy-Authenticate' + else: + hdrname = 'WWW-Authenticate' + + headers = mitmproxy.net.http.Headers() + headers[hdrname] = 'Basic realm="%s"' % REALM + + if f.mode == "transparent": + return http.make_error_response( + 401, + "Authentication Required", + headers + ) + else: + return http.make_error_response( + 407, + "Proxy Authentication Required", + headers, + ) + + def check(self, f): + auth_value = f.request.headers.get(self.which_auth_header(f), None) + if not auth_value: + return False + parts = parse_http_basic_auth(auth_value) + if not parts: + return False + scheme, username, password = parts + if scheme.lower() != 'basic': + return False + + if self.nonanonymous: + pass + elif self.singleuser: + if [username, password] != self.singleuser: + return False + elif self.htpasswd: + if not self.htpasswd.check_password(username, password): + return False + else: + raise NotImplementedError("Should never happen.") + + return True + + def authenticate(self, f): + if self.check(f): + del f.request.headers[self.which_auth_header(f)] + else: + f.response = self.auth_required_response(f) + + # Handlers def configure(self, options, updated): if "auth_nonanonymous" in updated: self.nonanonymous = options.auth_nonanonymous @@ -57,12 +117,25 @@ class ProxyAuth: "Could not open htpasswd file: %s" % v ) else: - self.auth_htpasswd = None + self.htpasswd = None + if self.enabled(): + if options.mode == "transparent": + raise exceptions.OptionsError( + "Proxy Authentication not supported in transparent mode." + ) + elif options.mode == "socks5": + raise exceptions.OptionsError( + "Proxy Authentication not supported in SOCKS mode. " + "https://github.com/mitmproxy/mitmproxy/issues/738" + ) + # TODO: check for multiple auth options def http_connect(self, f): - # mode = regular - pass + if self.enabled() and f.mode == "regular": + self.authenticate(f) - def http_request(self, f): - # mode = regular, no via - pass + def requestheaders(self, f): + if self.enabled(): + # Are we already authenticated in CONNECT? + if not (f.mode == "regular" and f.server_conn.via): + self.authenticate(f) diff --git a/test/mitmproxy/addons/test_proxyauth.py b/test/mitmproxy/addons/test_proxyauth.py index e9dcf7bfc..73d87cbfc 100644 --- a/test/mitmproxy/addons/test_proxyauth.py +++ b/test/mitmproxy/addons/test_proxyauth.py @@ -7,11 +7,17 @@ from mitmproxy.test import tutils from mitmproxy.addons import proxyauth +def mkauth(username, password, scheme="basic"): + v = binascii.b2a_base64( + (username + ":" + password).encode("utf8") + ).decode("ascii") + return scheme + " " + v + + def test_parse_http_basic_auth(): - vals = ("basic", "foo", "bar") assert proxyauth.parse_http_basic_auth( - proxyauth.assemble_http_basic_auth(*vals) - ) == vals + mkauth("test", "test") + ) == ("basic", "test", "test") assert not proxyauth.parse_http_basic_auth("") assert not proxyauth.parse_http_basic_auth("foo bar") v = "basic " + binascii.b2a_base64(b"foo").decode("ascii") @@ -51,3 +57,113 @@ def test_configure(): up, auth_htpasswd = "nonexistent" ) + + ctx.configure( + up, + auth_htpasswd = tutils.test_data.path( + "mitmproxy/net/data/htpasswd" + ) + ) + assert up.htpasswd + assert up.htpasswd.check_password("test", "test") + assert not up.htpasswd.check_password("test", "foo") + ctx.configure(up, auth_htpasswd = None) + assert not up.htpasswd + + tutils.raises( + exceptions.OptionsError, + ctx.configure, + up, + auth_nonanonymous = True, + mode = "transparent" + ) + tutils.raises( + exceptions.OptionsError, + ctx.configure, + up, + auth_nonanonymous = True, + mode = "socks5" + ) + + +def test_check(): + up = proxyauth.ProxyAuth() + with taddons.context() as ctx: + ctx.configure(up, auth_nonanonymous=True) + f = tflow.tflow() + assert not up.check(f) + f.request.headers["Proxy-Authorization"] = mkauth("test", "test") + assert up.check(f) + + f.request.headers["Proxy-Authorization"] = "invalid" + assert not up.check(f) + + f.request.headers["Proxy-Authorization"] = mkauth( + "test", "test", scheme = "unknown" + ) + assert not up.check(f) + + ctx.configure(up, auth_nonanonymous=False, auth_singleuser="test:test") + f.request.headers["Proxy-Authorization"] = mkauth("test", "test") + assert up.check(f) + ctx.configure(up, auth_nonanonymous=False, auth_singleuser="test:foo") + assert not up.check(f) + + ctx.configure( + up, + auth_singleuser = None, + auth_htpasswd = tutils.test_data.path( + "mitmproxy/net/data/htpasswd" + ) + ) + f.request.headers["Proxy-Authorization"] = mkauth("test", "test") + assert up.check(f) + f.request.headers["Proxy-Authorization"] = mkauth("test", "foo") + assert not up.check(f) + + +def test_authenticate(): + up = proxyauth.ProxyAuth() + with taddons.context() as ctx: + ctx.configure(up, auth_nonanonymous=True) + + f = tflow.tflow() + assert not f.response + up.authenticate(f) + assert f.response.status_code == 407 + + f = tflow.tflow() + f.request.headers["Proxy-Authorization"] = mkauth("test", "test") + up.authenticate(f) + assert not f.response + assert not f.request.headers.get("Proxy-Authorization") + + f = tflow.tflow() + f.mode = "transparent" + assert not f.response + up.authenticate(f) + assert f.response.status_code == 401 + + f = tflow.tflow() + f.mode = "transparent" + f.request.headers["Authorization"] = mkauth("test", "test") + up.authenticate(f) + assert not f.response + assert not f.request.headers.get("Authorization") + + +def test_handlers(): + up = proxyauth.ProxyAuth() + with taddons.context() as ctx: + ctx.configure(up, auth_nonanonymous=True) + + f = tflow.tflow() + assert not f.response + up.requestheaders(f) + assert f.response.status_code == 407 + + f = tflow.tflow() + f.request.method = "CONNECT" + assert not f.response + up.http_connect(f) + assert f.response.status_code == 407