mitmproxy/libmproxy/console/flowlist.py

353 lines
12 KiB
Python
Raw Normal View History

from __future__ import absolute_import
import urwid
from netlib import http
from . import common, signals
2015-03-13 11:29:21 +00:00
def _mkhelp():
text = []
keys = [
("A", "accept all intercepted flows"),
("a", "accept this intercepted flow"),
2015-01-16 16:08:25 +00:00
("b", "save request/response body"),
("C", "clear flow list or eventlog"),
("d", "delete flow"),
("D", "duplicate flow"),
("e", "toggle eventlog"),
("F", "toggle follow flow list"),
("l", "set limit filter pattern"),
("L", "load saved flows"),
2015-02-12 03:31:05 +00:00
("n", "create a new request"),
("P", "copy flow to clipboard"),
("r", "replay request"),
("V", "revert changes to request"),
("w", "save flows "),
("W", "stream flows to file"),
("X", "kill and delete flow, even if it's mid-intercept"),
("tab", "tab between eventlog and flow list"),
("enter", "view flow"),
("|", "run script on this flow"),
]
text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
return text
help_context = _mkhelp()
footer = [
('heading_key', "?"), ":help ",
]
2015-03-13 11:29:21 +00:00
2012-02-08 01:07:17 +00:00
class EventListBox(urwid.ListBox):
def __init__(self, master):
self.master = master
urwid.ListBox.__init__(self, master.eventlist)
def keypress(self, size, key):
key = common.shortcuts(key)
if key == "C":
self.master.clear_events()
key = None
elif key == "G":
self.set_focus(0)
elif key == "g":
self.set_focus(len(self.master.eventlist)-1)
2012-02-08 01:07:17 +00:00
return urwid.ListBox.keypress(self, size, key)
class BodyPile(urwid.Pile):
def __init__(self, master):
h = urwid.Text("Event log")
h = urwid.Padding(h, align="left", width=("relative", 100))
2012-02-18 05:48:08 +00:00
self.inactive_header = urwid.AttrWrap(h, "heading_inactive")
2012-02-08 01:07:17 +00:00
self.active_header = urwid.AttrWrap(h, "heading")
urwid.Pile.__init__(
self,
[
FlowListBox(master),
2015-03-13 11:29:21 +00:00
urwid.Frame(
EventListBox(master),
header = self.inactive_header
)
2012-02-08 01:07:17 +00:00
]
)
self.master = master
def keypress(self, size, key):
if key == "tab":
self.focus_position = (self.focus_position + 1)%len(self.widget_list)
if self.focus_position == 1:
2012-02-08 01:07:17 +00:00
self.widget_list[1].header = self.active_header
else:
self.widget_list[1].header = self.inactive_header
key = None
elif key == "e":
2012-02-08 01:07:17 +00:00
self.master.toggle_eventlog()
key = None
# This is essentially a copypasta from urwid.Pile's keypress handler.
# So much for "closed for modification, but open for extension".
item_rows = None
2015-03-13 11:29:21 +00:00
if len(size) == 2:
item_rows = self.get_item_rows(size, focus = True)
2012-02-08 01:07:17 +00:00
i = self.widget_list.index(self.focus_item)
2015-03-13 11:29:21 +00:00
tsize = self.get_item_size(size, i, True, item_rows)
return self.focus_item.keypress(tsize, key)
2012-02-08 01:07:17 +00:00
class ConnectionItem(urwid.WidgetWrap):
def __init__(self, master, state, flow, focus):
self.master, self.state, self.flow = master, state, flow
2012-10-28 20:30:59 +00:00
self.f = focus
w = self.get_text()
urwid.WidgetWrap.__init__(self, w)
def get_text(self):
2015-03-13 11:29:21 +00:00
return common.format_flow(
self.flow,
self.f,
hostheader = self.master.showhost
)
def selectable(self):
return True
def save_flows_prompt(self, k):
if k == "a":
2015-03-22 00:59:34 +00:00
signals.status_prompt_path.send(
prompt = "Save all flows to",
callback = self.master.save_flows
)
else:
2015-03-22 00:59:34 +00:00
signals.status_prompt_path.send(
prompt = "Save this flow to",
callback = self.master.save_one_flow,
args = (self.flow,)
)
2012-08-17 12:13:04 +00:00
def stop_server_playback_prompt(self, a):
if a != "n":
self.master.stop_server_playback()
def server_replay_prompt(self, k):
if k == "a":
self.master.start_server_playback(
[i.copy() for i in self.master.state.view],
self.master.killextra, self.master.rheaders,
2014-11-06 10:25:03 +00:00
False, self.master.nopop,
2015-03-13 11:29:21 +00:00
self.master.options.replay_ignore_params,
self.master.options.replay_ignore_content,
self.master.options.replay_ignore_payload_params,
self.master.options.replay_ignore_host
2012-08-17 12:13:04 +00:00
)
elif k == "t":
self.master.start_server_playback(
[self.flow.copy()],
self.master.killextra, self.master.rheaders,
2014-11-06 10:25:03 +00:00
False, self.master.nopop,
2015-03-13 11:29:21 +00:00
self.master.options.replay_ignore_params,
self.master.options.replay_ignore_content,
self.master.options.replay_ignore_payload_params,
self.master.options.replay_ignore_host
2012-08-17 12:13:04 +00:00
)
else:
2015-03-22 00:59:34 +00:00
signals.status_prompt_path.send(
prompt = "Server replay path",
callback = self.master.server_playback_path
2012-08-17 12:13:04 +00:00
)
def keypress(self, (maxcol,), key):
key = common.shortcuts(key)
if key == "a":
2014-12-23 19:33:42 +00:00
self.flow.accept_intercept(self.master)
signals.flowlist_change.send(self)
elif key == "d":
self.flow.kill(self.master)
self.state.delete_flow(self.flow)
signals.flowlist_change.send(self)
elif key == "D":
f = self.master.duplicate_flow(self.flow)
self.master.view_flow(f)
elif key == "r":
r = self.master.replay_request(self.flow)
if r:
signals.status_message.send(message=r)
signals.flowlist_change.send(self)
2012-08-17 12:13:04 +00:00
elif key == "S":
if not self.master.server_playback:
signals.status_prompt_onekey.send(
prompt = "Server Replay",
keys = (
2012-08-17 12:13:04 +00:00
("all flows", "a"),
("this flow", "t"),
("file", "f"),
),
callback = self.server_replay_prompt,
2012-08-17 12:13:04 +00:00
)
else:
signals.status_prompt_onekey.send(
prompt = "Stop current server replay?",
keys = (
2012-08-17 12:13:04 +00:00
("yes", "y"),
("no", "n"),
),
callback = self.stop_server_playback_prompt,
2012-08-17 12:13:04 +00:00
)
elif key == "V":
if not self.flow.modified():
signals.status_message.send(message="Flow not modified.")
return
self.state.revert(self.flow)
signals.flowlist_change.send(self)
signals.status_message.send(message="Reverted.")
elif key == "w":
signals.status_prompt_onekey.send(
self,
prompt = "Save",
keys = (
("all flows", "a"),
("this flow", "t"),
),
callback = self.save_flows_prompt,
)
elif key == "X":
self.flow.kill(self.master)
elif key == "enter":
if self.flow.request:
self.master.view_flow(self.flow)
elif key == "|":
2015-03-22 00:59:34 +00:00
signals.status_prompt_path.send(
prompt = "Send flow to script",
callback = self.master.run_script_once,
args = (self.flow,)
)
elif key == "P":
2015-02-06 23:33:29 +00:00
common.ask_copy_part("a", self.flow, self.master, self.state)
2015-01-16 16:08:25 +00:00
elif key == "b":
2015-02-06 22:32:22 +00:00
common.ask_save_body(None, self.master, self.state, self.flow)
else:
return key
class FlowListWalker(urwid.ListWalker):
def __init__(self, master, state):
self.master, self.state = master, state
if self.state.flow_count():
self.set_focus(0)
signals.flowlist_change.connect(self.sig_flowlist_change)
def sig_flowlist_change(self, sender):
self._modified()
def get_focus(self):
f, i = self.state.get_focus()
f = ConnectionItem(self.master, self.state, f, True) if f else None
return f, i
def set_focus(self, focus):
ret = self.state.set_focus(focus)
return ret
def get_next(self, pos):
f, i = self.state.get_next(pos)
f = ConnectionItem(self.master, self.state, f, False) if f else None
return f, i
def get_prev(self, pos):
f, i = self.state.get_prev(pos)
f = ConnectionItem(self.master, self.state, f, False) if f else None
return f, i
class FlowListBox(urwid.ListBox):
def __init__(self, master):
self.master = master
urwid.ListBox.__init__(
self,
FlowListWalker(master, master.state)
)
def get_method_raw(self, k):
if k:
2015-03-13 11:29:21 +00:00
self.get_url(k)
def get_method(self, k):
if k == "e":
signals.status_prompt.send(
self,
2015-03-22 00:59:34 +00:00
prompt = "Method",
text = "",
callback = self.get_method_raw
)
else:
method = ""
for i in common.METHOD_OPTIONS:
if i[1] == k:
method = i[0].upper()
self.get_url(method)
2015-03-13 11:29:21 +00:00
def get_url(self, method):
signals.status_prompt.send(
2015-03-22 00:59:34 +00:00
prompt = "URL",
text = "http://www.example.com/",
callback = self.new_request,
args = (method,)
2015-03-13 11:29:21 +00:00
)
def new_request(self, url, method):
2015-02-12 04:33:35 +00:00
parts = http.parse_url(str(url))
if not parts:
signals.status_message.send(message="Invalid Url")
2015-02-12 04:33:35 +00:00
return
scheme, host, port, path = parts
f = self.master.create_request(method, scheme, host, port, path)
self.master.view_flow(f)
def keypress(self, size, key):
key = common.shortcuts(key)
if key == "A":
self.master.accept_all()
signals.flowlist_change.send(self)
elif key == "C":
self.master.clear_flows()
elif key == "e":
self.master.toggle_eventlog()
elif key == "G":
self.master.state.set_focus(0)
signals.flowlist_change.send(self)
elif key == "g":
self.master.state.set_focus(self.master.state.flow_count())
signals.flowlist_change.send(self)
elif key == "l":
signals.status_prompt.send(
2015-03-22 00:59:34 +00:00
prompt = "Limit",
text = self.master.state.limit_txt,
callback = self.master.set_limit
2015-03-13 11:29:21 +00:00
)
elif key == "L":
2015-03-22 00:59:34 +00:00
signals.status_prompt_path.send(
self,
2015-03-22 00:59:34 +00:00
prompt = "Load flows",
callback = self.master.load_flows_callback
)
elif key == "n":
signals.status_prompt_onekey.send(
prompt = "Method",
keys = common.METHOD_OPTIONS,
callback = self.get_method
2015-03-13 11:29:21 +00:00
)
elif key == "F":
self.master.toggle_follow_flows()
elif key == "W":
if self.master.stream:
self.master.stop_stream()
else:
2015-03-22 00:59:34 +00:00
signals.status_prompt_path.send(
self,
2015-03-22 00:59:34 +00:00
prompt = "Stream flows to",
callback = self.master.start_stream_to_path
)
else:
return urwid.ListBox.keypress(self, size, key)