mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-24 00:31:33 +00:00
bf1399fa2b
This includes all key/value formatted views, e.g. the image view. We need to support these ultimately, but no time before the next release.
781 lines
27 KiB
Python
781 lines
27 KiB
Python
import os, sys, copy
|
|
import urwid
|
|
import common, grideditor, contentview
|
|
from .. import utils, flow, controller
|
|
|
|
|
|
class SearchError(Exception): pass
|
|
|
|
|
|
def _mkhelp():
|
|
text = []
|
|
keys = [
|
|
("A", "accept all intercepted flows"),
|
|
("a", "accept this intercepted flow"),
|
|
("b", "save request/response body"),
|
|
("d", "delete flow"),
|
|
("D", "duplicate flow"),
|
|
("e", "edit request/response"),
|
|
("f", "load full body data"),
|
|
("m", "change body display mode for this entity"),
|
|
(None,
|
|
common.highlight_key("automatic", "a") +
|
|
[("text", ": automatic detection")]
|
|
),
|
|
(None,
|
|
common.highlight_key("hex", "e") +
|
|
[("text", ": Hex")]
|
|
),
|
|
(None,
|
|
common.highlight_key("html", "h") +
|
|
[("text", ": HTML")]
|
|
),
|
|
(None,
|
|
common.highlight_key("image", "i") +
|
|
[("text", ": Image")]
|
|
),
|
|
(None,
|
|
common.highlight_key("javascript", "j") +
|
|
[("text", ": JavaScript")]
|
|
),
|
|
(None,
|
|
common.highlight_key("json", "s") +
|
|
[("text", ": JSON")]
|
|
),
|
|
(None,
|
|
common.highlight_key("urlencoded", "u") +
|
|
[("text", ": URL-encoded data")]
|
|
),
|
|
(None,
|
|
common.highlight_key("raw", "r") +
|
|
[("text", ": raw data")]
|
|
),
|
|
(None,
|
|
common.highlight_key("xml", "x") +
|
|
[("text", ": XML")]
|
|
),
|
|
("M", "change default body display mode"),
|
|
("p", "previous flow"),
|
|
("r", "replay request"),
|
|
("V", "revert changes to request"),
|
|
("v", "view body in external viewer"),
|
|
("w", "save all flows matching current limit"),
|
|
("W", "save this flow"),
|
|
("x", "delete body"),
|
|
("X", "view flow details"),
|
|
("z", "encode/decode a request/response"),
|
|
("tab", "toggle request/response view"),
|
|
("space", "next flow"),
|
|
("|", "run script on this flow"),
|
|
("/", "search in response body (case sensitive)"),
|
|
("n", "repeat previous search"),
|
|
]
|
|
text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
|
|
return text
|
|
help_context = _mkhelp()
|
|
|
|
footer = [
|
|
('heading_key', "?"), ":help ",
|
|
('heading_key', "q"), ":back ",
|
|
]
|
|
|
|
|
|
class FlowViewHeader(common.WWrap):
|
|
def __init__(self, master, f):
|
|
self.master, self.flow = master, f
|
|
self.w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost)
|
|
|
|
def refresh_flow(self, f):
|
|
if f == self.flow:
|
|
self.w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost)
|
|
|
|
|
|
class CallbackCache:
|
|
@utils.LRUCache(200)
|
|
def _callback(self, method, *args, **kwargs):
|
|
return getattr(self.obj, method)(*args, **kwargs)
|
|
|
|
def callback(self, obj, method, *args, **kwargs):
|
|
# obj varies!
|
|
self.obj = obj
|
|
return self._callback(method, *args, **kwargs)
|
|
cache = CallbackCache()
|
|
|
|
|
|
class FlowView(common.WWrap):
|
|
REQ = 0
|
|
RESP = 1
|
|
method_options = [
|
|
("get", "g"),
|
|
("post", "p"),
|
|
("put", "u"),
|
|
("head", "h"),
|
|
("trace", "t"),
|
|
("delete", "d"),
|
|
("options", "o"),
|
|
("edit raw", "e"),
|
|
]
|
|
|
|
highlight_color = "focusfield"
|
|
|
|
def __init__(self, master, state, flow):
|
|
self.master, self.state, self.flow = master, state, flow
|
|
self.last_displayed_body = None
|
|
if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
|
|
self.view_response()
|
|
else:
|
|
self.view_request()
|
|
|
|
def _cached_content_view(self, viewmode, hdrItems, content, limit):
|
|
return contentview.get_content_view(viewmode, hdrItems, content, limit, self.master.add_event)
|
|
|
|
def content_view(self, viewmode, conn):
|
|
full = self.state.get_flow_setting(
|
|
self.flow,
|
|
(self.state.view_flow_mode, "fullcontents"),
|
|
False
|
|
)
|
|
if full:
|
|
limit = sys.maxint
|
|
else:
|
|
limit = contentview.VIEW_CUTOFF
|
|
description, text_objects = cache.callback(
|
|
self, "_cached_content_view",
|
|
viewmode,
|
|
tuple(tuple(i) for i in conn.headers.lst),
|
|
conn.content,
|
|
limit
|
|
)
|
|
return (description, text_objects)
|
|
|
|
def cont_view_handle_missing(self, conn, viewmode):
|
|
if conn.content == flow.CONTENT_MISSING:
|
|
msg, body = "", [urwid.Text([("error", "[content missing]")])], 0
|
|
else:
|
|
msg, body = self.content_view(viewmode, conn)
|
|
|
|
return (msg, body)
|
|
|
|
def viewmode_get(self, override):
|
|
return self.state.default_body_view if override is None else override
|
|
|
|
def override_get(self):
|
|
return self.state.get_flow_setting(self.flow,
|
|
(self.state.view_flow_mode, "prettyview"))
|
|
|
|
def conn_text_raw(self, conn):
|
|
"""
|
|
Based on a request/response, conn, returns the elements for
|
|
display.
|
|
"""
|
|
headers = common.format_keyvals(
|
|
[(h+":", v) for (h, v) in conn.headers.lst],
|
|
key = "header",
|
|
val = "text"
|
|
)
|
|
if conn.content is not None:
|
|
override = self.override_get()
|
|
viewmode = self.viewmode_get(override)
|
|
msg, body = self.cont_view_handle_missing(conn, viewmode)
|
|
elif conn.content == flow.CONTENT_MISSING:
|
|
pass
|
|
return headers, msg, body
|
|
|
|
def conn_text_merge(self, headers, msg, body):
|
|
"""
|
|
Grabs what is returned by conn_text_raw and merges them all
|
|
toghether, mainly used by conn_text and search
|
|
"""
|
|
override = self.override_get()
|
|
viewmode = self.viewmode_get(override)
|
|
|
|
cols = [urwid.Text(
|
|
[
|
|
("heading", msg),
|
|
]
|
|
)
|
|
]
|
|
|
|
if override is not None:
|
|
cols.append(urwid.Text([
|
|
" ",
|
|
('heading', "["),
|
|
('heading_key', "m"),
|
|
('heading', (":%s]"%viewmode.name)),
|
|
],
|
|
align="right"
|
|
)
|
|
)
|
|
|
|
title = urwid.AttrWrap(urwid.Columns(cols), "heading")
|
|
headers.append(title)
|
|
headers.extend(body)
|
|
|
|
return headers
|
|
|
|
def conn_text(self, conn):
|
|
"""
|
|
Same as conn_text_raw, but returns result wrapped in a listbox ready for usage.
|
|
"""
|
|
headers, msg, body = self.conn_text_raw(conn)
|
|
merged = self.conn_text_merge(headers, msg, body)
|
|
return urwid.ListBox(merged)
|
|
|
|
def _tab(self, content, attr):
|
|
p = urwid.Text(content)
|
|
p = urwid.Padding(p, align="left", width=("relative", 100))
|
|
p = urwid.AttrWrap(p, attr)
|
|
return p
|
|
|
|
def wrap_body(self, active, body):
|
|
parts = []
|
|
|
|
if self.flow.intercepting and not self.flow.request.reply.acked:
|
|
qt = "Request intercepted"
|
|
else:
|
|
qt = "Request"
|
|
if active == common.VIEW_FLOW_REQUEST:
|
|
parts.append(self._tab(qt, "heading"))
|
|
else:
|
|
parts.append(self._tab(qt, "heading_inactive"))
|
|
|
|
if self.flow.intercepting and self.flow.response and not self.flow.response.reply.acked:
|
|
st = "Response intercepted"
|
|
else:
|
|
st = "Response"
|
|
if active == common.VIEW_FLOW_RESPONSE:
|
|
parts.append(self._tab(st, "heading"))
|
|
else:
|
|
parts.append(self._tab(st, "heading_inactive"))
|
|
|
|
h = urwid.Columns(parts)
|
|
f = urwid.Frame(
|
|
body,
|
|
header=h
|
|
)
|
|
return f
|
|
|
|
def search_wrapped_around(self, last_find_line, last_search_index):
|
|
"""
|
|
returns true if search wrapped around the bottom.
|
|
"""
|
|
|
|
current_find_line = self.state.get_flow_setting(self.flow,
|
|
"last_find_line")
|
|
current_search_index = self.state.get_flow_setting(self.flow,
|
|
"last_search_index")
|
|
|
|
if current_find_line <= last_find_line:
|
|
return True
|
|
elif current_find_line == last_find_line:
|
|
if current_search_index <= last_search_index:
|
|
return True
|
|
|
|
return False
|
|
|
|
def search(self, search_string):
|
|
"""
|
|
similar to view_response or view_request, but instead of just
|
|
displaying the conn, it highlights a word that the user is
|
|
searching for and handles all the logic surrounding that.
|
|
"""
|
|
|
|
if search_string == "":
|
|
search_string = self.state.get_flow_setting(self.flow,
|
|
"last_search_string")
|
|
|
|
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
|
text = self.flow.request
|
|
const = common.VIEW_FLOW_REQUEST
|
|
else:
|
|
text = self.flow.response
|
|
const = common.VIEW_FLOW_RESPONSE
|
|
if not self.flow.response:
|
|
return "no response to search in"
|
|
|
|
last_find_line = self.state.get_flow_setting(self.flow,
|
|
"last_find_line")
|
|
last_search_index = self.state.get_flow_setting(self.flow,
|
|
"last_search_index")
|
|
|
|
# generate the body, highlight the words and get focus
|
|
headers, msg, body = self.conn_text_raw(text)
|
|
try:
|
|
body, focus_position = self.search_highlight_text(body, search_string)
|
|
except SearchError:
|
|
return "Search not supported in this view."
|
|
|
|
if focus_position == None:
|
|
# no results found.
|
|
return "no matches for '%s'" % search_string
|
|
|
|
# UI stuff.
|
|
merged = self.conn_text_merge(headers, msg, body)
|
|
list_box = urwid.ListBox(merged)
|
|
list_box.set_focus(focus_position + 2)
|
|
self.w = self.wrap_body(const, list_box)
|
|
self.master.statusbar.redraw()
|
|
|
|
self.last_displayed_body = list_box
|
|
|
|
if self.search_wrapped_around(last_find_line, last_search_index):
|
|
return "search hit BOTTOM, continuing at TOP"
|
|
|
|
def search_get_start(self, search_string):
|
|
start_line = 0
|
|
start_index = 0
|
|
last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
|
|
if search_string == last_search_string:
|
|
start_line = self.state.get_flow_setting(self.flow, "last_find_line")
|
|
start_index = self.state.get_flow_setting(self.flow,
|
|
"last_search_index")
|
|
|
|
if start_index == None:
|
|
start_index = 0
|
|
else:
|
|
start_index += len(search_string)
|
|
|
|
if start_line == None:
|
|
start_line = 0
|
|
|
|
else:
|
|
self.state.add_flow_setting(self.flow, "last_search_string",
|
|
search_string)
|
|
|
|
return (start_line, start_index)
|
|
|
|
def search_highlight_text(self, text_objects, search_string, looping = False):
|
|
start_line, start_index = self.search_get_start(search_string)
|
|
i = start_line
|
|
|
|
found = False
|
|
text_objects = copy.deepcopy(text_objects)
|
|
for text_object in text_objects[start_line:]:
|
|
if i != start_line:
|
|
start_index = 0
|
|
|
|
try:
|
|
text, style = text_object.get_text()
|
|
except AttributeError:
|
|
raise SearchError()
|
|
find_index = text.find(search_string, start_index)
|
|
if find_index != -1:
|
|
before = text[:find_index]
|
|
after = text[find_index+len(search_string):]
|
|
new_text = urwid.Text(
|
|
[
|
|
before,
|
|
(self.highlight_color, search_string),
|
|
after,
|
|
]
|
|
)
|
|
|
|
self.state.add_flow_setting(self.flow, "last_search_index",
|
|
find_index)
|
|
self.state.add_flow_setting(self.flow, "last_find_line", i)
|
|
|
|
text_objects[i] = new_text
|
|
|
|
found = True
|
|
|
|
break
|
|
|
|
i += 1
|
|
|
|
if found:
|
|
focus_pos = i
|
|
else :
|
|
# loop from the beginning, but not forever.
|
|
if (start_line == 0 and start_index == 0) or looping:
|
|
focus_pos = None
|
|
else:
|
|
self.state.add_flow_setting(self.flow, "last_search_index", 0)
|
|
self.state.add_flow_setting(self.flow, "last_find_line", 0)
|
|
text_objects, focus_pos = self.search_highlight_text(text_objects, search_string, True)
|
|
|
|
return text_objects, focus_pos
|
|
|
|
def view_request(self):
|
|
self.state.view_flow_mode = common.VIEW_FLOW_REQUEST
|
|
body = self.conn_text(self.flow.request)
|
|
self.w = self.wrap_body(common.VIEW_FLOW_REQUEST, body)
|
|
self.master.statusbar.redraw()
|
|
|
|
def view_response(self):
|
|
self.state.view_flow_mode = common.VIEW_FLOW_RESPONSE
|
|
if self.flow.response:
|
|
body = self.conn_text(self.flow.response)
|
|
else:
|
|
body = urwid.ListBox(
|
|
[
|
|
urwid.Text(""),
|
|
urwid.Text(
|
|
[
|
|
("highlight", "No response. Press "),
|
|
("key", "e"),
|
|
("highlight", " and edit any aspect to add one."),
|
|
]
|
|
)
|
|
]
|
|
)
|
|
self.w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body)
|
|
self.master.statusbar.redraw()
|
|
|
|
def refresh_flow(self, c=None):
|
|
if c == self.flow:
|
|
if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response:
|
|
self.view_response()
|
|
else:
|
|
self.view_request()
|
|
|
|
def set_method_raw(self, m):
|
|
if m:
|
|
self.flow.request.method = m
|
|
self.master.refresh_flow(self.flow)
|
|
|
|
def edit_method(self, m):
|
|
if m == "e":
|
|
self.master.prompt_edit("Method", self.flow.request.method, self.set_method_raw)
|
|
else:
|
|
for i in self.method_options:
|
|
if i[1] == m:
|
|
self.flow.request.method = i[0].upper()
|
|
self.master.refresh_flow(self.flow)
|
|
|
|
def save_body(self, path):
|
|
if not path:
|
|
return
|
|
self.state.last_saveload = path
|
|
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
|
c = self.flow.request
|
|
else:
|
|
c = self.flow.response
|
|
path = os.path.expanduser(path)
|
|
try:
|
|
f = file(path, "wb")
|
|
f.write(str(c.content))
|
|
f.close()
|
|
except IOError, v:
|
|
self.master.statusbar.message(v.strerror)
|
|
|
|
def set_url(self, url):
|
|
request = self.flow.request
|
|
if not request.set_url(str(url)):
|
|
return "Invalid URL."
|
|
self.master.refresh_flow(self.flow)
|
|
|
|
def set_resp_code(self, code):
|
|
response = self.flow.response
|
|
try:
|
|
response.code = int(code)
|
|
except ValueError:
|
|
return None
|
|
import BaseHTTPServer
|
|
if BaseHTTPServer.BaseHTTPRequestHandler.responses.has_key(int(code)):
|
|
response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[int(code)][0]
|
|
self.master.refresh_flow(self.flow)
|
|
|
|
def set_resp_msg(self, msg):
|
|
response = self.flow.response
|
|
response.msg = msg
|
|
self.master.refresh_flow(self.flow)
|
|
|
|
def set_headers(self, lst, conn):
|
|
conn.headers = flow.ODictCaseless(lst)
|
|
|
|
def set_query(self, lst, conn):
|
|
conn.set_query(flow.ODict(lst))
|
|
|
|
def set_path_components(self, lst, conn):
|
|
conn.set_path_components([i[0] for i in lst])
|
|
|
|
def set_form(self, lst, conn):
|
|
conn.set_form_urlencoded(flow.ODict(lst))
|
|
|
|
def edit_form(self, conn):
|
|
self.master.view_grideditor(
|
|
grideditor.URLEncodedFormEditor(self.master, conn.get_form_urlencoded().lst, self.set_form, conn)
|
|
)
|
|
|
|
def edit_form_confirm(self, key, conn):
|
|
if key == "y":
|
|
self.edit_form(conn)
|
|
|
|
def edit(self, part):
|
|
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
|
conn = self.flow.request
|
|
else:
|
|
if not self.flow.response:
|
|
self.flow.response = flow.Response(
|
|
self.flow.request,
|
|
self.flow.request.httpversion,
|
|
200, "OK", flow.ODictCaseless(), "", None
|
|
)
|
|
self.flow.response.reply = controller.DummyReply()
|
|
conn = self.flow.response
|
|
|
|
self.flow.backup()
|
|
if part == "r":
|
|
c = self.master.spawn_editor(conn.content or "")
|
|
conn.content = c.rstrip("\n") # what?
|
|
elif part == "f":
|
|
if not conn.get_form_urlencoded() and conn.content:
|
|
self.master.prompt_onekey(
|
|
"Existing body is not a URL-encoded form. Clear and edit?",
|
|
[
|
|
("yes", "y"),
|
|
("no", "n"),
|
|
],
|
|
self.edit_form_confirm,
|
|
conn
|
|
)
|
|
else:
|
|
self.edit_form(conn)
|
|
elif part == "h":
|
|
self.master.view_grideditor(grideditor.HeaderEditor(self.master, conn.headers.lst, self.set_headers, conn))
|
|
elif part == "p":
|
|
p = conn.get_path_components()
|
|
p = [[i] for i in p]
|
|
self.master.view_grideditor(grideditor.PathEditor(self.master, p, self.set_path_components, conn))
|
|
elif part == "q":
|
|
self.master.view_grideditor(grideditor.QueryEditor(self.master, conn.get_query().lst, self.set_query, conn))
|
|
elif part == "u" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
|
self.master.prompt_edit("URL", conn.get_url(), self.set_url)
|
|
elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
|
self.master.prompt_onekey("Method", self.method_options, self.edit_method)
|
|
elif part == "c" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
|
|
self.master.prompt_edit("Code", str(conn.code), self.set_resp_code)
|
|
elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
|
|
self.master.prompt_edit("Message", conn.msg, self.set_resp_msg)
|
|
self.master.refresh_flow(self.flow)
|
|
|
|
def _view_nextprev_flow(self, np, flow):
|
|
try:
|
|
idx = self.state.view.index(flow)
|
|
except IndexError:
|
|
return
|
|
if np == "next":
|
|
new_flow, new_idx = self.state.get_next(idx)
|
|
else:
|
|
new_flow, new_idx = self.state.get_prev(idx)
|
|
if new_flow is None:
|
|
self.master.statusbar.message("No more flows!")
|
|
return
|
|
self.master.view_flow(new_flow)
|
|
|
|
def view_next_flow(self, flow):
|
|
return self._view_nextprev_flow("next", flow)
|
|
|
|
def view_prev_flow(self, flow):
|
|
return self._view_nextprev_flow("prev", flow)
|
|
|
|
def change_this_display_mode(self, t):
|
|
self.state.add_flow_setting(
|
|
self.flow,
|
|
(self.state.view_flow_mode, "prettyview"),
|
|
contentview.get_by_shortcut(t)
|
|
)
|
|
self.master.refresh_flow(self.flow)
|
|
|
|
def delete_body(self, t):
|
|
if t == "m":
|
|
val = flow.CONTENT_MISSING
|
|
else:
|
|
val = None
|
|
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
|
self.flow.request.content = val
|
|
else:
|
|
self.flow.response.content = val
|
|
self.master.refresh_flow(self.flow)
|
|
|
|
def keypress(self, size, key):
|
|
if key == " ":
|
|
self.view_next_flow(self.flow)
|
|
return
|
|
|
|
key = common.shortcuts(key)
|
|
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
|
conn = self.flow.request
|
|
else:
|
|
conn = self.flow.response
|
|
|
|
if key == "q":
|
|
self.master.view_flowlist()
|
|
key = None
|
|
elif key == "tab":
|
|
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
|
self.view_response()
|
|
else:
|
|
self.view_request()
|
|
elif key in ("up", "down", "page up", "page down"):
|
|
# Why doesn't this just work??
|
|
self.w.keypress(size, key)
|
|
elif key == "a":
|
|
self.flow.accept_intercept()
|
|
self.master.view_flow(self.flow)
|
|
elif key == "A":
|
|
self.master.accept_all()
|
|
self.master.view_flow(self.flow)
|
|
elif key == "b":
|
|
if conn:
|
|
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
|
self.master.path_prompt(
|
|
"Save request body: ",
|
|
self.state.last_saveload,
|
|
self.save_body
|
|
)
|
|
else:
|
|
self.master.path_prompt(
|
|
"Save response body: ",
|
|
self.state.last_saveload,
|
|
self.save_body
|
|
)
|
|
elif key == "d":
|
|
if self.state.flow_count() == 1:
|
|
self.master.view_flowlist()
|
|
elif self.state.view.index(self.flow) == len(self.state.view)-1:
|
|
self.view_prev_flow(self.flow)
|
|
else:
|
|
self.view_next_flow(self.flow)
|
|
f = self.flow
|
|
f.kill(self.master)
|
|
self.state.delete_flow(f)
|
|
elif key == "D":
|
|
f = self.master.duplicate_flow(self.flow)
|
|
self.master.view_flow(f)
|
|
self.master.statusbar.message("Duplicated.")
|
|
elif key == "e":
|
|
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
|
|
self.master.prompt_onekey(
|
|
"Edit request",
|
|
(
|
|
("query", "q"),
|
|
("path", "p"),
|
|
("url", "u"),
|
|
("header", "h"),
|
|
("form", "f"),
|
|
("raw body", "r"),
|
|
("method", "m"),
|
|
),
|
|
self.edit
|
|
)
|
|
else:
|
|
self.master.prompt_onekey(
|
|
"Edit response",
|
|
(
|
|
("code", "c"),
|
|
("message", "m"),
|
|
("header", "h"),
|
|
("raw body", "r"),
|
|
),
|
|
self.edit
|
|
)
|
|
key = None
|
|
elif key == "f":
|
|
self.master.statusbar.message("Loading all body data...")
|
|
self.state.add_flow_setting(
|
|
self.flow,
|
|
(self.state.view_flow_mode, "fullcontents"),
|
|
True
|
|
)
|
|
self.master.refresh_flow(self.flow)
|
|
self.master.statusbar.message("")
|
|
elif key == "m":
|
|
p = list(contentview.view_prompts)
|
|
p.insert(0, ("clear", "c"))
|
|
self.master.prompt_onekey(
|
|
"Display mode",
|
|
p,
|
|
self.change_this_display_mode
|
|
)
|
|
key = None
|
|
elif key == "p":
|
|
self.view_prev_flow(self.flow)
|
|
elif key == "r":
|
|
self.flow.backup()
|
|
r = self.master.replay_request(self.flow)
|
|
if r:
|
|
self.master.statusbar.message(r)
|
|
self.master.refresh_flow(self.flow)
|
|
elif key == "V":
|
|
if not self.flow.modified():
|
|
self.master.statusbar.message("Flow not modified.")
|
|
return
|
|
self.state.revert(self.flow)
|
|
self.master.refresh_flow(self.flow)
|
|
self.master.statusbar.message("Reverted.")
|
|
elif key == "W":
|
|
self.master.path_prompt(
|
|
"Save this flow: ",
|
|
self.state.last_saveload,
|
|
self.master.save_one_flow,
|
|
self.flow
|
|
)
|
|
elif key == "v":
|
|
if conn and conn.content:
|
|
t = conn.headers["content-type"] or [None]
|
|
t = t[0]
|
|
if os.environ.has_key("EDITOR") or os.environ.has_key("PAGER"):
|
|
self.master.spawn_external_viewer(conn.content, t)
|
|
else:
|
|
self.master.statusbar.message("Error! Set $EDITOR or $PAGER.")
|
|
elif key == "|":
|
|
self.master.path_prompt(
|
|
"Send flow to script: ", self.state.last_script,
|
|
self.master.run_script_once, self.flow
|
|
)
|
|
elif key == "x":
|
|
self.master.prompt_onekey(
|
|
"Delete body",
|
|
(
|
|
("completely", "c"),
|
|
("mark as missing", "m"),
|
|
),
|
|
self.delete_body
|
|
)
|
|
key = None
|
|
elif key == "X":
|
|
self.master.view_flowdetails(self.flow)
|
|
elif key == "z":
|
|
if conn:
|
|
self.flow.backup()
|
|
e = conn.headers.get_first("content-encoding", "identity")
|
|
if e != "identity":
|
|
if not conn.decode():
|
|
self.master.statusbar.message("Could not decode - invalid data?")
|
|
else:
|
|
self.master.prompt_onekey(
|
|
"Select encoding: ",
|
|
(
|
|
("gzip", "z"),
|
|
("deflate", "d"),
|
|
),
|
|
self.encode_callback,
|
|
conn
|
|
)
|
|
self.master.refresh_flow(self.flow)
|
|
elif key == "/":
|
|
last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
|
|
search_prompt = "Search body ["+last_search_string+"]: " if last_search_string else "Search body: "
|
|
self.master.prompt(search_prompt,
|
|
None,
|
|
self.search)
|
|
elif key == "n":
|
|
last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
|
|
if last_search_string:
|
|
message = self.search(last_search_string)
|
|
if message:
|
|
self.master.statusbar.message(message)
|
|
else:
|
|
self.master.statusbar.message("no previous searches have been made")
|
|
else:
|
|
return key
|
|
|
|
def encode_callback(self, key, conn):
|
|
encoding_map = {
|
|
"z": "gzip",
|
|
"d": "deflate",
|
|
}
|
|
conn.encode(encoding_map[key])
|
|
self.master.refresh_flow(self.flow)
|