addon options: setheaders, stickyauth

This commit is contained in:
Aldo Cortesi 2018-02-24 15:45:12 +13:00
parent 12633adeb9
commit 704c1db1b7
8 changed files with 36 additions and 35 deletions

View File

@ -1,3 +1,5 @@
import typing
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy import flowfilter from mitmproxy import flowfilter
from mitmproxy import ctx from mitmproxy import ctx
@ -44,6 +46,15 @@ class SetHeaders:
def __init__(self): def __init__(self):
self.lst = [] self.lst = []
def load(self, loader):
loader.add_option(
"setheaders", typing.Sequence[str], [],
"""
Header set pattern of the form "/pattern/header/value", where the
separator can be any character.
"""
)
def configure(self, updated): def configure(self, updated):
if "setheaders" in updated: if "setheaders" in updated:
self.lst = [] self.lst = []

View File

@ -1,3 +1,5 @@
import typing
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy import flowfilter from mitmproxy import flowfilter
from mitmproxy import ctx from mitmproxy import ctx
@ -8,6 +10,12 @@ class StickyAuth:
self.flt = None self.flt = None
self.hosts = {} self.hosts = {}
def load(self, loader):
loader.add_option(
"stickyauth", typing.Optional[str], None,
"Set sticky auth filter. Matched against requests."
)
def configure(self, updated): def configure(self, updated):
if "stickyauth" in updated: if "stickyauth" in updated:
if ctx.options.stickyauth: if ctx.options.stickyauth:

View File

@ -67,7 +67,6 @@ class Options(optmanager.OptManager):
view_filter = None # type: Optional[str] view_filter = None # type: Optional[str]
# FIXME: Options that should be uncomplicated to migrate to addons # FIXME: Options that should be uncomplicated to migrate to addons
setheaders = None # type: Sequence[str]
stickyauth = None # type: Optional[str] stickyauth = None # type: Optional[str]
stickycookie = None # type: Optional[str] stickycookie = None # type: Optional[str]
stream_large_bodies = None # type: Optional[str] stream_large_bodies = None # type: Optional[str]
@ -90,21 +89,10 @@ class Options(optmanager.OptManager):
"showhost", bool, False, "showhost", bool, False,
"Use the Host header to construct URLs for display." "Use the Host header to construct URLs for display."
) )
self.add_option(
"setheaders", Sequence[str], [],
"""
Header set pattern of the form "/pattern/header/value", where the
separator can be any character.
"""
)
self.add_option( self.add_option(
"stickycookie", Optional[str], None, "stickycookie", Optional[str], None,
"Set sticky cookie filter. Matched against requests." "Set sticky cookie filter. Matched against requests."
) )
self.add_option(
"stickyauth", Optional[str], None,
"Set sticky auth filter. Matched against requests."
)
self.add_option( self.add_option(
"stream_large_bodies", Optional[str], None, "stream_large_bodies", Optional[str], None,
""" """

View File

@ -68,7 +68,7 @@ class context:
self.wrapped = None self.wrapped = None
for a in addons: for a in addons:
self.master.addons.register(a) self.master.addons.add(a)
def ctx(self): def ctx(self):
""" """

View File

@ -128,28 +128,23 @@ def test_options(tmpdir):
p = str(tmpdir.join("path")) p = str(tmpdir.join("path"))
sa = core.Core() sa = core.Core()
with taddons.context() as tctx: with taddons.context() as tctx:
tctx.options.stickycookie = "foo" tctx.options.listen_host = "foo"
assert tctx.options.stickycookie == "foo" assert tctx.options.listen_host == "foo"
sa.options_reset() sa.options_reset_one("listen_host")
assert tctx.options.stickycookie is None assert tctx.options.listen_host != "foo"
tctx.options.stickycookie = "foo"
tctx.options.stickyauth = "bar"
sa.options_reset_one("stickycookie")
assert tctx.options.stickycookie is None
assert tctx.options.stickyauth == "bar"
with pytest.raises(exceptions.CommandError): with pytest.raises(exceptions.CommandError):
sa.options_reset_one("unknown") sa.options_reset_one("unknown")
tctx.options.listen_host = "foo"
sa.options_save(p) sa.options_save(p)
with pytest.raises(exceptions.CommandError): with pytest.raises(exceptions.CommandError):
sa.options_save("/") sa.options_save("/")
sa.options_reset() sa.options_reset()
assert tctx.options.stickyauth is None assert tctx.options.listen_host == ""
sa.options_load(p) sa.options_load(p)
assert tctx.options.stickyauth == "bar" assert tctx.options.listen_host == "foo"
sa.options_load("/nonexistent") sa.options_load("/nonexistent")

View File

@ -171,7 +171,7 @@ class TestScriptLoader:
"mitmproxy/data/addonscripts/recorder/recorder.py" "mitmproxy/data/addonscripts/recorder/recorder.py"
) )
sc = script.ScriptLoader() sc = script.ScriptLoader()
with taddons.context() as tctx: with taddons.context(sc) as tctx:
sc.script_run([tflow.tflow(resp=True)], rp) sc.script_run([tflow.tflow(resp=True)], rp)
debug = [i.msg for i in tctx.master.logs if i.level == "debug"] debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
assert debug == [ assert debug == [
@ -183,7 +183,7 @@ class TestScriptLoader:
def test_script_run_nonexistent(self): def test_script_run_nonexistent(self):
sc = script.ScriptLoader() sc = script.ScriptLoader()
with taddons.context(): with taddons.context(sc):
with pytest.raises(exceptions.CommandError): with pytest.raises(exceptions.CommandError):
sc.script_run([tflow.tflow(resp=True)], "/") sc.script_run([tflow.tflow(resp=True)], "/")
@ -208,8 +208,7 @@ class TestScriptLoader:
def test_dupes(self): def test_dupes(self):
sc = script.ScriptLoader() sc = script.ScriptLoader()
with taddons.context() as tctx: with taddons.context(sc) as tctx:
tctx.master.addons.add(sc)
with pytest.raises(exceptions.OptionsError): with pytest.raises(exceptions.OptionsError):
tctx.configure( tctx.configure(
sc, sc,
@ -232,7 +231,7 @@ class TestScriptLoader:
def test_load_err(self): def test_load_err(self):
sc = script.ScriptLoader() sc = script.ScriptLoader()
with taddons.context() as tctx: with taddons.context(sc) as tctx:
tctx.configure(sc, scripts=[ tctx.configure(sc, scripts=[
tutils.test_data.path("mitmproxy/data/addonscripts/load_error.py") tutils.test_data.path("mitmproxy/data/addonscripts/load_error.py")
]) ])
@ -242,7 +241,7 @@ class TestScriptLoader:
pass # this is expected and normally guarded. pass # this is expected and normally guarded.
# on the next tick we should not fail however. # on the next tick we should not fail however.
tctx.invoke(sc, "tick") tctx.invoke(sc, "tick")
assert len(tctx.master.addons) == 0 assert len(tctx.master.addons) == 1
def test_order(self): def test_order(self):
rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder") rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder")

View File

@ -19,14 +19,14 @@ class TestSetHeaders:
def test_configure(self): def test_configure(self):
sh = setheaders.SetHeaders() sh = setheaders.SetHeaders()
with taddons.context() as tctx: with taddons.context(sh) as tctx:
with pytest.raises(Exception, match="Invalid setheader filter pattern"): with pytest.raises(Exception, match="Invalid setheader filter pattern"):
tctx.configure(sh, setheaders = ["/~b/one/two"]) tctx.configure(sh, setheaders = ["/~b/one/two"])
tctx.configure(sh, setheaders = ["/foo/bar/voing"]) tctx.configure(sh, setheaders = ["/foo/bar/voing"])
def test_setheaders(self): def test_setheaders(self):
sh = setheaders.SetHeaders() sh = setheaders.SetHeaders()
with taddons.context() as tctx: with taddons.context(sh) as tctx:
tctx.configure( tctx.configure(
sh, sh,
setheaders = [ setheaders = [

View File

@ -9,7 +9,7 @@ from mitmproxy import exceptions
def test_configure(): def test_configure():
r = stickyauth.StickyAuth() r = stickyauth.StickyAuth()
with taddons.context() as tctx: with taddons.context(r) as tctx:
tctx.configure(r, stickyauth="~s") tctx.configure(r, stickyauth="~s")
with pytest.raises(exceptions.OptionsError): with pytest.raises(exceptions.OptionsError):
tctx.configure(r, stickyauth="~~") tctx.configure(r, stickyauth="~~")
@ -20,7 +20,7 @@ def test_configure():
def test_simple(): def test_simple():
r = stickyauth.StickyAuth() r = stickyauth.StickyAuth()
with taddons.context() as tctx: with taddons.context(r) as tctx:
tctx.configure(r, stickyauth=".*") tctx.configure(r, stickyauth=".*")
f = tflow.tflow(resp=True) f = tflow.tflow(resp=True)
f.request.headers["authorization"] = "foo" f.request.headers["authorization"] = "foo"