mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-27 02:24:18 +00:00
Move StickyAuth into addon
This is the first addon that handles an OptionsError, so this commit also demos how this works in console. Handling of command-line erorrs is on its way.
This commit is contained in:
parent
696789b8ec
commit
a8a083a10d
@ -1,9 +1,11 @@
|
|||||||
from __future__ import absolute_import, print_function, division
|
from __future__ import absolute_import, print_function, division
|
||||||
|
|
||||||
from mitmproxy.builtins import anticomp
|
from mitmproxy.builtins import anticomp
|
||||||
|
from mitmproxy.builtins import stickyauth
|
||||||
|
|
||||||
|
|
||||||
def default_addons():
|
def default_addons():
|
||||||
return [
|
return [
|
||||||
anticomp.AntiComp(),
|
anticomp.AntiComp(),
|
||||||
|
stickyauth.StickyAuth(),
|
||||||
]
|
]
|
||||||
|
28
mitmproxy/builtins/stickyauth.py
Normal file
28
mitmproxy/builtins/stickyauth.py
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
from __future__ import absolute_import, print_function, division
|
||||||
|
|
||||||
|
from mitmproxy import filt
|
||||||
|
from mitmproxy import exceptions
|
||||||
|
|
||||||
|
|
||||||
|
class StickyAuth:
|
||||||
|
def __init__(self):
|
||||||
|
# Compiled filter
|
||||||
|
self.flt = None
|
||||||
|
self.hosts = {}
|
||||||
|
|
||||||
|
def configure(self, options):
|
||||||
|
if options.stickyauth:
|
||||||
|
flt = filt.parse(options.stickyauth)
|
||||||
|
if not flt:
|
||||||
|
raise exceptions.OptionsError(
|
||||||
|
"stickyauth: invalid filter expression: %s" % options.stickyauth
|
||||||
|
)
|
||||||
|
self.flt = flt
|
||||||
|
|
||||||
|
def request(self, flow):
|
||||||
|
host = flow.request.host
|
||||||
|
if "authorization" in flow.request.headers:
|
||||||
|
self.hosts[host] = flow.request.headers["authorization"]
|
||||||
|
elif flow.match(self.flt):
|
||||||
|
if host in self.hosts:
|
||||||
|
flow.request.headers["authorization"] = self.hosts[host]
|
@ -244,11 +244,6 @@ class ConsoleMaster(flow.FlowMaster):
|
|||||||
print("Sticky cookies error: {}".format(r), file=sys.stderr)
|
print("Sticky cookies error: {}".format(r), file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
r = self.set_stickyauth(options.stickyauth)
|
|
||||||
if r:
|
|
||||||
print("Sticky auth error: {}".format(r), file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
self.set_stream_large_bodies(options.stream_large_bodies)
|
self.set_stream_large_bodies(options.stream_large_bodies)
|
||||||
|
|
||||||
self.refresh_server_playback = options.refresh_server_playback
|
self.refresh_server_playback = options.refresh_server_playback
|
||||||
@ -300,6 +295,9 @@ class ConsoleMaster(flow.FlowMaster):
|
|||||||
self.__dict__[name] = value
|
self.__dict__[name] = value
|
||||||
signals.update_settings.send(self)
|
signals.update_settings.send(self)
|
||||||
|
|
||||||
|
def set_stickyauth(self, txt):
|
||||||
|
self.options.stickyauth = txt
|
||||||
|
|
||||||
def options_error(self, opts, exc):
|
def options_error(self, opts, exc):
|
||||||
signals.status_message.send(
|
signals.status_message.send(
|
||||||
message=str(exc),
|
message=str(exc),
|
||||||
|
@ -120,7 +120,7 @@ class Options(urwid.WidgetWrap):
|
|||||||
select.Option(
|
select.Option(
|
||||||
"Sticky Auth",
|
"Sticky Auth",
|
||||||
"A",
|
"A",
|
||||||
lambda: master.stickyauth_txt,
|
lambda: master.options.stickyauth,
|
||||||
self.sticky_auth
|
self.sticky_auth
|
||||||
),
|
),
|
||||||
select.Option(
|
select.Option(
|
||||||
@ -262,7 +262,7 @@ class Options(urwid.WidgetWrap):
|
|||||||
def sticky_auth(self):
|
def sticky_auth(self):
|
||||||
signals.status_prompt.send(
|
signals.status_prompt.send(
|
||||||
prompt = "Sticky auth filter",
|
prompt = "Sticky auth filter",
|
||||||
text = self.master.stickyauth_txt,
|
text = self.master.options.stickyauth,
|
||||||
callback = self.master.set_stickyauth
|
callback = self.master.set_stickyauth
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -177,10 +177,10 @@ class StatusBar(urwid.WidgetWrap):
|
|||||||
r.append("[")
|
r.append("[")
|
||||||
r.append(("heading_key", "t"))
|
r.append(("heading_key", "t"))
|
||||||
r.append(":%s]" % self.master.stickycookie_txt)
|
r.append(":%s]" % self.master.stickycookie_txt)
|
||||||
if self.master.stickyauth_txt:
|
if self.master.options.stickyauth:
|
||||||
r.append("[")
|
r.append("[")
|
||||||
r.append(("heading_key", "u"))
|
r.append(("heading_key", "u"))
|
||||||
r.append(":%s]" % self.master.stickyauth_txt)
|
r.append(":%s]" % self.master.options.stickyauth)
|
||||||
if self.master.state.default_body_view.name != "Auto":
|
if self.master.state.default_body_view.name != "Auto":
|
||||||
r.append("[")
|
r.append("[")
|
||||||
r.append(("heading_key", "M"))
|
r.append(("heading_key", "M"))
|
||||||
|
@ -87,9 +87,6 @@ class DumpMaster(flow.FlowMaster):
|
|||||||
if options.stickycookie:
|
if options.stickycookie:
|
||||||
self.set_stickycookie(options.stickycookie)
|
self.set_stickycookie(options.stickycookie)
|
||||||
|
|
||||||
if options.stickyauth:
|
|
||||||
self.set_stickyauth(options.stickyauth)
|
|
||||||
|
|
||||||
if options.outfile:
|
if options.outfile:
|
||||||
err = self.start_stream_to_path(
|
err = self.start_stream_to_path(
|
||||||
options.outfile[0],
|
options.outfile[0],
|
||||||
|
@ -5,7 +5,7 @@ from mitmproxy.flow.io import FlowWriter, FilteredFlowWriter, FlowReader, read_f
|
|||||||
from mitmproxy.flow.master import FlowMaster
|
from mitmproxy.flow.master import FlowMaster
|
||||||
from mitmproxy.flow.modules import (
|
from mitmproxy.flow.modules import (
|
||||||
AppRegistry, ReplaceHooks, SetHeaders, StreamLargeBodies, ClientPlaybackState,
|
AppRegistry, ReplaceHooks, SetHeaders, StreamLargeBodies, ClientPlaybackState,
|
||||||
ServerPlaybackState, StickyCookieState, StickyAuthState
|
ServerPlaybackState, StickyCookieState
|
||||||
)
|
)
|
||||||
from mitmproxy.flow.state import State, FlowView
|
from mitmproxy.flow.state import State, FlowView
|
||||||
|
|
||||||
@ -16,6 +16,5 @@ __all__ = [
|
|||||||
"FlowWriter", "FilteredFlowWriter", "FlowReader", "read_flows_from_paths",
|
"FlowWriter", "FilteredFlowWriter", "FlowReader", "read_flows_from_paths",
|
||||||
"FlowMaster",
|
"FlowMaster",
|
||||||
"AppRegistry", "ReplaceHooks", "SetHeaders", "StreamLargeBodies", "ClientPlaybackState",
|
"AppRegistry", "ReplaceHooks", "SetHeaders", "StreamLargeBodies", "ClientPlaybackState",
|
||||||
"ServerPlaybackState", "StickyCookieState", "StickyAuthState",
|
"ServerPlaybackState", "StickyCookieState", "State", "FlowView",
|
||||||
"State", "FlowView",
|
|
||||||
]
|
]
|
||||||
|
@ -42,9 +42,6 @@ class FlowMaster(controller.Master):
|
|||||||
self.stickycookie_state = None # type: Optional[modules.StickyCookieState]
|
self.stickycookie_state = None # type: Optional[modules.StickyCookieState]
|
||||||
self.stickycookie_txt = None
|
self.stickycookie_txt = None
|
||||||
|
|
||||||
self.stickyauth_state = False # type: Optional[modules.StickyAuthState]
|
|
||||||
self.stickyauth_txt = None
|
|
||||||
|
|
||||||
self.anticache = False
|
self.anticache = False
|
||||||
self.stream_large_bodies = None # type: Optional[modules.StreamLargeBodies]
|
self.stream_large_bodies = None # type: Optional[modules.StreamLargeBodies]
|
||||||
self.refresh_server_playback = False
|
self.refresh_server_playback = False
|
||||||
@ -136,17 +133,6 @@ class FlowMaster(controller.Master):
|
|||||||
else:
|
else:
|
||||||
self.stream_large_bodies = False
|
self.stream_large_bodies = False
|
||||||
|
|
||||||
def set_stickyauth(self, txt):
|
|
||||||
if txt:
|
|
||||||
flt = filt.parse(txt)
|
|
||||||
if not flt:
|
|
||||||
return "Invalid filter expression."
|
|
||||||
self.stickyauth_state = modules.StickyAuthState(flt)
|
|
||||||
self.stickyauth_txt = txt
|
|
||||||
else:
|
|
||||||
self.stickyauth_state = None
|
|
||||||
self.stickyauth_txt = None
|
|
||||||
|
|
||||||
def start_client_playback(self, flows, exit):
|
def start_client_playback(self, flows, exit):
|
||||||
"""
|
"""
|
||||||
flows: List of flows.
|
flows: List of flows.
|
||||||
@ -326,8 +312,6 @@ class FlowMaster(controller.Master):
|
|||||||
def process_new_request(self, f):
|
def process_new_request(self, f):
|
||||||
if self.stickycookie_state:
|
if self.stickycookie_state:
|
||||||
self.stickycookie_state.handle_request(f)
|
self.stickycookie_state.handle_request(f)
|
||||||
if self.stickyauth_state:
|
|
||||||
self.stickyauth_state.handle_request(f)
|
|
||||||
|
|
||||||
if self.anticache:
|
if self.anticache:
|
||||||
f.request.anticache()
|
f.request.anticache()
|
||||||
|
@ -350,20 +350,3 @@ class StickyCookieState:
|
|||||||
if l:
|
if l:
|
||||||
f.request.stickycookie = True
|
f.request.stickycookie = True
|
||||||
f.request.headers["cookie"] = "; ".join(l)
|
f.request.headers["cookie"] = "; ".join(l)
|
||||||
|
|
||||||
|
|
||||||
class StickyAuthState:
|
|
||||||
def __init__(self, flt):
|
|
||||||
"""
|
|
||||||
flt: Compiled filter.
|
|
||||||
"""
|
|
||||||
self.flt = flt
|
|
||||||
self.hosts = {}
|
|
||||||
|
|
||||||
def handle_request(self, f):
|
|
||||||
host = f.request.host
|
|
||||||
if "authorization" in f.request.headers:
|
|
||||||
self.hosts[host] = f.request.headers["authorization"]
|
|
||||||
elif f.match(self.flt):
|
|
||||||
if host in self.hosts:
|
|
||||||
f.request.headers["authorization"] = self.hosts[host]
|
|
||||||
|
23
test/mitmproxy/builtins/test_stickyauth.py
Normal file
23
test/mitmproxy/builtins/test_stickyauth.py
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
from .. import tutils, mastertest
|
||||||
|
from mitmproxy.builtins import stickyauth
|
||||||
|
from mitmproxy.flow import master
|
||||||
|
from mitmproxy.flow import state
|
||||||
|
from mitmproxy import options
|
||||||
|
|
||||||
|
|
||||||
|
class TestStickyAuth(mastertest.MasterTest):
|
||||||
|
def test_simple(self):
|
||||||
|
s = state.State()
|
||||||
|
m = master.FlowMaster(options.Options(stickyauth = ".*"), None, s)
|
||||||
|
sa = stickyauth.StickyAuth()
|
||||||
|
m.addons.add(sa)
|
||||||
|
|
||||||
|
f = tutils.tflow(resp=True)
|
||||||
|
f.request.headers["authorization"] = "foo"
|
||||||
|
self.invoke(m, "request", f)
|
||||||
|
|
||||||
|
assert "address" in sa.hosts
|
||||||
|
|
||||||
|
f = tutils.tflow(resp=True)
|
||||||
|
self.invoke(m, "request", f)
|
||||||
|
assert f.request.headers["authorization"] == "foo"
|
@ -120,20 +120,6 @@ class TestStickyCookieState:
|
|||||||
assert "cookie" in f.request.headers
|
assert "cookie" in f.request.headers
|
||||||
|
|
||||||
|
|
||||||
class TestStickyAuthState:
|
|
||||||
|
|
||||||
def test_response(self):
|
|
||||||
s = flow.StickyAuthState(filt.parse(".*"))
|
|
||||||
f = tutils.tflow(resp=True)
|
|
||||||
f.request.headers["authorization"] = "foo"
|
|
||||||
s.handle_request(f)
|
|
||||||
assert "address" in s.hosts
|
|
||||||
|
|
||||||
f = tutils.tflow(resp=True)
|
|
||||||
s.handle_request(f)
|
|
||||||
assert f.request.headers["authorization"] == "foo"
|
|
||||||
|
|
||||||
|
|
||||||
class TestClientPlaybackState:
|
class TestClientPlaybackState:
|
||||||
|
|
||||||
def test_tick(self):
|
def test_tick(self):
|
||||||
@ -1004,26 +990,6 @@ class TestFlowMaster:
|
|||||||
fm.request(f)
|
fm.request(f)
|
||||||
assert f.request.headers["cookie"] == "foo=bar"
|
assert f.request.headers["cookie"] == "foo=bar"
|
||||||
|
|
||||||
def test_stickyauth(self):
|
|
||||||
s = flow.State()
|
|
||||||
fm = flow.FlowMaster(None, None, s)
|
|
||||||
assert "Invalid" in fm.set_stickyauth("~h")
|
|
||||||
fm.set_stickyauth(".*")
|
|
||||||
assert fm.stickyauth_state
|
|
||||||
fm.set_stickyauth(None)
|
|
||||||
assert not fm.stickyauth_state
|
|
||||||
|
|
||||||
fm.set_stickyauth(".*")
|
|
||||||
f = tutils.tflow(resp=True)
|
|
||||||
f.request.headers["authorization"] = "foo"
|
|
||||||
fm.request(f)
|
|
||||||
|
|
||||||
f = tutils.tflow(resp=True)
|
|
||||||
assert fm.stickyauth_state.hosts
|
|
||||||
assert "authorization" not in f.request.headers
|
|
||||||
fm.request(f)
|
|
||||||
assert f.request.headers["authorization"] == "foo"
|
|
||||||
|
|
||||||
def test_stream(self):
|
def test_stream(self):
|
||||||
with tutils.tmpdir() as tdir:
|
with tutils.tmpdir() as tdir:
|
||||||
p = os.path.join(tdir, "foo")
|
p = os.path.join(tdir, "foo")
|
||||||
|
Loading…
Reference in New Issue
Block a user