addons.proxyauth: complete and test

This commit is contained in:
Aldo Cortesi 2016-11-13 18:14:23 +13:00
parent e644d2167c
commit dc88b7d110
2 changed files with 205 additions and 16 deletions

View File

@ -3,6 +3,11 @@ import binascii
import passlib.apache
from mitmproxy import exceptions
from mitmproxy import http
import mitmproxy.net.http
REALM = "mitmproxy"
def parse_http_basic_auth(s):
@ -20,19 +25,74 @@ def parse_http_basic_auth(s):
return scheme, parts[0], parts[1]
def assemble_http_basic_auth(scheme, username, password):
v = binascii.b2a_base64(
(username + ":" + password).encode("utf8")
).decode("ascii")
return scheme + " " + v
class ProxyAuth:
def __init__(self):
self.nonanonymous = False
self.htpasswd = None
self.singleuser = None
def enabled(self):
return any([self.nonanonymous, self.htpasswd, self.singleuser])
def which_auth_header(self, f):
if f.mode == "regular":
return 'Proxy-Authorization'
else:
return 'Authorization'
def auth_required_response(self, f):
if f.mode == "regular":
hdrname = 'Proxy-Authenticate'
else:
hdrname = 'WWW-Authenticate'
headers = mitmproxy.net.http.Headers()
headers[hdrname] = 'Basic realm="%s"' % REALM
if f.mode == "transparent":
return http.make_error_response(
401,
"Authentication Required",
headers
)
else:
return http.make_error_response(
407,
"Proxy Authentication Required",
headers,
)
def check(self, f):
auth_value = f.request.headers.get(self.which_auth_header(f), None)
if not auth_value:
return False
parts = parse_http_basic_auth(auth_value)
if not parts:
return False
scheme, username, password = parts
if scheme.lower() != 'basic':
return False
if self.nonanonymous:
pass
elif self.singleuser:
if [username, password] != self.singleuser:
return False
elif self.htpasswd:
if not self.htpasswd.check_password(username, password):
return False
else:
raise NotImplementedError("Should never happen.")
return True
def authenticate(self, f):
if self.check(f):
del f.request.headers[self.which_auth_header(f)]
else:
f.response = self.auth_required_response(f)
# Handlers
def configure(self, options, updated):
if "auth_nonanonymous" in updated:
self.nonanonymous = options.auth_nonanonymous
@ -57,12 +117,25 @@ class ProxyAuth:
"Could not open htpasswd file: %s" % v
)
else:
self.auth_htpasswd = None
self.htpasswd = None
if self.enabled():
if options.mode == "transparent":
raise exceptions.OptionsError(
"Proxy Authentication not supported in transparent mode."
)
elif options.mode == "socks5":
raise exceptions.OptionsError(
"Proxy Authentication not supported in SOCKS mode. "
"https://github.com/mitmproxy/mitmproxy/issues/738"
)
# TODO: check for multiple auth options
def http_connect(self, f):
# mode = regular
pass
if self.enabled() and f.mode == "regular":
self.authenticate(f)
def http_request(self, f):
# mode = regular, no via
pass
def requestheaders(self, f):
if self.enabled():
# Are we already authenticated in CONNECT?
if not (f.mode == "regular" and f.server_conn.via):
self.authenticate(f)

View File

@ -7,11 +7,17 @@ from mitmproxy.test import tutils
from mitmproxy.addons import proxyauth
def mkauth(username, password, scheme="basic"):
v = binascii.b2a_base64(
(username + ":" + password).encode("utf8")
).decode("ascii")
return scheme + " " + v
def test_parse_http_basic_auth():
vals = ("basic", "foo", "bar")
assert proxyauth.parse_http_basic_auth(
proxyauth.assemble_http_basic_auth(*vals)
) == vals
mkauth("test", "test")
) == ("basic", "test", "test")
assert not proxyauth.parse_http_basic_auth("")
assert not proxyauth.parse_http_basic_auth("foo bar")
v = "basic " + binascii.b2a_base64(b"foo").decode("ascii")
@ -51,3 +57,113 @@ def test_configure():
up,
auth_htpasswd = "nonexistent"
)
ctx.configure(
up,
auth_htpasswd = tutils.test_data.path(
"mitmproxy/net/data/htpasswd"
)
)
assert up.htpasswd
assert up.htpasswd.check_password("test", "test")
assert not up.htpasswd.check_password("test", "foo")
ctx.configure(up, auth_htpasswd = None)
assert not up.htpasswd
tutils.raises(
exceptions.OptionsError,
ctx.configure,
up,
auth_nonanonymous = True,
mode = "transparent"
)
tutils.raises(
exceptions.OptionsError,
ctx.configure,
up,
auth_nonanonymous = True,
mode = "socks5"
)
def test_check():
up = proxyauth.ProxyAuth()
with taddons.context() as ctx:
ctx.configure(up, auth_nonanonymous=True)
f = tflow.tflow()
assert not up.check(f)
f.request.headers["Proxy-Authorization"] = mkauth("test", "test")
assert up.check(f)
f.request.headers["Proxy-Authorization"] = "invalid"
assert not up.check(f)
f.request.headers["Proxy-Authorization"] = mkauth(
"test", "test", scheme = "unknown"
)
assert not up.check(f)
ctx.configure(up, auth_nonanonymous=False, auth_singleuser="test:test")
f.request.headers["Proxy-Authorization"] = mkauth("test", "test")
assert up.check(f)
ctx.configure(up, auth_nonanonymous=False, auth_singleuser="test:foo")
assert not up.check(f)
ctx.configure(
up,
auth_singleuser = None,
auth_htpasswd = tutils.test_data.path(
"mitmproxy/net/data/htpasswd"
)
)
f.request.headers["Proxy-Authorization"] = mkauth("test", "test")
assert up.check(f)
f.request.headers["Proxy-Authorization"] = mkauth("test", "foo")
assert not up.check(f)
def test_authenticate():
up = proxyauth.ProxyAuth()
with taddons.context() as ctx:
ctx.configure(up, auth_nonanonymous=True)
f = tflow.tflow()
assert not f.response
up.authenticate(f)
assert f.response.status_code == 407
f = tflow.tflow()
f.request.headers["Proxy-Authorization"] = mkauth("test", "test")
up.authenticate(f)
assert not f.response
assert not f.request.headers.get("Proxy-Authorization")
f = tflow.tflow()
f.mode = "transparent"
assert not f.response
up.authenticate(f)
assert f.response.status_code == 401
f = tflow.tflow()
f.mode = "transparent"
f.request.headers["Authorization"] = mkauth("test", "test")
up.authenticate(f)
assert not f.response
assert not f.request.headers.get("Authorization")
def test_handlers():
up = proxyauth.ProxyAuth()
with taddons.context() as ctx:
ctx.configure(up, auth_nonanonymous=True)
f = tflow.tflow()
assert not f.response
up.requestheaders(f)
assert f.response.status_code == 407
f = tflow.tflow()
f.request.method = "CONNECT"
assert not f.response
up.http_connect(f)
assert f.response.status_code == 407