Merge pull request #2907 from cortesi/optionscomp

Start moving more complicated options over to /addons
This commit is contained in:
Aldo Cortesi 2018-02-25 10:15:21 +13:00 committed by GitHub
commit 6b5b71aefa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 70 additions and 89 deletions

View File

@ -31,6 +31,23 @@ class Dumper:
self.filter = None # type: flowfilter.TFilter self.filter = None # type: flowfilter.TFilter
self.outfp = outfile # type: typing.io.TextIO self.outfp = outfile # type: typing.io.TextIO
def load(self, loader):
loader.add_option(
"flow_detail", int, 1,
"""
The display detail level for flows in mitmdump: 0 (almost quiet) to 3 (very verbose).
0: shortened request URL, response status code, WebSocket and TCP message notifications.
1: full request URL with response status code
2: 1 + HTTP headers
3: 2 + full response content, content of WebSocket and TCP messages.
"""
)
loader.add_option(
"dumper_default_contentview", str, "auto",
"The default content view mode.",
choices = [i.name.lower() for i in contentviews.views]
)
def configure(self, updated): def configure(self, updated):
if "view_filter" in updated: if "view_filter" in updated:
if ctx.options.view_filter: if ctx.options.view_filter:
@ -61,7 +78,7 @@ class Dumper:
def _echo_message(self, message): def _echo_message(self, message):
_, lines, error = contentviews.get_message_content_view( _, lines, error = contentviews.get_message_content_view(
ctx.options.default_contentview, ctx.options.dumper_default_contentview,
message message
) )
if error: if error:

View File

@ -1,3 +1,5 @@
import typing
from mitmproxy import flowfilter from mitmproxy import flowfilter
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy import ctx from mitmproxy import ctx
@ -7,6 +9,17 @@ class Intercept:
def __init__(self): def __init__(self):
self.filt = None self.filt = None
def load(self, loader):
loader.add_option(
"intercept_active", bool, False,
"Intercept toggle"
)
loader.add_option(
"intercept", typing.Optional[str], None,
"Intercept filter expression."
)
def configure(self, updated): def configure(self, updated):
if "intercept" in updated: if "intercept" in updated:
if not ctx.options.intercept: if not ctx.options.intercept:

View File

@ -52,6 +52,18 @@ class ProxyAuth:
self.authenticated = weakref.WeakKeyDictionary() # type: MutableMapping[connections.ClientConnection, Tuple[str, str]] self.authenticated = weakref.WeakKeyDictionary() # type: MutableMapping[connections.ClientConnection, Tuple[str, str]]
"""Contains all connections that are permanently authenticated after an HTTP CONNECT""" """Contains all connections that are permanently authenticated after an HTTP CONNECT"""
def load(self, loader):
loader.add_option(
"proxyauth", Optional[str], None,
"""
Require proxy authentication. Format:
"username:pass",
"any" to accept any user/pass combination,
"@path" to use an Apache htpasswd file,
or "ldap[s]:url_server_ldap:dn_auth:password:dn_subtree" for LDAP authentication.
"""
)
def enabled(self) -> bool: def enabled(self) -> bool:
return any([self.nonanonymous, self.htpasswd, self.singleuser, self.ldapconn, self.ldapserver]) return any([self.nonanonymous, self.htpasswd, self.singleuser, self.ldapconn, self.ldapserver])

View File

@ -1,7 +1,6 @@
from typing import Optional, Sequence from typing import Optional, Sequence
from mitmproxy import optmanager from mitmproxy import optmanager
from mitmproxy import contentviews
from mitmproxy.net import tls from mitmproxy.net import tls
log_verbosity = [ log_verbosity = [
@ -57,11 +56,6 @@ class Options(optmanager.OptManager):
# FIXME: Options that must be migrated to addons, but are complicated # FIXME: Options that must be migrated to addons, but are complicated
# because they're used by more than one addon, or because they're # because they're used by more than one addon, or because they're
# embedded in the core code somehow. # embedded in the core code somehow.
default_contentview = None # type: str
flow_detail = None # type: int
intercept = None # type: Optional[str]
intercept_active = None # type: bool
proxyauth = None # type: Optional[str]
showhost = None # type: bool showhost = None # type: bool
verbosity = None # type: str verbosity = None # type: str
view_filter = None # type: Optional[str] view_filter = None # type: Optional[str]
@ -81,23 +75,8 @@ class Options(optmanager.OptManager):
"Log verbosity.", "Log verbosity.",
choices=log_verbosity choices=log_verbosity
) )
self.add_option(
"default_contentview", str, "auto",
"The default content view mode.",
choices = [i.name.lower() for i in contentviews.views]
)
# Proxy options # Proxy options
self.add_option(
"proxyauth", Optional[str], None,
"""
Require proxy authentication. Format:
"username:pass",
"any" to accept any user/pass combination,
"@path" to use an Apache htpasswd file,
or "ldap[s]:url_server_ldap:dn_auth:password:dn_subtree" for LDAP authentication.
"""
)
self.add_option( self.add_option(
"add_upstream_certs_to_client_chain", bool, False, "add_upstream_certs_to_client_chain", bool, False,
""" """
@ -252,31 +231,9 @@ class Options(optmanager.OptManager):
""" """
) )
self.add_option(
"intercept_active", bool, False,
"Intercept toggle"
)
self.add_option(
"intercept", Optional[str], None,
"Intercept filter expression."
)
self.add_option( self.add_option(
"view_filter", Optional[str], None, "view_filter", Optional[str], None,
"Limit which flows are displayed." "Limit which flows are displayed."
) )
# Dump options
self.add_option(
"flow_detail", int, 1,
"""
The display detail level for flows in mitmdump: 0 (almost quiet) to 3 (very verbose).
0: shortened request URL, response status code, WebSocket and TCP message notifications.
1: full request URL with response status code
2: 1 + HTTP headers
3: 2 + full response content, content of WebSocket and TCP messages.
"""
)
self.update(**kwargs) self.update(**kwargs)

View File

@ -76,6 +76,11 @@ class ConsoleAddon:
self.started = False self.started = False
def load(self, loader): def load(self, loader):
loader.add_option(
"console_default_contentview", str, "auto",
"The default content view mode.",
choices = [i.name.lower() for i in contentviews.views]
)
loader.add_option( loader.add_option(
"console_layout", str, "single", "console_layout", str, "single",
"Console layout.", "Console layout.",
@ -110,15 +115,6 @@ class ConsoleAddon:
""" """
return ["single", "vertical", "horizontal"] return ["single", "vertical", "horizontal"]
@command.command("console.intercept.toggle")
def intercept_toggle(self) -> None:
"""
Toggles interception on/off leaving intercept filters intact.
"""
ctx.options.update(
intercept_active = not ctx.options.intercept_active
)
@command.command("console.layout.cycle") @command.command("console.layout.cycle")
def layout_cycle(self) -> None: def layout_cycle(self) -> None:
""" """
@ -540,7 +536,7 @@ class ConsoleAddon:
[ [
"@focus", "@focus",
"flowview_mode_%s" % idx, "flowview_mode_%s" % idx,
self.master.options.default_contentview, self.master.options.console_default_contentview,
] ]
) )

View File

@ -26,7 +26,7 @@ def map(km):
km.add("ctrl f", "console.nav.pagedown", ["global"], "Page down") km.add("ctrl f", "console.nav.pagedown", ["global"], "Page down")
km.add("ctrl b", "console.nav.pageup", ["global"], "Page up") km.add("ctrl b", "console.nav.pageup", ["global"], "Page up")
km.add("I", "console.intercept.toggle", ["global"], "Toggle intercept") km.add("I", "set intercept_active=toggle", ["global"], "Toggle intercept")
km.add("i", "console.command.set intercept", ["global"], "Set intercept") km.add("i", "console.command.set intercept", ["global"], "Set intercept")
km.add("W", "console.command.set save_stream_file", ["global"], "Stream to file") km.add("W", "console.command.set save_stream_file", ["global"], "Stream to file")
km.add("A", "flow.resume @all", ["flowlist", "flowview"], "Resume all intercepted flows") km.add("A", "flow.resume @all", ["flowlist", "flowview"], "Resume all intercepted flows")

View File

@ -197,10 +197,10 @@ class StatusBar(urwid.WidgetWrap):
r.append("[") r.append("[")
r.append(("heading_key", "u")) r.append(("heading_key", "u"))
r.append(":%s]" % self.master.options.stickyauth) r.append(":%s]" % self.master.options.stickyauth)
if self.master.options.default_contentview != "auto": if self.master.options.console_default_contentview != "auto":
r.append("[") r.append("[")
r.append(("heading_key", "M")) r.append(("heading_key", "M"))
r.append(":%s]" % self.master.options.default_contentview) r.append(":%s]" % self.master.options.console_default_contentview)
if self.master.options.has_changed("view_order"): if self.master.options.has_changed("view_order"):
r.append("[") r.append("[")
r.append(("heading_key", "o")) r.append(("heading_key", "o"))

View File

@ -1,7 +1,7 @@
from unittest import mock from unittest import mock
import pytest import pytest
from mitmproxy.addons import allowremote from mitmproxy.addons import allowremote, proxyauth
from mitmproxy.test import taddons from mitmproxy.test import taddons
@ -19,7 +19,8 @@ from mitmproxy.test import taddons
]) ])
def test_allowremote(allow_remote, ip, should_be_killed): def test_allowremote(allow_remote, ip, should_be_killed):
ar = allowremote.AllowRemote() ar = allowremote.AllowRemote()
with taddons.context(ar) as tctx: up = proxyauth.ProxyAuth()
with taddons.context(ar, up) as tctx:
tctx.options.allow_remote = allow_remote tctx.options.allow_remote = allow_remote
with mock.patch('mitmproxy.proxy.protocol.base.Layer') as layer: with mock.patch('mitmproxy.proxy.protocol.base.Layer') as layer:

View File

@ -14,7 +14,7 @@ from mitmproxy import http
def test_configure(): def test_configure():
d = dumper.Dumper() d = dumper.Dumper()
with taddons.context() as ctx: with taddons.context(d) as ctx:
ctx.configure(d, view_filter="~b foo") ctx.configure(d, view_filter="~b foo")
assert d.filter assert d.filter
@ -33,7 +33,7 @@ def test_configure():
def test_simple(): def test_simple():
sio = io.StringIO() sio = io.StringIO()
d = dumper.Dumper(sio) d = dumper.Dumper(sio)
with taddons.context() as ctx: with taddons.context(d) as ctx:
ctx.configure(d, flow_detail=0) ctx.configure(d, flow_detail=0)
d.response(tflow.tflow(resp=True)) d.response(tflow.tflow(resp=True))
assert not sio.getvalue() assert not sio.getvalue()
@ -101,7 +101,7 @@ def test_echo_body():
sio = io.StringIO() sio = io.StringIO()
d = dumper.Dumper(sio) d = dumper.Dumper(sio)
with taddons.context() as ctx: with taddons.context(d) as ctx:
ctx.configure(d, flow_detail=3) ctx.configure(d, flow_detail=3)
d._echo_message(f.response) d._echo_message(f.response)
t = sio.getvalue() t = sio.getvalue()
@ -111,7 +111,7 @@ def test_echo_body():
def test_echo_request_line(): def test_echo_request_line():
sio = io.StringIO() sio = io.StringIO()
d = dumper.Dumper(sio) d = dumper.Dumper(sio)
with taddons.context() as ctx: with taddons.context(d) as ctx:
ctx.configure(d, flow_detail=3, showhost=True) ctx.configure(d, flow_detail=3, showhost=True)
f = tflow.tflow(client_conn=None, server_conn=True, resp=True) f = tflow.tflow(client_conn=None, server_conn=True, resp=True)
f.request.is_replay = True f.request.is_replay = True
@ -146,7 +146,7 @@ class TestContentView:
view_auto.side_effect = exceptions.ContentViewException("") view_auto.side_effect = exceptions.ContentViewException("")
sio = io.StringIO() sio = io.StringIO()
d = dumper.Dumper(sio) d = dumper.Dumper(sio)
with taddons.context() as ctx: with taddons.context(d) as ctx:
ctx.configure(d, flow_detail=4, verbosity='debug') ctx.configure(d, flow_detail=4, verbosity='debug')
d.response(tflow.tflow()) d.response(tflow.tflow())
assert ctx.master.has_log("content viewer failed") assert ctx.master.has_log("content viewer failed")
@ -155,7 +155,7 @@ class TestContentView:
def test_tcp(): def test_tcp():
sio = io.StringIO() sio = io.StringIO()
d = dumper.Dumper(sio) d = dumper.Dumper(sio)
with taddons.context() as ctx: with taddons.context(d) as ctx:
ctx.configure(d, flow_detail=3, showhost=True) ctx.configure(d, flow_detail=3, showhost=True)
f = tflow.ttcpflow() f = tflow.ttcpflow()
d.tcp_message(f) d.tcp_message(f)
@ -170,7 +170,7 @@ def test_tcp():
def test_websocket(): def test_websocket():
sio = io.StringIO() sio = io.StringIO()
d = dumper.Dumper(sio) d = dumper.Dumper(sio)
with taddons.context() as ctx: with taddons.context(d) as ctx:
ctx.configure(d, flow_detail=3, showhost=True) ctx.configure(d, flow_detail=3, showhost=True)
f = tflow.twebsocketflow() f = tflow.twebsocketflow()
d.websocket_message(f) d.websocket_message(f)

View File

@ -8,7 +8,7 @@ from mitmproxy.test import tflow
def test_simple(): def test_simple():
r = intercept.Intercept() r = intercept.Intercept()
with taddons.context() as tctx: with taddons.context(r) as tctx:
assert not r.filt assert not r.filt
tctx.configure(r, intercept="~q") tctx.configure(r, intercept="~q")
assert r.filt assert r.filt

View File

@ -49,7 +49,7 @@ class TestProxyAuth:
]) ])
def test_is_proxy_auth(self, mode, expected): def test_is_proxy_auth(self, mode, expected):
up = proxyauth.ProxyAuth() up = proxyauth.ProxyAuth()
with taddons.context() as ctx: with taddons.context(up) as ctx:
ctx.options.mode = mode ctx.options.mode = mode
assert up.is_proxy_auth() is expected assert up.is_proxy_auth() is expected
@ -75,7 +75,7 @@ class TestProxyAuth:
def test_check(self): def test_check(self):
up = proxyauth.ProxyAuth() up = proxyauth.ProxyAuth()
with taddons.context() as ctx: with taddons.context(up) as ctx:
ctx.configure(up, proxyauth="any", mode="regular") ctx.configure(up, proxyauth="any", mode="regular")
f = tflow.tflow() f = tflow.tflow()
assert not up.check(f) assert not up.check(f)
@ -133,7 +133,7 @@ class TestProxyAuth:
def test_authenticate(self): def test_authenticate(self):
up = proxyauth.ProxyAuth() up = proxyauth.ProxyAuth()
with taddons.context() as ctx: with taddons.context(up) as ctx:
ctx.configure(up, proxyauth="any", mode="regular") ctx.configure(up, proxyauth="any", mode="regular")
f = tflow.tflow() f = tflow.tflow()
@ -165,7 +165,7 @@ class TestProxyAuth:
def test_configure(self): def test_configure(self):
up = proxyauth.ProxyAuth() up = proxyauth.ProxyAuth()
with taddons.context() as ctx: with taddons.context(up) as ctx:
with pytest.raises(exceptions.OptionsError): with pytest.raises(exceptions.OptionsError):
ctx.configure(up, proxyauth="foo") ctx.configure(up, proxyauth="foo")
@ -223,7 +223,7 @@ class TestProxyAuth:
def test_handlers(self): def test_handlers(self):
up = proxyauth.ProxyAuth() up = proxyauth.ProxyAuth()
with taddons.context() as ctx: with taddons.context(up) as ctx:
ctx.configure(up, proxyauth="any", mode="regular") ctx.configure(up, proxyauth="any", mode="regular")
f = tflow.tflow() f = tflow.tflow()

View File

@ -1,8 +1,6 @@
import urwid import urwid
from mitmproxy import options from mitmproxy import options
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy.tools import console from mitmproxy.tools import console
from ... import tservers from ... import tservers
@ -24,16 +22,3 @@ class TestMaster(tservers.MasterTest):
except urwid.ExitMainLoop: except urwid.ExitMainLoop:
pass pass
assert len(m.view) == i assert len(m.view) == i
def test_intercept(self):
"""regression test for https://github.com/mitmproxy/mitmproxy/issues/1605"""
m = self.mkmaster(intercept="~b bar")
f = tflow.tflow(req=tutils.treq(content=b"foo"))
m.addons.handle_lifecycle("request", f)
assert not m.view[0].intercepted
f = tflow.tflow(req=tutils.treq(content=b"bar"))
m.addons.handle_lifecycle("request", f)
assert m.view[1].intercepted
f = tflow.tflow(resp=tutils.tresp(content=b"bar"))
m.addons.handle_lifecycle("request", f)
assert m.view[2].intercepted

View File

@ -14,7 +14,7 @@ def test_statusbar(monkeypatch):
view_filter="~dst example.com", view_filter="~dst example.com",
stickycookie="~dst example.com", stickycookie="~dst example.com",
stickyauth="~dst example.com", stickyauth="~dst example.com",
default_contentview="javascript", console_default_contentview="javascript",
anticache=True, anticache=True,
anticomp=True, anticomp=True,
showhost=True, showhost=True,

View File

@ -11,7 +11,7 @@ from .. import tservers
class TestDumpMaster(tservers.MasterTest): class TestDumpMaster(tservers.MasterTest):
def mkmaster(self, flt, **opts): def mkmaster(self, flt, **opts):
o = options.Options(view_filter=flt, verbosity='error', flow_detail=0, **opts) o = options.Options(view_filter=flt, verbosity='error', **opts)
m = dump.DumpMaster(o, with_termlog=False, with_dumper=False) m = dump.DumpMaster(o, with_termlog=False, with_dumper=False)
return m return m