mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
Change the way proxy authetication is specified
We now have one option "proxyauth". If this is "any", we accept any credentials, if it starts with an @ it's treated as a path to an htpasswd file, if it is of the form username:password it's a single-user credential.
This commit is contained in:
parent
b0ba765598
commit
aed780bf48
@ -114,30 +114,28 @@ class ProxyAuth:
|
||||
|
||||
# Handlers
|
||||
def configure(self, options, updated):
|
||||
if "auth_nonanonymous" in updated:
|
||||
self.nonanonymous = options.auth_nonanonymous
|
||||
if "auth_singleuser" in updated:
|
||||
if options.auth_singleuser:
|
||||
parts = options.auth_singleuser.split(':')
|
||||
if len(parts) != 2:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid single-user auth specification."
|
||||
)
|
||||
self.singleuser = parts
|
||||
else:
|
||||
self.singleuser = None
|
||||
if "auth_htpasswd" in updated:
|
||||
if options.auth_htpasswd:
|
||||
try:
|
||||
self.htpasswd = passlib.apache.HtpasswdFile(
|
||||
options.auth_htpasswd
|
||||
)
|
||||
except (ValueError, OSError) as v:
|
||||
raise exceptions.OptionsError(
|
||||
"Could not open htpasswd file: %s" % v
|
||||
)
|
||||
else:
|
||||
self.htpasswd = None
|
||||
if "proxyauth" in updated:
|
||||
self.nonanonymous = False
|
||||
self.singleuser = None
|
||||
self.htpasswd = None
|
||||
if options.proxyauth:
|
||||
if options.proxyauth == "any":
|
||||
self.nonanonymous = True
|
||||
elif options.proxyauth.startswith("@"):
|
||||
p = options.proxyauth[1:]
|
||||
try:
|
||||
self.htpasswd = passlib.apache.HtpasswdFile(p)
|
||||
except (ValueError, OSError) as v:
|
||||
raise exceptions.OptionsError(
|
||||
"Could not open htpasswd file: %s" % p
|
||||
)
|
||||
else:
|
||||
parts = options.proxyauth.split(':')
|
||||
if len(parts) != 2:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid single-user auth specification."
|
||||
)
|
||||
self.singleuser = parts
|
||||
if "mode" in updated:
|
||||
self.mode = options.mode
|
||||
if self.enabled():
|
||||
|
@ -203,20 +203,15 @@ class Options(optmanager.OptManager):
|
||||
|
||||
# Proxy options
|
||||
self.add_option(
|
||||
"auth_nonanonymous", False, bool,
|
||||
"Allow access to any user long as a credentials are specified."
|
||||
)
|
||||
self.add_option(
|
||||
"auth_singleuser", None, Optional[str],
|
||||
"proxyauth", None, Optional[str],
|
||||
"""
|
||||
Allows access to a a single user, specified in the form
|
||||
username:password.
|
||||
Require authentication before proxying requests. If the value is
|
||||
"any", we prompt for authentication, but permit any values. If it
|
||||
starts with an "@", it is treated as a path to an Apache htpasswd
|
||||
file. If its is of the form "username:password", it is treated as a
|
||||
single-user credential.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"auth_htpasswd", None, Optional[str],
|
||||
"Allow access to users specified in an Apache htpasswd file."
|
||||
)
|
||||
self.add_option(
|
||||
"add_upstream_certs_to_client_chain", False, bool,
|
||||
"""
|
||||
|
@ -107,9 +107,7 @@ def common_options(parser, opts):
|
||||
used for authenticating them.
|
||||
"""
|
||||
).add_mutually_exclusive_group()
|
||||
opts.make_parser(group, "auth_nonanonymous")
|
||||
opts.make_parser(group, "auth_singleuser", metavar="USER:PASS")
|
||||
opts.make_parser(group, "auth_htpasswd", metavar="PATH")
|
||||
opts.make_parser(group, "proxyauth", metavar="SPEC")
|
||||
|
||||
|
||||
def mitmproxy(opts):
|
||||
|
@ -28,40 +28,43 @@ def test_configure():
|
||||
up = proxyauth.ProxyAuth()
|
||||
with taddons.context() as ctx:
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
ctx.configure(up, auth_singleuser="foo")
|
||||
ctx.configure(up, proxyauth="foo")
|
||||
|
||||
ctx.configure(up, auth_singleuser="foo:bar")
|
||||
ctx.configure(up, proxyauth="foo:bar")
|
||||
assert up.singleuser == ["foo", "bar"]
|
||||
|
||||
ctx.configure(up, auth_singleuser=None)
|
||||
ctx.configure(up, proxyauth=None)
|
||||
assert up.singleuser is None
|
||||
|
||||
ctx.configure(up, auth_nonanonymous=True)
|
||||
ctx.configure(up, proxyauth="any")
|
||||
assert up.nonanonymous
|
||||
ctx.configure(up, auth_nonanonymous=False)
|
||||
ctx.configure(up, proxyauth=None)
|
||||
assert not up.nonanonymous
|
||||
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
ctx.configure(up, auth_htpasswd=tutils.test_data.path("mitmproxy/net/data/server.crt"))
|
||||
ctx.configure(
|
||||
up,
|
||||
proxyauth= "@" + tutils.test_data.path("mitmproxy/net/data/server.crt")
|
||||
)
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
ctx.configure(up, auth_htpasswd="nonexistent")
|
||||
ctx.configure(up, proxyauth="@nonexistent")
|
||||
|
||||
ctx.configure(
|
||||
up,
|
||||
auth_htpasswd=tutils.test_data.path(
|
||||
proxyauth= "@" + tutils.test_data.path(
|
||||
"mitmproxy/net/data/htpasswd"
|
||||
)
|
||||
)
|
||||
assert up.htpasswd
|
||||
assert up.htpasswd.check_password("test", "test")
|
||||
assert not up.htpasswd.check_password("test", "foo")
|
||||
ctx.configure(up, auth_htpasswd=None)
|
||||
ctx.configure(up, proxyauth=None)
|
||||
assert not up.htpasswd
|
||||
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
ctx.configure(up, auth_nonanonymous=True, mode="transparent")
|
||||
ctx.configure(up, proxyauth="any", mode="transparent")
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
ctx.configure(up, auth_nonanonymous=True, mode="socks5")
|
||||
ctx.configure(up, proxyauth="any", mode="socks5")
|
||||
|
||||
ctx.configure(up, mode="regular")
|
||||
assert up.mode == "regular"
|
||||
@ -70,7 +73,7 @@ def test_configure():
|
||||
def test_check():
|
||||
up = proxyauth.ProxyAuth()
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(up, auth_nonanonymous=True, mode="regular")
|
||||
ctx.configure(up, proxyauth="any", mode="regular")
|
||||
f = tflow.tflow()
|
||||
assert not up.check(f)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
@ -86,18 +89,17 @@ def test_check():
|
||||
)
|
||||
assert not up.check(f)
|
||||
|
||||
ctx.configure(up, auth_nonanonymous=False, auth_singleuser="test:test")
|
||||
ctx.configure(up, proxyauth="test:test")
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
assert up.check(f)
|
||||
ctx.configure(up, auth_nonanonymous=False, auth_singleuser="test:foo")
|
||||
ctx.configure(up, proxyauth="test:foo")
|
||||
assert not up.check(f)
|
||||
|
||||
ctx.configure(
|
||||
up,
|
||||
auth_singleuser=None,
|
||||
auth_htpasswd=tutils.test_data.path(
|
||||
proxyauth="@" + tutils.test_data.path(
|
||||
"mitmproxy/net/data/htpasswd"
|
||||
)
|
||||
)
|
||||
@ -114,7 +116,7 @@ def test_check():
|
||||
def test_authenticate():
|
||||
up = proxyauth.ProxyAuth()
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(up, auth_nonanonymous=True, mode="regular")
|
||||
ctx.configure(up, proxyauth="any", mode="regular")
|
||||
|
||||
f = tflow.tflow()
|
||||
assert not f.response
|
||||
@ -147,7 +149,7 @@ def test_authenticate():
|
||||
def test_handlers():
|
||||
up = proxyauth.ProxyAuth()
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(up, auth_nonanonymous=True, mode="regular")
|
||||
ctx.configure(up, proxyauth="any", mode="regular")
|
||||
|
||||
f = tflow.tflow()
|
||||
assert not f.response
|
||||
|
@ -302,7 +302,7 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin):
|
||||
class TestHTTPAuth(tservers.HTTPProxyTest):
|
||||
def test_auth(self):
|
||||
self.master.addons.add(proxyauth.ProxyAuth())
|
||||
self.master.options.auth_singleuser = "test:test"
|
||||
self.master.options.proxyauth = "test:test"
|
||||
assert self.pathod("202").status_code == 407
|
||||
p = self.pathoc()
|
||||
with p.connect():
|
||||
@ -321,7 +321,7 @@ class TestHTTPAuth(tservers.HTTPProxyTest):
|
||||
class TestHTTPReverseAuth(tservers.ReverseProxyTest):
|
||||
def test_auth(self):
|
||||
self.master.addons.add(proxyauth.ProxyAuth())
|
||||
self.master.options.auth_singleuser = "test:test"
|
||||
self.master.options.proxyauth = "test:test"
|
||||
assert self.pathod("202").status_code == 401
|
||||
p = self.pathoc()
|
||||
with p.connect():
|
||||
|
Loading…
Reference in New Issue
Block a user