Merge pull request #1690 from cortesi/consoleview

console: Port from state to view
This commit is contained in:
Aldo Cortesi 2016-10-30 16:27:12 +13:00 committed by GitHub
commit b229d470c4
9 changed files with 178 additions and 469 deletions

View File

@ -17,6 +17,7 @@ import sortedcontainers
import mitmproxy.flow
from mitmproxy import flowfilter
from mitmproxy import exceptions
def key_request_start(f: mitmproxy.flow.Flow) -> datetime.datetime:
@ -35,6 +36,8 @@ class View(collections.Sequence):
super().__init__()
self._store = {}
self.filter = matchall
# Should we show only marked flows?
self.show_marked = False
self.order_key = key_request_start
self.order_reversed = False
self._view = sortedcontainers.SortedListWithKey(key = self.order_key)
@ -51,6 +54,15 @@ class View(collections.Sequence):
self.focus = Focus(self)
self.settings = Settings(self)
def store_count(self):
return len(self._store)
def inbounds(self, index: int) -> bool:
"""
Is this index >= 0 and < len(self)
"""
return index >= 0 and index < len(self)
def _rev(self, idx: int) -> int:
"""
Reverses an index, if needed
@ -82,7 +94,19 @@ class View(collections.Sequence):
def index(self, f: mitmproxy.flow.Flow) -> int:
return self._rev(self._view.index(f))
def _refilter(self):
self._view.clear()
for i in self._store.values():
if self.show_marked and not i.marked:
continue
if self.filter(i):
self._view.add(i)
self.sig_refresh.send(self)
# API
def toggle_marked(self):
self.show_marked = not self.show_marked
self._refilter()
def toggle_reversed(self):
self.order_reversed = not self.order_reversed
@ -102,17 +126,13 @@ class View(collections.Sequence):
Sets the current view filter.
"""
self.filter = flt or matchall
self._view.clear()
for i in self._store.values():
if self.filter(i):
self._view.add(i)
self.sig_refresh.send(self)
self._refilter()
def clear(self):
"""
Clears both the state and view.
"""
self._state.clear()
self._store.clear()
self._view.clear()
self.sig_refresh.send(self)
@ -157,6 +177,16 @@ class View(collections.Sequence):
pass
# Event handlers
def configure(self, opts, updated):
filt = None
if "filter" in updated:
if opts.filter:
filt = flowfilter.parse(opts.filter)
if not filt:
raise exceptions.OptionsError(
"Invalid interception filter: %s" % opts.filter
)
self.set_filter(filt)
def request(self, f):
self.add(f)
@ -202,21 +232,11 @@ class Focus:
if self.flow:
return self.view.index(self.flow)
def next(self):
"""
Sets the focus to the next flow.
"""
if self.flow:
idx = min(self.index + 1, len(self.view) - 1)
self.flow = self.view[idx]
def prev(self):
"""
Sets the focus to the previous flow.
"""
if self.flow:
idx = max(self.index - 1, 0)
self.flow = self.view[idx]
@index.setter
def index(self, idx) -> typing.Optional[int]:
if idx < 0 or idx > len(self.view) - 1:
raise ValueError("Index out of view bounds")
self.flow = self.view[idx]
def _nearest(self, f, v):
return min(v.bisect(f), len(v) - 1)

View File

@ -109,8 +109,8 @@ class BodyPile(urwid.Pile):
class ConnectionItem(urwid.WidgetWrap):
def __init__(self, master, state, flow, focus):
self.master, self.state, self.flow = master, state, flow
def __init__(self, master, view, flow, focus):
self.master, self.view, self.flow = master, view, flow
self.f = focus
w = self.get_text()
urwid.WidgetWrap.__init__(self, w)
@ -143,7 +143,7 @@ class ConnectionItem(urwid.WidgetWrap):
def server_replay_prompt(self, k):
a = self.master.addons.get("serverplayback")
if k == "a":
a.load([i.copy() for i in self.master.state.view])
a.load([i.copy() for i in self.master.view])
elif k == "t":
a.load([self.flow.copy()])
signals.update_settings.send(self)
@ -163,21 +163,18 @@ class ConnectionItem(urwid.WidgetWrap):
elif key == "d":
if self.flow.killable:
self.flow.kill(self.master)
self.state.delete_flow(self.flow)
self.view.remove(self.view.focus.flow)
signals.flowlist_change.send(self)
elif key == "D":
f = self.master.state.duplicate_flow(self.flow)
self.master.state.set_focus_flow(f)
cp = self.flow.copy()
self.master.view.add(cp)
self.master.view.focus.flow = cp
signals.flowlist_change.send(self)
elif key == "m":
self.flow.marked = not self.flow.marked
signals.flowlist_change.send(self)
elif key == "M":
if self.state.mark_filter:
self.state.disable_marked_filter()
else:
self.state.enable_marked_filter()
signals.flowlist_change.send(self)
self.master.view.toggle_marked()
elif key == "r":
try:
self.master.replay_request(self.flow)
@ -208,14 +205,14 @@ class ConnectionItem(urwid.WidgetWrap):
callback = self.server_replay_prompt,
)
elif key == "U":
for f in self.state.flows:
for f in self.view:
f.marked = False
signals.flowlist_change.send(self)
elif key == "V":
if not self.flow.modified():
signals.status_message.send(message="Flow not modified.")
return
self.state.revert(self.flow)
self.flow.revert()
signals.flowlist_change.send(self)
signals.status_message.send(message="Reverted.")
elif key == "w":
@ -264,38 +261,49 @@ class ConnectionItem(urwid.WidgetWrap):
class FlowListWalker(urwid.ListWalker):
def __init__(self, master, state):
self.master, self.state = master, state
signals.flowlist_change.connect(self.sig_flowlist_change)
def __init__(self, master, view):
self.master, self.view = master, view
self.view.sig_refresh.connect(self.sig_mod)
self.view.sig_add.connect(self.sig_mod)
self.view.sig_remove.connect(self.sig_mod)
self.view.sig_update.connect(self.sig_mod)
signals.flowlist_change.connect(self.sig_mod)
def sig_flowlist_change(self, sender):
def sig_mod(self, *args, **kwargs):
self._modified()
def get_focus(self):
f, i = self.state.get_focus()
f = ConnectionItem(self.master, self.state, f, True) if f else None
return f, i
if not self.view.focus.flow:
return None, 0
return ConnectionItem(
self.master, self.view, self.view.focus.flow, True
), self.view.focus.index
def set_focus(self, focus):
ret = self.state.set_focus(focus)
return ret
def set_focus(self, index):
if self.view.inbounds(index):
self.view.focus.index = index
signals.flowlist_change.send(self)
def get_next(self, pos):
f, i = self.state.get_next(pos)
f = ConnectionItem(self.master, self.state, f, False) if f else None
return f, i
pos = pos + 1
if not self.view.inbounds(pos):
return None, None
f = ConnectionItem(self.master, self.view, self.view[pos], False)
return f, pos
def get_prev(self, pos):
f, i = self.state.get_prev(pos)
f = ConnectionItem(self.master, self.state, f, False) if f else None
return f, i
pos = pos - 1
if not self.view.inbounds(pos):
return None, None
f = ConnectionItem(self.master, self.view, self.view[pos], False)
return f, pos
class FlowListBox(urwid.ListBox):
def __init__(self, master: "mitmproxy.console.master.ConsoleMaster"):
self.master = master
super().__init__(FlowListWalker(master, master.state))
super().__init__(FlowListWalker(master, master.view))
def get_method_raw(self, k):
if k:
@ -331,29 +339,32 @@ class FlowListBox(urwid.ListBox):
return
scheme, host, port, path = parts
f = self.master.create_request(method, scheme, host, port, path)
self.master.state.set_focus_flow(f)
self.master.view.focus.flow = f
signals.flowlist_change.send(self)
def keypress(self, size, key):
key = common.shortcuts(key)
if key == "A":
self.master.accept_all()
for f in self.master.view:
if f.intercepted:
f.resume()
signals.flowlist_change.send(self)
elif key == "z":
self.master.clear_flows()
self.master.view.clear()
signals.flowlist_change.send(self)
elif key == "e":
self.master.toggle_eventlog()
elif key == "g":
self.master.state.set_focus(0)
self.master.view.focus.index = 0
signals.flowlist_change.send(self)
elif key == "G":
self.master.state.set_focus(self.master.state.flow_count())
self.master.view.focus.index = len(self.master.view) - 1
signals.flowlist_change.send(self)
elif key == "f":
signals.status_prompt.send(
prompt = "Filter View",
text = self.master.state.filter_txt,
callback = self.master.set_view_filter
text = self.master.options.filter,
callback = self.master.options.setter("filter")
)
elif key == "L":
signals.status_prompt_path.send(

View File

@ -127,8 +127,8 @@ TAB_RESP = 1
class FlowView(tabs.Tabs):
highlight_color = "focusfield"
def __init__(self, master, state, flow, tab_offset):
self.master, self.state, self.flow = master, state, flow
def __init__(self, master, view, flow, tab_offset):
self.master, self.view, self.flow = master, view, flow
super().__init__(
[
(self.tab_request, self.view_request),
@ -164,7 +164,7 @@ class FlowView(tabs.Tabs):
return self.conn_text(self.flow.response)
def view_details(self):
return flowdetailview.flowdetails(self.state, self.flow)
return flowdetailview.flowdetails(self.view, self.flow)
def sig_flow_change(self, sender, flow):
if flow == self.flow:
@ -175,11 +175,8 @@ class FlowView(tabs.Tabs):
msg, body = "", [urwid.Text([("error", "[content missing]")])]
return msg, body
else:
full = self.state.get_flow_setting(
self.flow,
(self.tab_offset, "fullcontents"),
False
)
s = self.view.settings[self.flow]
full = s.get((self.tab_offset, "fullcontents"), False)
if full:
limit = sys.maxsize
else:
@ -237,9 +234,9 @@ class FlowView(tabs.Tabs):
return description, text_objects
def viewmode_get(self):
override = self.state.get_flow_setting(
self.flow,
(self.tab_offset, "prettyview")
override = self.view.settings[self.flow].get(
(self.tab_offset, "prettyview"),
None
)
return self.master.options.default_contentview if override is None else override
@ -284,7 +281,7 @@ class FlowView(tabs.Tabs):
]
)
]
return searchable.Searchable(self.state, txt)
return searchable.Searchable(self.view, txt)
def set_method_raw(self, m):
if m:
@ -466,33 +463,25 @@ class FlowView(tabs.Tabs):
)
signals.flow_change.send(self, flow = self.flow)
def _view_nextprev_flow(self, np, flow):
try:
idx = self.state.view.index(flow)
except IndexError:
return
if np == "next":
new_flow, new_idx = self.state.get_next(idx)
else:
new_flow, new_idx = self.state.get_prev(idx)
if new_flow is None:
def view_flow(self, flow):
signals.pop_view_state.send(self)
self.master.view_flow(flow, self.tab_offset)
def _view_nextprev_flow(self, idx, flow):
if not self.view.inbounds(idx):
signals.status_message.send(message="No more flows")
else:
signals.pop_view_state.send(self)
self.master.view_flow(new_flow, self.tab_offset)
return
self.view_flow(self.view[idx])
def view_next_flow(self, flow):
return self._view_nextprev_flow("next", flow)
return self._view_nextprev_flow(self.view.index(flow) + 1, flow)
def view_prev_flow(self, flow):
return self._view_nextprev_flow("prev", flow)
return self._view_nextprev_flow(self.view.index(flow) - 1, flow)
def change_this_display_mode(self, t):
self.state.add_flow_setting(
self.flow,
(self.tab_offset, "prettyview"),
contentviews.get_by_shortcut(t).name
)
name = contentviews.get_by_shortcut(t).name
self.view.settings[self.flow][(self.tab_offset, "prettyview")] = name
signals.flow_change.send(self, flow = self.flow)
def keypress(self, size, key):
@ -521,20 +510,18 @@ class FlowView(tabs.Tabs):
self.master.accept_all()
signals.flow_change.send(self, flow = self.flow)
elif key == "d":
if self.state.flow_count() == 1:
if self.flow.killable:
self.flow.kill(self.master)
self.view.remove(self.flow)
if not self.view.focus.flow:
self.master.view_flowlist()
elif self.state.view.index(self.flow) == len(self.state.view) - 1:
self.view_prev_flow(self.flow)
else:
self.view_next_flow(self.flow)
f = self.flow
if f.killable:
f.kill(self.master)
self.state.delete_flow(f)
self.view_flow(self.view.focus.flow)
elif key == "D":
f = self.master.state.duplicate_flow(self.flow)
signals.pop_view_state.send(self)
self.master.view_flow(f)
cp = self.flow.copy()
self.master.view.add(cp)
self.master.view.focus.flow = cp
self.view_flow(cp)
signals.status_message.send(message="Duplicated.")
elif key == "p":
self.view_prev_flow(self.flow)
@ -546,7 +533,7 @@ class FlowView(tabs.Tabs):
signals.flow_change.send(self, flow = self.flow)
elif key == "V":
if self.flow.modified():
self.state.revert(self.flow)
self.flow.revert()
signals.flow_change.send(self, flow = self.flow)
signals.status_message.send(message="Reverted.")
else:
@ -608,14 +595,9 @@ class FlowView(tabs.Tabs):
else:
common.ask_save_body("s", self.flow)
elif key == "f":
signals.status_message.send(message="Loading all body data...")
self.state.add_flow_setting(
self.flow,
(self.tab_offset, "fullcontents"),
True
)
self.view.settings[self.flow][(self.tab_offset, "fullcontents")] = True
signals.flow_change.send(self, flow = self.flow)
signals.status_message.send(message="")
signals.status_message.send(message="Loading all body data...")
elif key == "m":
p = list(contentviews.view_prompts)
p.insert(0, ("Clear", "C"))

View File

@ -9,7 +9,6 @@ import subprocess
import sys
import tempfile
import traceback
import weakref
import urwid
from typing import Optional
@ -19,9 +18,8 @@ from mitmproxy import controller
from mitmproxy import exceptions
from mitmproxy import master
from mitmproxy import io
from mitmproxy import flowfilter
from mitmproxy import log
from mitmproxy.addons import state
from mitmproxy.addons import view
from mitmproxy.addons import intercept
import mitmproxy.options
from mitmproxy.tools.console import flowlist
@ -34,7 +32,6 @@ from mitmproxy.tools.console import palettes
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import statusbar
from mitmproxy.tools.console import window
from mitmproxy.flowfilter import FMarked
from mitmproxy.utils import strutils
from mitmproxy.net import tcp
@ -42,163 +39,6 @@ from mitmproxy.net import tcp
EVENTLOG_SIZE = 500
class ConsoleState(state.State):
def __init__(self):
state.State.__init__(self)
self.focus = None
self.follow_focus = None
self.flowsettings = weakref.WeakKeyDictionary()
self.last_search = None
self.last_filter = ""
self.mark_filter = False
def __setattr__(self, name, value):
self.__dict__[name] = value
signals.update_settings.send(self)
def add_flow_setting(self, flow, key, value):
d = self.flowsettings.setdefault(flow, {})
d[key] = value
def get_flow_setting(self, flow, key, default=None):
d = self.flowsettings.get(flow, {})
return d.get(key, default)
def add_flow(self, f):
super().add_flow(f)
signals.flowlist_change.send(self)
self.update_focus()
return f
def update_flow(self, f):
super().update_flow(f)
signals.flowlist_change.send(self)
self.update_focus()
return f
def set_view_filter(self, txt):
ret = super().set_view_filter(txt)
self.set_focus(self.focus)
return ret
def get_focus(self):
if not self.view or self.focus is None:
return None, None
return self.view[self.focus], self.focus
def set_focus(self, idx):
if self.view:
if idx is None or idx < 0:
idx = 0
elif idx >= len(self.view):
idx = len(self.view) - 1
self.focus = idx
else:
self.focus = None
def update_focus(self):
if self.focus is None:
self.set_focus(0)
elif self.follow_focus:
self.set_focus(len(self.view) - 1)
def set_focus_flow(self, f):
self.set_focus(self.view.index(f))
def get_from_pos(self, pos):
if len(self.view) <= pos or pos < 0:
return None, None
return self.view[pos], pos
def get_next(self, pos):
return self.get_from_pos(pos + 1)
def get_prev(self, pos):
return self.get_from_pos(pos - 1)
def delete_flow(self, f):
if f in self.view and self.view.index(f) <= self.focus:
self.focus -= 1
if self.focus < 0:
self.focus = None
ret = super().delete_flow(f)
self.set_focus(self.focus)
return ret
def get_nearest_matching_flow(self, flow, flt):
fidx = self.view.index(flow)
dist = 1
fprev = fnext = True
while fprev or fnext:
fprev, _ = self.get_from_pos(fidx - dist)
fnext, _ = self.get_from_pos(fidx + dist)
if fprev and flowfilter.match(flt, fprev):
return fprev
elif fnext and flowfilter.match(flt, fnext):
return fnext
dist += 1
return None
def enable_marked_filter(self):
marked_flows = [f for f in self.flows if f.marked]
if not marked_flows:
return
marked_filter = "~%s" % FMarked.code
# Save Focus
last_focus, _ = self.get_focus()
nearest_marked = self.get_nearest_matching_flow(last_focus, marked_filter)
self.last_filter = self.filter_txt
self.set_view_filter(marked_filter)
# Restore Focus
if last_focus.marked:
self.set_focus_flow(last_focus)
else:
self.set_focus_flow(nearest_marked)
self.mark_filter = True
def disable_marked_filter(self):
marked_filter = "~%s" % FMarked.code
# Save Focus
last_focus, _ = self.get_focus()
nearest_marked = self.get_nearest_matching_flow(last_focus, marked_filter)
self.set_view_filter(self.last_filter)
self.last_filter = ""
# Restore Focus
if last_focus.marked:
self.set_focus_flow(last_focus)
else:
self.set_focus_flow(nearest_marked)
self.mark_filter = False
def clear(self):
marked_flows = [f for f in self.view if f.marked]
super().clear()
for f in marked_flows:
self.add_flow(f)
f.marked = True
if len(self.flows.views) == 0:
self.focus = None
else:
self.focus = 0
self.set_focus(self.focus)
class Options(mitmproxy.options.Options):
def __init__(
self,
@ -210,6 +50,7 @@ class Options(mitmproxy.options.Options):
palette: Optional[str] = None,
palette_transparent: bool = False,
no_mouse: bool = False,
follow_focus: bool = False,
**kwargs
):
self.eventlog = eventlog
@ -219,6 +60,7 @@ class Options(mitmproxy.options.Options):
self.palette = palette
self.palette_transparent = palette_transparent
self.no_mouse = no_mouse
self.follow_focus = follow_focus
super().__init__(**kwargs)
@ -227,15 +69,12 @@ class ConsoleMaster(master.Master):
def __init__(self, options, server):
super().__init__(options, server)
self.state = ConsoleState()
self.view = view.View()
self.stream_path = None
# This line is just for type hinting
self.options = self.options # type: Options
self.options.errored.connect(self.options_error)
if options.filter:
self.set_view_filter(options.filter)
self.palette = options.palette
self.palette_transparent = options.palette_transparent
@ -250,7 +89,7 @@ class ConsoleMaster(master.Master):
signals.push_view_state.connect(self.sig_push_view_state)
signals.sig_add_log.connect(self.sig_add_log)
self.addons.add(*addons.default_addons())
self.addons.add(self.state, intercept.Intercept())
self.addons.add(intercept.Intercept(), self.view)
def __setattr__(self, name, value):
self.__dict__[name] = value
@ -432,13 +271,13 @@ class ConsoleMaster(master.Master):
if self.options.rfile:
ret = self.load_flows_path(self.options.rfile)
if ret and self.state.flow_count():
if ret and self.view.store_count():
signals.add_log(
"File truncated or corrupted. "
"Loaded as many flows as possible.",
"error"
)
elif ret and not self.state.flow_count():
elif ret and not self.view.store_count():
self.shutdown()
print("Could not load file: {}".format(ret), file=sys.stderr)
sys.exit(1)
@ -533,17 +372,12 @@ class ConsoleMaster(master.Master):
def view_flowlist(self):
if self.ui.started:
self.ui.clear()
if self.state.follow_focus:
self.state.set_focus(self.state.flow_count())
if self.options.eventlog:
body = flowlist.BodyPile(self)
else:
body = flowlist.FlowListBox(self)
if self.follow:
self.toggle_follow_flows()
signals.push_view_state.send(
self,
window = window.Window(
@ -556,12 +390,12 @@ class ConsoleMaster(master.Master):
)
def view_flow(self, flow, tab_offset=0):
self.state.set_focus_flow(flow)
self.view.focus.flow = flow
signals.push_view_state.send(
self,
window = window.Window(
self,
flowview.FlowView(self, self.state, flow, tab_offset),
flowview.FlowView(self, self.view, flow, tab_offset),
flowview.FlowViewHeader(self, flow),
statusbar.StatusBar(self, flowview.footer),
flowview.help_context
@ -585,7 +419,7 @@ class ConsoleMaster(master.Master):
return self._write_flows(path, [flow])
def save_flows(self, path):
return self._write_flows(path, self.state.view)
return self._write_flows(path, self.view)
def load_flows_callback(self, path):
if not path:
@ -602,14 +436,6 @@ class ConsoleMaster(master.Master):
signals.flowlist_change.send(self)
return reterr
def accept_all(self):
self.state.accept_all(self)
def set_view_filter(self, txt):
v = self.state.set_view_filter(txt)
signals.flowlist_change.send(self)
return v
def edit_scripts(self, scripts):
self.options.scripts = [x[0] for x in scripts]
@ -617,56 +443,14 @@ class ConsoleMaster(master.Master):
if a != "n":
raise urwid.ExitMainLoop
def shutdown(self):
self.state.killall(self)
master.Master.shutdown(self)
def clear_flows(self):
self.state.clear()
signals.flowlist_change.send(self)
def toggle_follow_flows(self):
# toggle flow follow
self.state.follow_focus = not self.state.follow_focus
# jump to most recent flow if follow is now on
if self.state.follow_focus:
self.state.set_focus(self.state.flow_count())
signals.flowlist_change.send(self)
def delete_flow(self, f):
self.state.delete_flow(f)
signals.flowlist_change.send(self)
def refresh_focus(self):
if self.state.view:
signals.flow_change.send(
self,
flow = self.state.view[self.state.focus]
)
def process_flow(self, f):
signals.flowlist_change.send(self)
signals.flow_change.send(self, flow=f)
if self.view.focus.flow:
signals.flow_change.send(self, flow = self.view.focus.flow)
def clear_events(self):
self.logbuffer[:] = []
# Handlers
@controller.handler
def error(self, f):
super().error(f)
self.process_flow(f)
@controller.handler
def request(self, f):
super().request(f)
self.process_flow(f)
@controller.handler
def response(self, f):
super().response(f)
self.process_flow(f)
@controller.handler
def tcp_message(self, f):
super().tcp_message(f)

View File

@ -172,10 +172,8 @@ class Options(urwid.WidgetWrap):
showhost = False,
stickyauth = None,
stickycookie = None,
default_contentview = "auto",
)
self.master.state.default_body_view = contentviews.get("Auto")
signals.update_settings.send(self)
signals.status_message.send(
message = "All select.Options cleared",

View File

@ -16,13 +16,14 @@ class Highlight(urwid.AttrMap):
class Searchable(urwid.ListBox):
def __init__(self, state, contents):
def __init__(self, view, contents):
self.walker = urwid.SimpleFocusListWalker(contents)
urwid.ListBox.__init__(self, self.walker)
self.state = state
self.view = view
self.search_offset = 0
self.current_highlight = None
self.search_term = None
self.last_search = None
def keypress(self, size, key):
if key == "/":
@ -45,7 +46,7 @@ class Searchable(urwid.ListBox):
return super().keypress(size, key)
def set_search(self, text):
self.state.last_search = text
self.last_search = text
self.search_term = text or None
self.find_next(False)
@ -69,8 +70,8 @@ class Searchable(urwid.ListBox):
def find_next(self, backwards):
if not self.search_term:
if self.state.last_search:
self.search_term = self.state.last_search
if self.last_search:
self.search_term = self.last_search
else:
self.set_highlight(None)
return

View File

@ -164,10 +164,10 @@ class StatusBar(urwid.WidgetWrap):
r.append("[")
r.append(("heading_key", "i"))
r.append(":%s]" % self.master.options.intercept)
if self.master.state.filter_txt:
if self.master.options.filter:
r.append("[")
r.append(("heading_key", "f"))
r.append(":%s]" % self.master.state.filter_txt)
r.append(":%s]" % self.master.options.filter)
if self.master.options.stickycookie:
r.append("[")
r.append(("heading_key", "t"))
@ -194,7 +194,7 @@ class StatusBar(urwid.WidgetWrap):
opts.append("killextra")
if self.master.options.no_upstream_cert:
opts.append("no-upstream-cert")
if self.master.state.follow_focus:
if self.master.options.follow_focus:
opts.append("following")
if self.master.options.stream_large_bodies:
opts.append(
@ -224,11 +224,11 @@ class StatusBar(urwid.WidgetWrap):
return r
def redraw(self):
fc = self.master.state.flow_count()
if self.master.state.focus is None:
fc = len(self.master.view)
if self.master.view.focus.flow is None:
offset = 0
else:
offset = min(self.master.state.focus + 1, fc)
offset = self.master.view.focus.index + 1
t = [
('heading', ("[%s/%s]" % (offset, fc)).ljust(9))
]

View File

@ -1,6 +1,8 @@
from mitmproxy.test import tflow
from mitmproxy.addons import view
from mitmproxy import flowfilter
from mitmproxy import options
from mitmproxy.test import taddons
from .. import tutils
@ -31,6 +33,10 @@ def test_simple():
assert list(v) == [f, f3, f2]
assert len(v._store) == 3
v.clear()
assert len(v) == 0
assert len(v._store) == 0
def tft(*, method="get", start=0):
f = tflow.tflow()
@ -52,6 +58,14 @@ def test_filter():
assert len(v._store) == 4
v.set_filter(None)
assert len(v) == 4
v[1].marked = True
v.toggle_marked()
assert len(v) == 1
assert v[0].marked
v.toggle_marked()
assert len(v) == 4
def test_order():
v = view.View()
@ -236,31 +250,6 @@ def test_focus():
assert f.index is None
def test_focus_nextprev():
v = view.View()
# Nops on an empty view
v.focus.next()
v.focus.prev()
# Nops on a single-flow view
v.add(tft(start=0))
assert v.focus.flow == v[0]
v.focus.next()
assert v.focus.flow == v[0]
v.focus.prev()
assert v.focus.flow == v[0]
v.add(tft(start=1))
v.focus.next()
assert v.focus.flow == v[1]
v.focus.next()
assert v.focus.flow == v[1]
v.focus.prev()
assert v.focus.flow == v[0]
v.focus.prev()
assert v.focus.flow == v[0]
def test_settings():
v = view.View()
f = tft()
@ -274,3 +263,16 @@ def test_settings():
v.remove(f)
tutils.raises(KeyError, v.settings.__getitem__, f)
assert not v.settings.keys()
class Options(options.Options):
def __init__(self, *, filter=None, **kwargs):
self.filter = filter
super().__init__(**kwargs)
def test_configure():
v = view.View()
with taddons.context(options=Options()) as tctx:
tctx.configure(v, filter="~q")
tutils.raises("invalid interception filter", tctx.configure, v, filter="~~")

View File

@ -1,5 +1,3 @@
import gc
from mitmproxy.test import tflow
import mitmproxy.test.tutils
from mitmproxy.tools import console
@ -9,93 +7,6 @@ from mitmproxy.tools.console import common
from .. import mastertest
class TestConsoleState:
def test_flow(self):
"""
normal flow:
connect -> request -> response
"""
c = console.master.ConsoleState()
f = self._add_request(c)
assert f in c.flows
assert c.get_focus() == (f, 0)
def test_focus(self):
"""
normal flow:
connect -> request -> response
"""
c = console.master.ConsoleState()
f = self._add_request(c)
assert c.get_focus() == (f, 0)
assert c.get_from_pos(0) == (f, 0)
assert c.get_from_pos(1) == (None, None)
assert c.get_next(0) == (None, None)
f2 = self._add_request(c)
assert c.get_focus() == (f, 0)
assert c.get_next(0) == (f2, 1)
assert c.get_prev(1) == (f, 0)
assert c.get_next(1) == (None, None)
c.set_focus(0)
assert c.get_focus() == (f, 0)
c.set_focus(-1)
assert c.get_focus() == (f, 0)
c.set_focus(2)
assert c.get_focus() == (f2, 1)
c.delete_flow(f2)
assert c.get_focus() == (f, 0)
c.delete_flow(f)
assert c.get_focus() == (None, None)
def _add_request(self, state):
f = tflow.tflow()
return state.add_flow(f)
def _add_response(self, state):
f = self._add_request(state)
f.response = mitmproxy.test.tutils.tresp()
state.update_flow(f)
def test_add_response(self):
c = console.master.ConsoleState()
f = self._add_request(c)
f.response = mitmproxy.test.tutils.tresp()
c.focus = None
c.update_flow(f)
def test_focus_view(self):
c = console.master.ConsoleState()
self._add_request(c)
self._add_response(c)
self._add_request(c)
self._add_response(c)
self._add_request(c)
self._add_response(c)
assert not c.set_view_filter("~s")
assert len(c.view) == 3
assert c.focus == 0
def test_settings(self):
c = console.master.ConsoleState()
f = self._add_request(c)
c.add_flow_setting(f, "foo", "bar")
assert c.get_flow_setting(f, "foo") == "bar"
assert c.get_flow_setting(f, "oink") is None
assert c.get_flow_setting(f, "oink", "foo") == "foo"
assert len(c.flowsettings) == 1
c.delete_flow(f)
del f
gc.collect()
assert len(c.flowsettings) == 0
def test_format_keyvals():
assert common.format_keyvals(
[
@ -123,17 +34,17 @@ class TestMaster(mastertest.MasterTest):
m = self.mkmaster()
for i in (1, 2, 3):
self.dummy_cycle(m, 1, b"")
assert len(m.state.flows) == i
assert len(m.view) == i
def test_intercept(self):
"""regression test for https://github.com/mitmproxy/mitmproxy/issues/1605"""
m = self.mkmaster(intercept="~b bar")
f = tflow.tflow(req=mitmproxy.test.tutils.treq(content=b"foo"))
m.request(f)
assert not m.state.flows[0].intercepted
assert not m.view[0].intercepted
f = tflow.tflow(req=mitmproxy.test.tutils.treq(content=b"bar"))
m.request(f)
assert m.state.flows[1].intercepted
assert m.view[1].intercepted
f = tflow.tflow(resp=mitmproxy.test.tutils.tresp(content=b"bar"))
m.request(f)
assert m.state.flows[2].intercepted
assert m.view[2].intercepted