mitmproxy/libmproxy/console/flowlist.py

334 lines
11 KiB
Python
Raw Normal View History

from __future__ import absolute_import
import urwid
from netlib import http
from . import common
2015-03-13 11:29:21 +00:00
def _mkhelp():
    """
        Build the help text describing the flow list key bindings.

        Returns a list containing whatever common.format_keyvals yields for
        the (key, description) pairs declared below.
    """
    bindings = [
        ("A", "accept all intercepted flows"),
        ("a", "accept this intercepted flow"),
        ("b", "save request/response body"),
        ("C", "clear flow list or eventlog"),
        ("d", "delete flow"),
        ("D", "duplicate flow"),
        ("e", "toggle eventlog"),
        ("F", "toggle follow flow list"),
        ("g", "copy flow to clipboard"),
        ("l", "set limit filter pattern"),
        ("L", "load saved flows"),
        ("n", "create a new request"),
        ("r", "replay request"),
        ("V", "revert changes to request"),
        ("w", "save flows "),
        ("W", "stream flows to file"),
        ("X", "kill and delete flow, even if it's mid-intercept"),
        ("tab", "tab between eventlog and flow list"),
        ("enter", "view flow"),
        ("|", "run script on this flow"),
    ]
    formatted = common.format_keyvals(
        bindings,
        key="key",
        val="text",
        indent=4
    )
    # Always hand back a fresh list, independent of what the helper returns.
    return list(formatted)
# Help text for this view, built once when the module is imported.
help_context = _mkhelp()
# Footer entries: a ("heading_key", "?") pair followed by its ":help " label.
footer = [("heading_key", "?"), ":help "]
2015-03-13 11:29:21 +00:00
2012-02-08 01:07:17 +00:00
class EventListBox(urwid.ListBox):
    """
        List box displaying master.eventlist.
    """
    def __init__(self, master):
        self.master = master
        urwid.ListBox.__init__(self, master.eventlist)

    def keypress(self, size, key):
        """
            Handle "C" (clear events) locally and forward everything else to
            urwid.ListBox. A consumed key is forwarded as None.
        """
        translated = common.shortcuts(key)
        consumed = translated == "C"
        if consumed:
            self.master.clear_events()
        forwarded = None if consumed else translated
        return urwid.ListBox.keypress(self, size, forwarded)
class BodyPile(urwid.Pile):
    """
        Pile stacking the flow list on top of a framed event log.

        The event log frame's header is swapped between an "active" and an
        "inactive" variant depending on which pane holds the focus.
    """
    def __init__(self, master):
        title = urwid.Text("Event log")
        padded = urwid.Padding(title, align="left", width=("relative", 100))

        # Both headers wrap the very same padded title widget; only the
        # display attribute differs.
        self.inactive_header = urwid.AttrWrap(padded, "heading_inactive")
        self.active_header = urwid.AttrWrap(padded, "heading")

        flow_pane = FlowListBox(master)
        event_pane = urwid.Frame(
            EventListBox(master),
            header=self.inactive_header
        )
        urwid.Pile.__init__(self, [flow_pane, event_pane])
        self.master = master

    def keypress(self, size, key):
        """
            Handle "tab" (cycle focus between panes) and "e" (toggle the
            event log), then pass the key - or None if it was consumed here -
            to the focused widget.
        """
        if key == "tab":
            pane_count = len(self.widget_list)
            self.focus_position = (self.focus_position + 1) % pane_count
            # Index 1 is the event log frame.
            if self.focus_position == 1:
                header = self.active_header
            else:
                header = self.inactive_header
            self.widget_list[1].header = header
            key = None
        elif key == "e":
            self.master.toggle_eventlog()
            key = None

        # What follows is essentially a copy of urwid.Pile's own keypress
        # handler: work out the size of the focused item and delegate to it.
        if len(size) == 2:
            item_rows = self.get_item_rows(size, focus=True)
        else:
            item_rows = None
        focus_index = self.widget_list.index(self.focus_item)
        item_size = self.get_item_size(size, focus_index, True, item_rows)
        return self.focus_item.keypress(item_size, key)
2012-02-08 01:07:17 +00:00
class ConnectionItem(urwid.WidgetWrap):
    """
        A selectable flow list row wrapping a single flow.

        master: the console master object that prompts and actions are
                dispatched to.
        state:  the state object holding the flows.
        flow:   the flow rendered by this row.
        focus:  flag handed to common.format_flow when rendering the row.
    """
    def __init__(self, master, state, flow, focus):
        self.master, self.state, self.flow = master, state, flow
        self.f = focus
        w = self.get_text()
        urwid.WidgetWrap.__init__(self, w)

    def get_text(self):
        """
            Return the widget for this row as built by common.format_flow.
        """
        return common.format_flow(
            self.flow,
            self.f,
            hostheader=self.master.showhost
        )

    def selectable(self):
        return True

    def save_flows_prompt(self, k):
        """
            Callback for the "Save" one-key prompt: "a" saves all flows, any
            other answer saves only this flow.
        """
        if k == "a":
            self.master.path_prompt(
                "Save all flows to: ",
                self.state.last_saveload,
                self.master.save_flows
            )
        else:
            self.master.path_prompt(
                "Save this flow to: ",
                self.state.last_saveload,
                self.master.save_one_flow,
                self.flow
            )

    def stop_server_playback_prompt(self, a):
        """
            Callback for the "Stop current server replay?" prompt. Any answer
            other than "n" stops the playback.
        """
        if a != "n":
            self.master.stop_server_playback()

    def server_replay_prompt(self, k):
        """
            Callback for the "Server Replay" one-key prompt.

            "a" replays copies of every flow in the current view, "t" replays
            a copy of this flow, and any other answer prompts for a file path.
        """
        if k == "a":
            flows = [i.copy() for i in self.master.state.view]
        elif k == "t":
            flows = [self.flow.copy()]
        else:
            self.master.path_prompt(
                "Server replay path: ",
                self.state.last_saveload,
                self.master.server_playback_path
            )
            return

        # Both in-memory variants start playback with identical settings and
        # differ only in the list of flows that is replayed.
        self.master.start_server_playback(
            flows,
            self.master.killextra, self.master.rheaders,
            False, self.master.nopop,
            self.master.options.replay_ignore_params,
            self.master.options.replay_ignore_content,
            self.master.options.replay_ignore_payload_params,
            self.master.options.replay_ignore_host
        )

    def keypress(self, size, key):
        """
            Handle a key press on this row.

            size: the size tuple supplied by the containing widget. It is not
                  used here. It is accepted as a plain parameter rather than
                  unpacked in the signature, because tuple parameters are
                  Python 2 only syntax.
            key:  the pressed key, translated through common.shortcuts before
                  being matched.

            Returns None when the key was handled, and the translated key
            otherwise so that the container can process it.
        """
        key = common.shortcuts(key)
        if key == "a":
            self.flow.accept_intercept(self.master)
            self.master.sync_list_view()
        elif key == "d":
            self.flow.kill(self.master)
            self.state.delete_flow(self.flow)
            self.master.sync_list_view()
        elif key == "D":
            f = self.master.duplicate_flow(self.flow)
            self.master.view_flow(f)
        elif key == "r":
            r = self.master.replay_request(self.flow)
            # A truthy result is shown to the user in the status bar.
            if r:
                self.master.statusbar.message(r)
            self.master.sync_list_view()
        elif key == "S":
            if not self.master.server_playback:
                self.master.prompt_onekey(
                    "Server Replay",
                    (
                        ("all flows", "a"),
                        ("this flow", "t"),
                        ("file", "f"),
                    ),
                    self.server_replay_prompt,
                )
            else:
                self.master.prompt_onekey(
                    "Stop current server replay?",
                    (
                        ("yes", "y"),
                        ("no", "n"),
                    ),
                    self.stop_server_playback_prompt,
                )
        elif key == "V":
            if not self.flow.modified():
                self.master.statusbar.message("Flow not modified.")
                return None
            self.state.revert(self.flow)
            self.master.sync_list_view()
            self.master.statusbar.message("Reverted.")
        elif key == "w":
            self.master.prompt_onekey(
                "Save",
                (
                    ("all flows", "a"),
                    ("this flow", "t"),
                ),
                self.save_flows_prompt,
            )
        elif key == "X":
            self.flow.kill(self.master)
        elif key == "enter":
            if self.flow.request:
                self.master.view_flow(self.flow)
        elif key == "|":
            self.master.path_prompt(
                "Send flow to script: ",
                self.state.last_script,
                self.master.run_script_once,
                self.flow
            )
        elif key == "g":
            common.ask_copy_part("a", self.flow, self.master, self.state)
        elif key == "b":
            common.ask_save_body(None, self.master, self.state, self.flow)
        else:
            return key
class FlowListWalker(urwid.ListWalker):
    """
        List walker that serves ConnectionItem rows for the flows in state.
    """
    def __init__(self, master, state):
        self.master = master
        self.state = state
        if self.state.flow_count():
            self.set_focus(0)

    def _wrap(self, flow, focused):
        # Turn a flow into a row widget; a falsy flow maps to None.
        if not flow:
            return None
        return ConnectionItem(self.master, self.state, flow, focused)

    def get_focus(self):
        flow, position = self.state.get_focus()
        return self._wrap(flow, True), position

    def set_focus(self, focus):
        return self.state.set_focus(focus)

    def get_next(self, pos):
        flow, position = self.state.get_next(pos)
        return self._wrap(flow, False), position

    def get_prev(self, pos):
        flow, position = self.state.get_prev(pos)
        return self._wrap(flow, False), position
class FlowListBox(urwid.ListBox):
    """
        List box showing the flows served by master.flow_list_walker, plus
        the key bindings that act on the flow list as a whole.
    """
    def __init__(self, master):
        self.master = master
        urwid.ListBox.__init__(self, master.flow_list_walker)

    def get_method_raw(self, k):
        """
            Callback for the free-form "Method:" prompt. An empty answer is
            ignored.
        """
        if not k:
            return
        self.get_url(k)

    def get_method(self, k):
        """
            Callback for the "Method" one-key prompt. "e" asks for a
            free-form method; any other answer is looked up in
            common.METHOD_OPTIONS.
        """
        if k == "e":
            self.master.prompt("Method:", "", self.get_method_raw)
            return
        # When several options share the shortcut the last one wins; when
        # none matches the method is the empty string.
        matches = [
            option[0].upper()
            for option in common.METHOD_OPTIONS
            if option[1] == k
        ]
        self.get_url(matches[-1] if matches else "")

    def get_url(self, method):
        """
            Prompt for the URL of a new request using the given method.
        """
        self.master.prompt(
            "URL:",
            "http://www.example.com/",
            self.new_request,
            method
        )

    def new_request(self, url, method):
        """
            Callback for the "URL:" prompt: create a request for the URL and
            view it, or report an invalid URL in the status bar.
        """
        parts = http.parse_url(str(url))
        if parts:
            scheme, host, port, path = parts
            flow = self.master.create_request(method, scheme, host, port, path)
            self.master.view_flow(flow)
        else:
            self.master.statusbar.message("Invalid Url")

    def keypress(self, size, key):
        """
            Handle the list-wide key bindings. Keys not handled here are
            passed on to urwid.ListBox, whose result is returned; handled
            keys return None.
        """
        key = common.shortcuts(key)
        master = self.master

        if key == "A":
            master.accept_all()
            master.sync_list_view()
            return None
        if key == "C":
            master.clear_flows()
            return None
        if key == "e":
            master.toggle_eventlog()
            return None
        if key == "l":
            master.prompt(
                "Limit: ",
                master.state.limit_txt,
                master.set_limit
            )
            return None
        if key == "L":
            master.path_prompt(
                "Load flows: ",
                master.state.last_saveload,
                master.load_flows_callback
            )
            return None
        if key == "n":
            master.prompt_onekey(
                "Method",
                common.METHOD_OPTIONS,
                self.get_method
            )
            return None
        if key == "F":
            master.toggle_follow_flows()
            return None
        if key == "W":
            # Acts as a toggle: stop a running stream, otherwise ask where
            # to stream to.
            if master.stream:
                master.stop_stream()
            else:
                master.path_prompt(
                    "Stream flows to: ",
                    master.state.last_saveload,
                    master.start_stream_to_path
                )
            return None

        return urwid.ListBox.keypress(self, size, key)