mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-01-30 23:09:44 +00:00
console: Port from state to view
First phase of the port - basic flow list and flow view functionality working. More to come.
This commit is contained in:
parent
b9eb1a3479
commit
2b76db1272
@ -17,6 +17,7 @@ import sortedcontainers
|
||||
|
||||
import mitmproxy.flow
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import exceptions
|
||||
|
||||
|
||||
def key_request_start(f: mitmproxy.flow.Flow) -> datetime.datetime:
|
||||
@ -51,6 +52,15 @@ class View(collections.Sequence):
|
||||
self.focus = Focus(self)
|
||||
self.settings = Settings(self)
|
||||
|
||||
def store_count(self):
|
||||
return len(self._store)
|
||||
|
||||
def inbounds(self, index: int) -> bool:
|
||||
"""
|
||||
Is this index >= 0 and < len(self)
|
||||
"""
|
||||
return index >= 0 and index < len(self)
|
||||
|
||||
def _rev(self, idx: int) -> int:
|
||||
"""
|
||||
Reverses an index, if needed
|
||||
@ -112,7 +122,7 @@ class View(collections.Sequence):
|
||||
"""
|
||||
Clears both the state and view.
|
||||
"""
|
||||
self._state.clear()
|
||||
self._store.clear()
|
||||
self._view.clear()
|
||||
self.sig_refresh.send(self)
|
||||
|
||||
@ -157,6 +167,16 @@ class View(collections.Sequence):
|
||||
pass
|
||||
|
||||
# Event handlers
|
||||
def configure(self, opts, updated):
|
||||
filt = None
|
||||
if "filter" in updated:
|
||||
if opts.filter:
|
||||
filt = flowfilter.parse(opts.filter)
|
||||
if not filt:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid interception filter: %s" % opts.filter
|
||||
)
|
||||
self.set_filter(filt)
|
||||
|
||||
def request(self, f):
|
||||
self.add(f)
|
||||
@ -202,21 +222,11 @@ class Focus:
|
||||
if self.flow:
|
||||
return self.view.index(self.flow)
|
||||
|
||||
def next(self):
|
||||
"""
|
||||
Sets the focus to the next flow.
|
||||
"""
|
||||
if self.flow:
|
||||
idx = min(self.index + 1, len(self.view) - 1)
|
||||
self.flow = self.view[idx]
|
||||
|
||||
def prev(self):
|
||||
"""
|
||||
Sets the focus to the previous flow.
|
||||
"""
|
||||
if self.flow:
|
||||
idx = max(self.index - 1, 0)
|
||||
self.flow = self.view[idx]
|
||||
@index.setter
|
||||
def index(self, idx) -> typing.Optional[int]:
|
||||
if idx < 0 or idx > len(self.view) - 1:
|
||||
raise ValueError("Index out of view bounds")
|
||||
self.flow = self.view[idx]
|
||||
|
||||
def _nearest(self, f, v):
|
||||
return min(v.bisect(f), len(v) - 1)
|
||||
|
@ -109,8 +109,8 @@ class BodyPile(urwid.Pile):
|
||||
|
||||
class ConnectionItem(urwid.WidgetWrap):
|
||||
|
||||
def __init__(self, master, state, flow, focus):
|
||||
self.master, self.state, self.flow = master, state, flow
|
||||
def __init__(self, master, view, flow, focus):
|
||||
self.master, self.view, self.flow = master, view, flow
|
||||
self.f = focus
|
||||
w = self.get_text()
|
||||
urwid.WidgetWrap.__init__(self, w)
|
||||
@ -143,7 +143,7 @@ class ConnectionItem(urwid.WidgetWrap):
|
||||
def server_replay_prompt(self, k):
|
||||
a = self.master.addons.get("serverplayback")
|
||||
if k == "a":
|
||||
a.load([i.copy() for i in self.master.state.view])
|
||||
a.load([i.copy() for i in self.master.view])
|
||||
elif k == "t":
|
||||
a.load([self.flow.copy()])
|
||||
signals.update_settings.send(self)
|
||||
@ -163,11 +163,12 @@ class ConnectionItem(urwid.WidgetWrap):
|
||||
elif key == "d":
|
||||
if self.flow.killable:
|
||||
self.flow.kill(self.master)
|
||||
self.state.delete_flow(self.flow)
|
||||
self.view.remove(self.view.focus.flow)
|
||||
signals.flowlist_change.send(self)
|
||||
elif key == "D":
|
||||
f = self.master.state.duplicate_flow(self.flow)
|
||||
self.master.state.set_focus_flow(f)
|
||||
cp = self.flow.copy()
|
||||
self.master.view.add(cp)
|
||||
self.master.view.focus.flow = cp
|
||||
signals.flowlist_change.send(self)
|
||||
elif key == "m":
|
||||
self.flow.marked = not self.flow.marked
|
||||
@ -208,14 +209,14 @@ class ConnectionItem(urwid.WidgetWrap):
|
||||
callback = self.server_replay_prompt,
|
||||
)
|
||||
elif key == "U":
|
||||
for f in self.state.flows:
|
||||
for f in self.view:
|
||||
f.marked = False
|
||||
signals.flowlist_change.send(self)
|
||||
elif key == "V":
|
||||
if not self.flow.modified():
|
||||
signals.status_message.send(message="Flow not modified.")
|
||||
return
|
||||
self.state.revert(self.flow)
|
||||
self.flow.revert()
|
||||
signals.flowlist_change.send(self)
|
||||
signals.status_message.send(message="Reverted.")
|
||||
elif key == "w":
|
||||
@ -264,38 +265,49 @@ class ConnectionItem(urwid.WidgetWrap):
|
||||
|
||||
class FlowListWalker(urwid.ListWalker):
|
||||
|
||||
def __init__(self, master, state):
|
||||
self.master, self.state = master, state
|
||||
signals.flowlist_change.connect(self.sig_flowlist_change)
|
||||
def __init__(self, master, view):
|
||||
self.master, self.view = master, view
|
||||
self.view.sig_refresh.connect(self.sig_mod)
|
||||
self.view.sig_add.connect(self.sig_mod)
|
||||
self.view.sig_remove.connect(self.sig_mod)
|
||||
self.view.sig_update.connect(self.sig_mod)
|
||||
signals.flowlist_change.connect(self.sig_mod)
|
||||
|
||||
def sig_flowlist_change(self, sender):
|
||||
def sig_mod(self, *args, **kwargs):
|
||||
self._modified()
|
||||
|
||||
def get_focus(self):
|
||||
f, i = self.state.get_focus()
|
||||
f = ConnectionItem(self.master, self.state, f, True) if f else None
|
||||
return f, i
|
||||
if not self.view.focus.flow:
|
||||
return None, 0
|
||||
return ConnectionItem(
|
||||
self.master, self.view, self.view.focus.flow, True
|
||||
), self.view.focus.index
|
||||
|
||||
def set_focus(self, focus):
|
||||
ret = self.state.set_focus(focus)
|
||||
return ret
|
||||
def set_focus(self, index):
|
||||
if self.view.inbounds(index):
|
||||
self.view.focus.index = index
|
||||
signals.flowlist_change.send(self)
|
||||
|
||||
def get_next(self, pos):
|
||||
f, i = self.state.get_next(pos)
|
||||
f = ConnectionItem(self.master, self.state, f, False) if f else None
|
||||
return f, i
|
||||
pos = pos + 1
|
||||
if not self.view.inbounds(pos):
|
||||
return None, None
|
||||
f = ConnectionItem(self.master, self.view, self.view[pos], False)
|
||||
return f, pos
|
||||
|
||||
def get_prev(self, pos):
|
||||
f, i = self.state.get_prev(pos)
|
||||
f = ConnectionItem(self.master, self.state, f, False) if f else None
|
||||
return f, i
|
||||
pos = pos - 1
|
||||
if not self.view.inbounds(pos):
|
||||
return None, None
|
||||
f = ConnectionItem(self.master, self.view, self.view[pos], False)
|
||||
return f, pos
|
||||
|
||||
|
||||
class FlowListBox(urwid.ListBox):
|
||||
|
||||
def __init__(self, master: "mitmproxy.console.master.ConsoleMaster"):
|
||||
self.master = master
|
||||
super().__init__(FlowListWalker(master, master.state))
|
||||
super().__init__(FlowListWalker(master, master.view))
|
||||
|
||||
def get_method_raw(self, k):
|
||||
if k:
|
||||
@ -331,29 +343,32 @@ class FlowListBox(urwid.ListBox):
|
||||
return
|
||||
scheme, host, port, path = parts
|
||||
f = self.master.create_request(method, scheme, host, port, path)
|
||||
self.master.state.set_focus_flow(f)
|
||||
self.master.view.focus.flow = f
|
||||
signals.flowlist_change.send(self)
|
||||
|
||||
def keypress(self, size, key):
|
||||
key = common.shortcuts(key)
|
||||
if key == "A":
|
||||
self.master.accept_all()
|
||||
for f in self.master.view:
|
||||
if f.intercepted:
|
||||
f.resume()
|
||||
signals.flowlist_change.send(self)
|
||||
elif key == "z":
|
||||
self.master.clear_flows()
|
||||
self.master.view.clear()
|
||||
signals.flowlist_change.send(self)
|
||||
elif key == "e":
|
||||
self.master.toggle_eventlog()
|
||||
elif key == "g":
|
||||
self.master.state.set_focus(0)
|
||||
self.master.view.focus.index = 0
|
||||
signals.flowlist_change.send(self)
|
||||
elif key == "G":
|
||||
self.master.state.set_focus(self.master.state.flow_count())
|
||||
self.master.view.focus.index = len(self.master.view) - 1
|
||||
signals.flowlist_change.send(self)
|
||||
elif key == "f":
|
||||
signals.status_prompt.send(
|
||||
prompt = "Filter View",
|
||||
text = self.master.state.filter_txt,
|
||||
callback = self.master.set_view_filter
|
||||
text = self.master.options.filter,
|
||||
callback = self.master.options.setter("filter")
|
||||
)
|
||||
elif key == "L":
|
||||
signals.status_prompt_path.send(
|
||||
|
@ -127,8 +127,8 @@ TAB_RESP = 1
|
||||
class FlowView(tabs.Tabs):
|
||||
highlight_color = "focusfield"
|
||||
|
||||
def __init__(self, master, state, flow, tab_offset):
|
||||
self.master, self.state, self.flow = master, state, flow
|
||||
def __init__(self, master, view, flow, tab_offset):
|
||||
self.master, self.view, self.flow = master, view, flow
|
||||
super().__init__(
|
||||
[
|
||||
(self.tab_request, self.view_request),
|
||||
@ -164,7 +164,7 @@ class FlowView(tabs.Tabs):
|
||||
return self.conn_text(self.flow.response)
|
||||
|
||||
def view_details(self):
|
||||
return flowdetailview.flowdetails(self.state, self.flow)
|
||||
return flowdetailview.flowdetails(self.view, self.flow)
|
||||
|
||||
def sig_flow_change(self, sender, flow):
|
||||
if flow == self.flow:
|
||||
@ -175,11 +175,8 @@ class FlowView(tabs.Tabs):
|
||||
msg, body = "", [urwid.Text([("error", "[content missing]")])]
|
||||
return msg, body
|
||||
else:
|
||||
full = self.state.get_flow_setting(
|
||||
self.flow,
|
||||
(self.tab_offset, "fullcontents"),
|
||||
False
|
||||
)
|
||||
s = self.view.settings[self.flow]
|
||||
full = s.get((self.tab_offset, "fullcontents"), False)
|
||||
if full:
|
||||
limit = sys.maxsize
|
||||
else:
|
||||
@ -237,9 +234,9 @@ class FlowView(tabs.Tabs):
|
||||
return description, text_objects
|
||||
|
||||
def viewmode_get(self):
|
||||
override = self.state.get_flow_setting(
|
||||
self.flow,
|
||||
(self.tab_offset, "prettyview")
|
||||
override = self.view.settings[self.flow].get(
|
||||
(self.tab_offset, "prettyview"),
|
||||
None
|
||||
)
|
||||
return self.master.options.default_contentview if override is None else override
|
||||
|
||||
@ -284,7 +281,7 @@ class FlowView(tabs.Tabs):
|
||||
]
|
||||
)
|
||||
]
|
||||
return searchable.Searchable(self.state, txt)
|
||||
return searchable.Searchable(self.view, txt)
|
||||
|
||||
def set_method_raw(self, m):
|
||||
if m:
|
||||
@ -468,18 +465,20 @@ class FlowView(tabs.Tabs):
|
||||
|
||||
def _view_nextprev_flow(self, np, flow):
|
||||
try:
|
||||
idx = self.state.view.index(flow)
|
||||
idx = self.view.index(flow)
|
||||
except IndexError:
|
||||
return
|
||||
|
||||
new_idx = idx
|
||||
if np == "next":
|
||||
new_flow, new_idx = self.state.get_next(idx)
|
||||
new_idx += 1
|
||||
else:
|
||||
new_flow, new_idx = self.state.get_prev(idx)
|
||||
if new_flow is None:
|
||||
new_idx -= 1
|
||||
if not self.view.inbounds(new_idx):
|
||||
signals.status_message.send(message="No more flows")
|
||||
else:
|
||||
signals.pop_view_state.send(self)
|
||||
self.master.view_flow(new_flow, self.tab_offset)
|
||||
return
|
||||
signals.pop_view_state.send(self)
|
||||
self.master.view_flow(self.view[new_idx], self.tab_offset)
|
||||
|
||||
def view_next_flow(self, flow):
|
||||
return self._view_nextprev_flow("next", flow)
|
||||
|
@ -9,7 +9,6 @@ import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import traceback
|
||||
import weakref
|
||||
|
||||
import urwid
|
||||
from typing import Optional
|
||||
@ -19,9 +18,8 @@ from mitmproxy import controller
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import master
|
||||
from mitmproxy import io
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import log
|
||||
from mitmproxy.addons import state
|
||||
from mitmproxy.addons import view
|
||||
from mitmproxy.addons import intercept
|
||||
import mitmproxy.options
|
||||
from mitmproxy.tools.console import flowlist
|
||||
@ -34,7 +32,6 @@ from mitmproxy.tools.console import palettes
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.tools.console import statusbar
|
||||
from mitmproxy.tools.console import window
|
||||
from mitmproxy.flowfilter import FMarked
|
||||
from mitmproxy.utils import strutils
|
||||
|
||||
from mitmproxy.net import tcp
|
||||
@ -42,163 +39,6 @@ from mitmproxy.net import tcp
|
||||
EVENTLOG_SIZE = 500
|
||||
|
||||
|
||||
class ConsoleState(state.State):
|
||||
|
||||
def __init__(self):
|
||||
state.State.__init__(self)
|
||||
self.focus = None
|
||||
self.follow_focus = None
|
||||
self.flowsettings = weakref.WeakKeyDictionary()
|
||||
self.last_search = None
|
||||
self.last_filter = ""
|
||||
self.mark_filter = False
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
self.__dict__[name] = value
|
||||
signals.update_settings.send(self)
|
||||
|
||||
def add_flow_setting(self, flow, key, value):
|
||||
d = self.flowsettings.setdefault(flow, {})
|
||||
d[key] = value
|
||||
|
||||
def get_flow_setting(self, flow, key, default=None):
|
||||
d = self.flowsettings.get(flow, {})
|
||||
return d.get(key, default)
|
||||
|
||||
def add_flow(self, f):
|
||||
super().add_flow(f)
|
||||
signals.flowlist_change.send(self)
|
||||
self.update_focus()
|
||||
return f
|
||||
|
||||
def update_flow(self, f):
|
||||
super().update_flow(f)
|
||||
signals.flowlist_change.send(self)
|
||||
self.update_focus()
|
||||
return f
|
||||
|
||||
def set_view_filter(self, txt):
|
||||
ret = super().set_view_filter(txt)
|
||||
self.set_focus(self.focus)
|
||||
return ret
|
||||
|
||||
def get_focus(self):
|
||||
if not self.view or self.focus is None:
|
||||
return None, None
|
||||
return self.view[self.focus], self.focus
|
||||
|
||||
def set_focus(self, idx):
|
||||
if self.view:
|
||||
if idx is None or idx < 0:
|
||||
idx = 0
|
||||
elif idx >= len(self.view):
|
||||
idx = len(self.view) - 1
|
||||
self.focus = idx
|
||||
else:
|
||||
self.focus = None
|
||||
|
||||
def update_focus(self):
|
||||
if self.focus is None:
|
||||
self.set_focus(0)
|
||||
elif self.follow_focus:
|
||||
self.set_focus(len(self.view) - 1)
|
||||
|
||||
def set_focus_flow(self, f):
|
||||
self.set_focus(self.view.index(f))
|
||||
|
||||
def get_from_pos(self, pos):
|
||||
if len(self.view) <= pos or pos < 0:
|
||||
return None, None
|
||||
return self.view[pos], pos
|
||||
|
||||
def get_next(self, pos):
|
||||
return self.get_from_pos(pos + 1)
|
||||
|
||||
def get_prev(self, pos):
|
||||
return self.get_from_pos(pos - 1)
|
||||
|
||||
def delete_flow(self, f):
|
||||
if f in self.view and self.view.index(f) <= self.focus:
|
||||
self.focus -= 1
|
||||
if self.focus < 0:
|
||||
self.focus = None
|
||||
ret = super().delete_flow(f)
|
||||
self.set_focus(self.focus)
|
||||
return ret
|
||||
|
||||
def get_nearest_matching_flow(self, flow, flt):
|
||||
fidx = self.view.index(flow)
|
||||
dist = 1
|
||||
|
||||
fprev = fnext = True
|
||||
while fprev or fnext:
|
||||
fprev, _ = self.get_from_pos(fidx - dist)
|
||||
fnext, _ = self.get_from_pos(fidx + dist)
|
||||
|
||||
if fprev and flowfilter.match(flt, fprev):
|
||||
return fprev
|
||||
elif fnext and flowfilter.match(flt, fnext):
|
||||
return fnext
|
||||
|
||||
dist += 1
|
||||
|
||||
return None
|
||||
|
||||
def enable_marked_filter(self):
|
||||
marked_flows = [f for f in self.flows if f.marked]
|
||||
if not marked_flows:
|
||||
return
|
||||
|
||||
marked_filter = "~%s" % FMarked.code
|
||||
|
||||
# Save Focus
|
||||
last_focus, _ = self.get_focus()
|
||||
nearest_marked = self.get_nearest_matching_flow(last_focus, marked_filter)
|
||||
|
||||
self.last_filter = self.filter_txt
|
||||
self.set_view_filter(marked_filter)
|
||||
|
||||
# Restore Focus
|
||||
if last_focus.marked:
|
||||
self.set_focus_flow(last_focus)
|
||||
else:
|
||||
self.set_focus_flow(nearest_marked)
|
||||
|
||||
self.mark_filter = True
|
||||
|
||||
def disable_marked_filter(self):
|
||||
marked_filter = "~%s" % FMarked.code
|
||||
|
||||
# Save Focus
|
||||
last_focus, _ = self.get_focus()
|
||||
nearest_marked = self.get_nearest_matching_flow(last_focus, marked_filter)
|
||||
|
||||
self.set_view_filter(self.last_filter)
|
||||
self.last_filter = ""
|
||||
|
||||
# Restore Focus
|
||||
if last_focus.marked:
|
||||
self.set_focus_flow(last_focus)
|
||||
else:
|
||||
self.set_focus_flow(nearest_marked)
|
||||
|
||||
self.mark_filter = False
|
||||
|
||||
def clear(self):
|
||||
marked_flows = [f for f in self.view if f.marked]
|
||||
super().clear()
|
||||
|
||||
for f in marked_flows:
|
||||
self.add_flow(f)
|
||||
f.marked = True
|
||||
|
||||
if len(self.flows.views) == 0:
|
||||
self.focus = None
|
||||
else:
|
||||
self.focus = 0
|
||||
self.set_focus(self.focus)
|
||||
|
||||
|
||||
class Options(mitmproxy.options.Options):
|
||||
def __init__(
|
||||
self,
|
||||
@ -210,6 +50,7 @@ class Options(mitmproxy.options.Options):
|
||||
palette: Optional[str] = None,
|
||||
palette_transparent: bool = False,
|
||||
no_mouse: bool = False,
|
||||
follow_focus: bool = False,
|
||||
**kwargs
|
||||
):
|
||||
self.eventlog = eventlog
|
||||
@ -219,6 +60,7 @@ class Options(mitmproxy.options.Options):
|
||||
self.palette = palette
|
||||
self.palette_transparent = palette_transparent
|
||||
self.no_mouse = no_mouse
|
||||
self.follow_focus = follow_focus
|
||||
super().__init__(**kwargs)
|
||||
|
||||
|
||||
@ -227,15 +69,12 @@ class ConsoleMaster(master.Master):
|
||||
|
||||
def __init__(self, options, server):
|
||||
super().__init__(options, server)
|
||||
self.state = ConsoleState()
|
||||
self.view = view.View()
|
||||
self.stream_path = None
|
||||
# This line is just for type hinting
|
||||
self.options = self.options # type: Options
|
||||
self.options.errored.connect(self.options_error)
|
||||
|
||||
if options.filter:
|
||||
self.set_view_filter(options.filter)
|
||||
|
||||
self.palette = options.palette
|
||||
self.palette_transparent = options.palette_transparent
|
||||
|
||||
@ -250,7 +89,7 @@ class ConsoleMaster(master.Master):
|
||||
signals.push_view_state.connect(self.sig_push_view_state)
|
||||
signals.sig_add_log.connect(self.sig_add_log)
|
||||
self.addons.add(*addons.default_addons())
|
||||
self.addons.add(self.state, intercept.Intercept())
|
||||
self.addons.add(intercept.Intercept(), self.view)
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
self.__dict__[name] = value
|
||||
@ -432,13 +271,13 @@ class ConsoleMaster(master.Master):
|
||||
|
||||
if self.options.rfile:
|
||||
ret = self.load_flows_path(self.options.rfile)
|
||||
if ret and self.state.flow_count():
|
||||
if ret and self.view.store_count():
|
||||
signals.add_log(
|
||||
"File truncated or corrupted. "
|
||||
"Loaded as many flows as possible.",
|
||||
"error"
|
||||
)
|
||||
elif ret and not self.state.flow_count():
|
||||
elif ret and not self.view.store_count():
|
||||
self.shutdown()
|
||||
print("Could not load file: {}".format(ret), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
@ -533,17 +372,12 @@ class ConsoleMaster(master.Master):
|
||||
def view_flowlist(self):
|
||||
if self.ui.started:
|
||||
self.ui.clear()
|
||||
if self.state.follow_focus:
|
||||
self.state.set_focus(self.state.flow_count())
|
||||
|
||||
if self.options.eventlog:
|
||||
body = flowlist.BodyPile(self)
|
||||
else:
|
||||
body = flowlist.FlowListBox(self)
|
||||
|
||||
if self.follow:
|
||||
self.toggle_follow_flows()
|
||||
|
||||
signals.push_view_state.send(
|
||||
self,
|
||||
window = window.Window(
|
||||
@ -556,12 +390,12 @@ class ConsoleMaster(master.Master):
|
||||
)
|
||||
|
||||
def view_flow(self, flow, tab_offset=0):
|
||||
self.state.set_focus_flow(flow)
|
||||
self.view.focus.flow = flow
|
||||
signals.push_view_state.send(
|
||||
self,
|
||||
window = window.Window(
|
||||
self,
|
||||
flowview.FlowView(self, self.state, flow, tab_offset),
|
||||
flowview.FlowView(self, self.view, flow, tab_offset),
|
||||
flowview.FlowViewHeader(self, flow),
|
||||
statusbar.StatusBar(self, flowview.footer),
|
||||
flowview.help_context
|
||||
@ -585,7 +419,7 @@ class ConsoleMaster(master.Master):
|
||||
return self._write_flows(path, [flow])
|
||||
|
||||
def save_flows(self, path):
|
||||
return self._write_flows(path, self.state.view)
|
||||
return self._write_flows(path, self.view)
|
||||
|
||||
def load_flows_callback(self, path):
|
||||
if not path:
|
||||
@ -602,14 +436,6 @@ class ConsoleMaster(master.Master):
|
||||
signals.flowlist_change.send(self)
|
||||
return reterr
|
||||
|
||||
def accept_all(self):
|
||||
self.state.accept_all(self)
|
||||
|
||||
def set_view_filter(self, txt):
|
||||
v = self.state.set_view_filter(txt)
|
||||
signals.flowlist_change.send(self)
|
||||
return v
|
||||
|
||||
def edit_scripts(self, scripts):
|
||||
self.options.scripts = [x[0] for x in scripts]
|
||||
|
||||
@ -617,56 +443,14 @@ class ConsoleMaster(master.Master):
|
||||
if a != "n":
|
||||
raise urwid.ExitMainLoop
|
||||
|
||||
def shutdown(self):
|
||||
self.state.killall(self)
|
||||
master.Master.shutdown(self)
|
||||
|
||||
def clear_flows(self):
|
||||
self.state.clear()
|
||||
signals.flowlist_change.send(self)
|
||||
|
||||
def toggle_follow_flows(self):
|
||||
# toggle flow follow
|
||||
self.state.follow_focus = not self.state.follow_focus
|
||||
# jump to most recent flow if follow is now on
|
||||
if self.state.follow_focus:
|
||||
self.state.set_focus(self.state.flow_count())
|
||||
signals.flowlist_change.send(self)
|
||||
|
||||
def delete_flow(self, f):
|
||||
self.state.delete_flow(f)
|
||||
signals.flowlist_change.send(self)
|
||||
|
||||
def refresh_focus(self):
|
||||
if self.state.view:
|
||||
signals.flow_change.send(
|
||||
self,
|
||||
flow = self.state.view[self.state.focus]
|
||||
)
|
||||
|
||||
def process_flow(self, f):
|
||||
signals.flowlist_change.send(self)
|
||||
signals.flow_change.send(self, flow=f)
|
||||
if self.view.focus.flow:
|
||||
signals.flow_change.send(self, flow = self.view.focus.flow)
|
||||
|
||||
def clear_events(self):
|
||||
self.logbuffer[:] = []
|
||||
|
||||
# Handlers
|
||||
@controller.handler
|
||||
def error(self, f):
|
||||
super().error(f)
|
||||
self.process_flow(f)
|
||||
|
||||
@controller.handler
|
||||
def request(self, f):
|
||||
super().request(f)
|
||||
self.process_flow(f)
|
||||
|
||||
@controller.handler
|
||||
def response(self, f):
|
||||
super().response(f)
|
||||
self.process_flow(f)
|
||||
|
||||
@controller.handler
|
||||
def tcp_message(self, f):
|
||||
super().tcp_message(f)
|
||||
|
@ -164,10 +164,10 @@ class StatusBar(urwid.WidgetWrap):
|
||||
r.append("[")
|
||||
r.append(("heading_key", "i"))
|
||||
r.append(":%s]" % self.master.options.intercept)
|
||||
if self.master.state.filter_txt:
|
||||
if self.master.options.filter:
|
||||
r.append("[")
|
||||
r.append(("heading_key", "f"))
|
||||
r.append(":%s]" % self.master.state.filter_txt)
|
||||
r.append(":%s]" % self.master.options.filter)
|
||||
if self.master.options.stickycookie:
|
||||
r.append("[")
|
||||
r.append(("heading_key", "t"))
|
||||
@ -194,7 +194,7 @@ class StatusBar(urwid.WidgetWrap):
|
||||
opts.append("killextra")
|
||||
if self.master.options.no_upstream_cert:
|
||||
opts.append("no-upstream-cert")
|
||||
if self.master.state.follow_focus:
|
||||
if self.master.options.follow_focus:
|
||||
opts.append("following")
|
||||
if self.master.options.stream_large_bodies:
|
||||
opts.append(
|
||||
@ -224,11 +224,11 @@ class StatusBar(urwid.WidgetWrap):
|
||||
return r
|
||||
|
||||
def redraw(self):
|
||||
fc = self.master.state.flow_count()
|
||||
if self.master.state.focus is None:
|
||||
fc = len(self.master.view)
|
||||
if self.master.view.focus.flow is None:
|
||||
offset = 0
|
||||
else:
|
||||
offset = min(self.master.state.focus + 1, fc)
|
||||
offset = self.master.view.focus.index + 1
|
||||
t = [
|
||||
('heading', ("[%s/%s]" % (offset, fc)).ljust(9))
|
||||
]
|
||||
|
@ -1,6 +1,8 @@
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.addons import view
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import options
|
||||
from mitmproxy.test import taddons
|
||||
|
||||
from .. import tutils
|
||||
|
||||
@ -31,6 +33,10 @@ def test_simple():
|
||||
assert list(v) == [f, f3, f2]
|
||||
assert len(v._store) == 3
|
||||
|
||||
v.clear()
|
||||
assert len(v) == 0
|
||||
assert len(v._store) == 0
|
||||
|
||||
|
||||
def tft(*, method="get", start=0):
|
||||
f = tflow.tflow()
|
||||
@ -236,31 +242,6 @@ def test_focus():
|
||||
assert f.index is None
|
||||
|
||||
|
||||
def test_focus_nextprev():
|
||||
v = view.View()
|
||||
# Nops on an empty view
|
||||
v.focus.next()
|
||||
v.focus.prev()
|
||||
|
||||
# Nops on a single-flow view
|
||||
v.add(tft(start=0))
|
||||
assert v.focus.flow == v[0]
|
||||
v.focus.next()
|
||||
assert v.focus.flow == v[0]
|
||||
v.focus.prev()
|
||||
assert v.focus.flow == v[0]
|
||||
|
||||
v.add(tft(start=1))
|
||||
v.focus.next()
|
||||
assert v.focus.flow == v[1]
|
||||
v.focus.next()
|
||||
assert v.focus.flow == v[1]
|
||||
v.focus.prev()
|
||||
assert v.focus.flow == v[0]
|
||||
v.focus.prev()
|
||||
assert v.focus.flow == v[0]
|
||||
|
||||
|
||||
def test_settings():
|
||||
v = view.View()
|
||||
f = tft()
|
||||
@ -274,3 +255,16 @@ def test_settings():
|
||||
v.remove(f)
|
||||
tutils.raises(KeyError, v.settings.__getitem__, f)
|
||||
assert not v.settings.keys()
|
||||
|
||||
|
||||
class Options(options.Options):
|
||||
def __init__(self, *, filter=None, **kwargs):
|
||||
self.filter = filter
|
||||
super().__init__(**kwargs)
|
||||
|
||||
|
||||
def test_configure():
|
||||
v = view.View()
|
||||
with taddons.context(options=Options()) as tctx:
|
||||
tctx.configure(v, filter="~q")
|
||||
tutils.raises("invalid interception filter", tctx.configure, v, filter="~~")
|
||||
|
Loading…
Reference in New Issue
Block a user