This commit is contained in:
Legend Tang 2015-04-06 11:28:20 +08:00
commit 20d3d7e1b8
11 changed files with 369 additions and 125 deletions

View File

@ -15,8 +15,8 @@ import urwid
import weakref
from .. import controller, flow, script
from . import flowlist, flowview, help, common, window, signals
from . import grideditor, palettes, contentview, flowdetailview, statusbar
from . import flowlist, flowview, help, window, signals, options
from . import grideditor, palettes, contentview, statusbar
EVENTLOG_SIZE = 500
@ -73,6 +73,8 @@ class ConsoleState(flow.State):
elif idx < 0:
idx = 0
self.focus = idx
else:
self.focus = None
def set_focus_flow(self, f):
self.set_focus(self.view.index(f))
@ -227,6 +229,16 @@ class ConsoleMaster(flow.FlowMaster):
def sig_pop_view_state(self, sender):
if self.view_stack:
self.loop.widget = self.view_stack.pop()
else:
signals.status_prompt_onekey.send(
self,
prompt = "Quit",
keys = (
("yes", "y"),
("no", "n"),
),
callback = self.quit,
)
def sig_push_view_state(self, sender):
self.view_stack.append(self.loop.widget)
@ -393,7 +405,6 @@ class ConsoleMaster(flow.FlowMaster):
self.ui.set_terminal_properties(256)
self.ui.register_palette(self.palette.palette())
self.flow_list_walker = flowlist.FlowListWalker(self, self.state)
self.help_context = None
self.loop = urwid.MainLoop(
urwid.SolidFill("x"),
screen = self.ui,
@ -444,23 +455,34 @@ class ConsoleMaster(flow.FlowMaster):
sys.stderr.flush()
self.shutdown()
def view_help(self):
def view_help(self, helpctx):
signals.push_view_state.send(self)
self.loop.widget = window.Window(
self,
help.HelpView(self.help_context),
help.HelpView(helpctx),
None,
statusbar.StatusBar(self, help.footer)
statusbar.StatusBar(self, help.footer),
None
)
def view_options(self):
signals.push_view_state.send(self)
self.loop.widget = window.Window(
self,
options.Options(self),
None,
statusbar.StatusBar(self, options.footer),
None
)
def view_grideditor(self, ge):
signals.push_view_state.send(self)
self.help_context = ge.make_help()
self.loop.widget = window.Window(
self,
ge,
None,
statusbar.StatusBar(self, grideditor.FOOTER)
statusbar.StatusBar(self, grideditor.FOOTER),
ge.make_help()
)
def view_flowlist(self):
@ -474,24 +496,24 @@ class ConsoleMaster(flow.FlowMaster):
else:
body = flowlist.FlowListBox(self)
self.help_context = flowlist.help_context
self.loop.widget = window.Window(
self,
body,
None,
statusbar.StatusBar(self, flowlist.footer)
statusbar.StatusBar(self, flowlist.footer),
flowlist.help_context
)
self.loop.draw_screen()
def view_flow(self, flow, tab_offset=0):
signals.push_view_state.send(self)
self.state.set_focus_flow(flow)
self.help_context = flowview.help_context
self.loop.widget = window.Window(
self,
flowview.FlowView(self, self.state, flow, tab_offset),
flowview.FlowViewHeader(self, flow),
statusbar.StatusBar(self, flowview.footer)
statusbar.StatusBar(self, flowview.footer),
flowview.help_context
)
def _write_flows(self, path, flows):
@ -574,24 +596,6 @@ class ConsoleMaster(flow.FlowMaster):
if a != "n":
raise urwid.ExitMainLoop
def _change_options(self, a):
if a == "a":
self.anticache = not self.anticache
if a == "c":
self.anticomp = not self.anticomp
if a == "h":
self.showhost = not self.showhost
self.sync_list_view()
self.refresh_focus()
elif a == "k":
self.killextra = not self.killextra
elif a == "n":
self.refresh_server_playback = not self.refresh_server_playback
elif a == "u":
self.server.config.no_upstream_cert =\
not self.server.config.no_upstream_cert
signals.update_settings.send(self)
def shutdown(self):
self.state.killall(self)
flow.FlowMaster.shutdown(self)

View File

@ -37,14 +37,14 @@ def is_keypress(k):
return True
def highlight_key(s, k):
def highlight_key(str, key, textattr="text", keyattr="key"):
l = []
parts = s.split(k, 1)
parts = str.split(key, 1)
if parts[0]:
l.append(("text", parts[0]))
l.append(("key", k))
l.append((textattr, parts[0]))
l.append((keyattr, key))
if parts[1]:
l.append(("text", parts[1]))
l.append((textattr, parts[1]))
return l

View File

@ -1,8 +1,17 @@
from __future__ import absolute_import
import urwid
from . import common, signals, searchable
from . import common, searchable
from .. import utils
def maybe_timestamp(base, attr):
if base and getattr(base, attr):
return utils.format_timestamp_with_milli(getattr(base, attr))
else:
return "active"
pass
def flowdetails(state, flow):
text = []
@ -81,16 +90,61 @@ def flowdetails(state, flow):
parts = []
parts.append(["Client conn. established", utils.format_timestamp_with_milli(cc.timestamp_start) if (cc and cc.timestamp_start) else "active"])
parts.append(["Server conn. initiated", utils.format_timestamp_with_milli(sc.timestamp_start) if sc else "active" ])
parts.append(["Server conn. TCP handshake", utils.format_timestamp_with_milli(sc.timestamp_tcp_setup) if (sc and sc.timestamp_tcp_setup) else "active"])
parts.append(
[
"Client conn. established",
maybe_timestamp(cc, "timestamp_start")
]
)
parts.append(
[
"Server conn. initiated",
maybe_timestamp(sc, "timestamp_start")
]
)
parts.append(
[
"Server conn. TCP handshake",
maybe_timestamp(sc, "timestamp_tcp_setup")
]
)
if sc.ssl_established:
parts.append(["Server conn. SSL handshake", utils.format_timestamp_with_milli(sc.timestamp_ssl_setup) if sc.timestamp_ssl_setup else "active"])
parts.append(["Client conn. SSL handshake", utils.format_timestamp_with_milli(cc.timestamp_ssl_setup) if (cc and cc.timestamp_ssl_setup) else "active"])
parts.append(["First request byte", utils.format_timestamp_with_milli(req.timestamp_start)])
parts.append(["Request complete", utils.format_timestamp_with_milli(req.timestamp_end) if req.timestamp_end else "active"])
parts.append(["First response byte", utils.format_timestamp_with_milli(resp.timestamp_start) if resp else "active"])
parts.append(["Response complete", utils.format_timestamp_with_milli(resp.timestamp_end) if (resp and resp.timestamp_end) else "active"])
parts.append(
[
"Server conn. SSL handshake",
maybe_timestamp(sc, "timestamp_ssl_setup")
]
)
parts.append(
[
"Client conn. SSL handshake",
maybe_timestamp(cc, "timestamp_ssl_setup")
]
)
parts.append(
[
"First request byte",
maybe_timestamp(req, "timestamp_start")
]
)
parts.append(
[
"Request complete",
maybe_timestamp(req, "timestamp_end")
]
)
parts.append(
[
"First response byte",
maybe_timestamp(resp, "timestamp_start")
]
)
parts.append(
[
"Response complete",
maybe_timestamp(resp, "timestamp_end")
]
)
# sort operations by timestamp
parts = sorted(parts, key=lambda p: p[1])

View File

@ -68,6 +68,7 @@ def _mkhelp():
("x", "delete body"),
("z", "encode/decode a request/response"),
("tab", "next tab"),
("h, l", "previous tab, next tab"),
("space", "next flow"),
("|", "run script on this flow"),
("/", "search (case sensitive)"),
@ -434,6 +435,8 @@ class FlowView(tabs.Tabs):
signals.flow_change.send(self, flow = self.flow)
def keypress(self, size, key):
key = super(self.__class__, self).keypress(size, key)
if key == " ":
self.view_next_flow(self.flow)
return
@ -446,10 +449,7 @@ class FlowView(tabs.Tabs):
else:
conn = None
if key == "q":
signals.pop_view_state.send(self)
return None
elif key in ("up", "down", "page up", "page down"):
if key in ("up", "down", "page up", "page down"):
# Why doesn't this just work??
self._w.keypress(size, key)
elif key == "a":
@ -499,7 +499,7 @@ class FlowView(tabs.Tabs):
args = (self.flow,)
)
if not conn and key in "befgmxvz":
if not conn and key in set(list("befgmxvz")):
signals.status_message.send(
message = "Tab to the request or response",
expire = 1
@ -601,10 +601,7 @@ class FlowView(tabs.Tabs):
args = (conn,)
)
signals.flow_change.send(self, flow = self.flow)
else:
return super(self.__class__, self).keypress(size, key)
else:
return super(self.__class__, self).keypress(size, key)
return key
def encode_callback(self, key, conn):
encoding_map = {

View File

@ -89,32 +89,7 @@ class HelpView(urwid.ListBox):
common.highlight_key("amf", "f") +
[("text", ": AMF (requires PyAMF)")]
),
("o", "toggle options:"),
(None,
common.highlight_key("anticache", "a") +
[("text", ": prevent cached responses")]
),
(None,
common.highlight_key("anticomp", "c") +
[("text", ": prevent compressed responses")]
),
(None,
common.highlight_key("showhost", "h") +
[("text", ": use Host header for URL display")]
),
(None,
common.highlight_key("killextra", "k") +
[("text", ": kill requests not part of server replay")]
),
(None,
common.highlight_key("norefresh", "n") +
[("text", ": disable server replay response refresh")]
),
(None,
common.highlight_key("upstream certs", "u") +
[("text", ": sniff cert info from upstream server")]
),
("o", "options"),
("q", "quit / return to flow list"),
("Q", "quit without confirm prompt"),
("R", "edit replacement patterns"),

View File

@ -0,0 +1,200 @@
import urwid
from . import common, signals
help_context = None
footer = [
('heading_key', "enter/space"), ":toggle ",
('heading_key', "C"), ":clear all ",
]
class OptionWidget(urwid.WidgetWrap):
def __init__(self, option, text, shortcut, active, focus):
self.option = option
textattr = "text"
keyattr = "key"
if focus and active:
textattr = "option_active_selected"
keyattr = "option_selected_key"
elif focus:
textattr = "option_selected"
keyattr = "option_selected_key"
elif active:
textattr = "option_active"
text = common.highlight_key(
text,
shortcut,
textattr=textattr,
keyattr=keyattr
)
opt = urwid.Text(text, align="center")
opt = urwid.AttrWrap(opt, textattr)
opt = urwid.Padding(opt, align="center", width=("relative", 20))
urwid.WidgetWrap.__init__(self, opt)
def keypress(self, size, key):
return key
def selectable(self):
return True
class OptionWalker(urwid.ListWalker):
def __init__(self, options):
urwid.ListWalker.__init__(self)
self.options = options
self.focus = 0
signals.update_settings.connect(self.sig_update_settings)
def sig_update_settings(self, sender):
self._modified()
def set_focus(self, pos):
self.focus = pos
def get_focus(self):
return self.options[self.focus].render(True), self.focus
def get_next(self, pos):
if pos >= len(self.options)-1:
return None, None
return self.options[pos+1].render(False), pos+1
def get_prev(self, pos):
if pos <= 0:
return None, None
return self.options[pos-1].render(False), pos-1
class OptionListBox(urwid.ListBox):
def __init__(self, options):
urwid.ListBox.__init__(
self,
OptionWalker(options)
)
self.options = options
self.keymap = {}
for i in options:
self.keymap[i.shortcut] = i
def keypress(self, size, key):
if key == "enter" or key == " ":
self.get_focus()[0].option.activate()
return None
key = common.shortcuts(key)
if key in self.keymap:
self.keymap[key].activate()
self.set_focus(self.options.index(self.keymap[key]))
return None
return super(self.__class__, self).keypress(size, key)
_neg = lambda: False
class Option:
def __init__(self, text, shortcut, getstate=None, activate=None):
self.text = text
self.shortcut = shortcut
self.getstate = getstate or _neg
self.activate = activate or _neg
def render(self, focus):
return OptionWidget(self, self.text, self.shortcut, self.getstate(), focus)
class Options(urwid.WidgetWrap):
def __init__(self, master):
self.master = master
self.lb = OptionListBox(
[
Option(
"Anti-Cache",
"a",
lambda: master.anticache,
self.toggle_anticache
),
Option(
"Anti-Compression",
"o",
lambda: master.anticomp,
self.toggle_anticomp
),
#Option("Header Set Patterns"),
#Option("Ignore Patterns"),
Option(
"Kill Extra",
"E",
lambda: master.killextra,
self.toggle_killextra
),
#Option("Manage Scripts"),
#Option("Replacement Patterns"),
Option(
"Show Host",
"H",
lambda: master.showhost,
self.toggle_showhost
),
#Option("Sticky Cookies"),
#Option("Sticky Auth"),
#Option("TCP Proxying"),
Option(
"No Refresh",
"R",
lambda: not master.refresh_server_playback,
self.toggle_refresh_server_playback
),
Option(
"No Upstream Certs",
"U",
lambda: master.server.config.no_upstream_cert,
self.toggle_upstream_cert
),
]
)
title = urwid.Text("Options")
title = urwid.Padding(title, align="left", width=("relative", 100))
title = urwid.AttrWrap(title, "heading")
self._w = urwid.Frame(
self.lb,
header = title
)
self.master.loop.widget.footer.update("")
def keypress(self, size, key):
if key == "C":
self.clearall()
return None
return super(self.__class__, self).keypress(size, key)
def clearall(self):
self.master.anticache = False
self.master.anticomp = False
self.master.killextra = False
self.master.showhost = False
self.master.refresh_server_playback = True
self.master.server.config.no_upstream_cert = False
signals.update_settings.send(self)
signals.status_message.send(
message = "All options cleared",
expire = 1
)
def toggle_anticache(self):
self.master.anticache = not self.master.anticache
def toggle_anticomp(self):
self.master.anticomp = not self.master.anticomp
def toggle_killextra(self):
self.master.killextra = not self.master.killextra
def toggle_showhost(self):
self.master.showhost = not self.master.showhost
def toggle_refresh_server_playback(self):
self.master.refresh_server_playback = not self.master.refresh_server_playback
def toggle_upstream_cert(self):
self.master.server.config.no_upstream_cert = not self.master.server.config.no_upstream_cert
signals.update_settings.send(self)

View File

@ -17,6 +17,10 @@ class Palette:
# Help
'key', 'head', 'text',
# Options
'option_selected', 'option_active', 'option_active_selected',
'option_selected_key',
# List and Connections
'method', 'focus',
'code_200', 'code_300', 'code_400', 'code_500', 'code_other',
@ -60,6 +64,12 @@ class LowDark(Palette):
head = ('white,bold', 'default'),
text = ('light gray', 'default'),
# Options
option_selected = ('light gray', 'dark blue'),
option_selected_key = ('light cyan', 'dark blue'),
option_active = ('light red', 'default'),
option_active_selected = ('light red', 'dark blue'),
# List and Connections
method = ('dark cyan', 'default'),
focus = ('yellow', 'default'),
@ -100,18 +110,24 @@ class LowLight(Palette):
Low-color light background
"""
low = dict(
title = ('dark magenta,bold', 'light blue'),
title = ('dark magenta', 'default'),
# Status bar & heading
heading = ('light gray', 'dark blue'),
heading_key = ('light cyan', 'dark blue'),
heading = ('white', 'black'),
heading_key = ('dark blue', 'black'),
heading_inactive = ('black', 'light gray'),
# Help
key = ('dark blue,bold', 'default'),
head = ('black,bold', 'default'),
key = ('dark blue', 'default'),
head = ('black', 'default'),
text = ('dark gray', 'default'),
# Options
option_selected = ('black', 'light gray'),
option_selected_key = ('dark blue', 'light gray'),
option_active = ('light red', 'default'),
option_active_selected = ('light red', 'light gray'),
# List and Connections
method = ('dark cyan', 'default'),
focus = ('black', 'default'),
@ -181,6 +197,12 @@ class SolarizedLight(LowLight):
key = (sol_blue, 'default',),
head = (sol_base00, 'default'),
# Options
option_selected = (sol_base03, sol_base2),
option_selected_key = (sol_blue, sol_base2),
option_active = (sol_orange, 'default'),
option_active_selected = (sol_orange, sol_base2),
# List and Connections
method = (sol_cyan, 'default'),
focus = (sol_base01, 'default'),
@ -212,16 +234,22 @@ class SolarizedLight(LowLight):
class SolarizedDark(LowDark):
high = dict(
title = (sol_blue, 'default'),
text = (sol_base0, 'default'),
text = (sol_base1, 'default'),
# Status bar & heading
heading = (sol_base03, sol_base1),
heading_key = (sol_blue+",bold", sol_base1),
heading = (sol_base2, sol_base01),
heading_key = (sol_blue+",bold", sol_base01),
heading_inactive = (sol_base1, sol_base02),
# Help
key = (sol_blue, 'default',),
head = (sol_base00, 'default'),
head = (sol_base2, 'default'),
# Options
option_selected = (sol_base03, sol_base00),
option_selected_key = (sol_blue, sol_base00),
option_active = (sol_orange, 'default'),
option_active_selected = (sol_orange, sol_base00),
# List and Connections
method = (sol_cyan, 'default'),

View File

@ -36,7 +36,7 @@ class Searchable(urwid.ListBox):
if key == "N":
self.find_next(True)
else:
return key
return super(self.__class__, self).keypress(size, key)
def set_search(self, text):
self.state.last_search = text

View File

@ -14,11 +14,13 @@ class Tabs(urwid.WidgetWrap):
return p
def keypress(self, size, key):
if key == "tab":
if key in ["tab", "l"]:
self.tab_offset = (self.tab_offset + 1)%(len(self.tabs))
self.show()
else:
return self._w.keypress(size, key)
elif key == "h":
self.tab_offset = (self.tab_offset - 1)%(len(self.tabs))
self.show()
return self._w.keypress(size, key)
def show(self):
headers = []
@ -33,3 +35,4 @@ class Tabs(urwid.WidgetWrap):
body = self.tabs[self.tab_offset][1](),
header = headers
)
self._w.set_focus("body")

View File

@ -1,19 +1,21 @@
import urwid
from . import common, grideditor, signals, contentview
from . import grideditor, signals, contentview
class Window(urwid.Frame):
def __init__(self, master, body, header, footer):
def __init__(self, master, body, header, footer, helpctx):
urwid.Frame.__init__(self, body, header=header, footer=footer)
self.master = master
self.helpctx = helpctx
signals.focus.connect(self.sig_focus)
def sig_focus(self, sender, section):
self.focus_position = section
def keypress(self, size, k):
k = urwid.Frame.keypress(self, self.master.loop.screen_size, k)
k = super(self.__class__, self).keypress(size, k)
if k == "?":
self.master.view_help()
self.master.view_help(self.helpctx)
elif k == "c":
if not self.master.client_playback:
signals.status_prompt_path.send(
@ -62,18 +64,12 @@ class Window(urwid.Frame):
text = self.master.state.intercept_txt,
callback = self.master.set_intercept
)
elif k == "o":
self.master.view_options()
elif k == "Q":
raise urwid.ExitMainLoop
elif k == "q":
signals.status_prompt_onekey.send(
self,
prompt = "Quit",
keys = (
("yes", "y"),
("no", "n"),
),
callback = self.master.quit,
)
signals.pop_view_state.send(self)
elif k == "M":
signals.status_prompt_onekey.send(
prompt = "Global default display mode",
@ -113,19 +109,6 @@ class Window(urwid.Frame):
),
callback = self.master.stop_server_playback_prompt,
)
elif k == "o":
signals.status_prompt_onekey.send(
prompt = "Options",
keys = (
("anticache", "a"),
("anticomp", "c"),
("showhost", "h"),
("killextra", "k"),
("norefresh", "n"),
("no-upstream-certs", "u"),
),
callback = self.master._change_options
)
elif k == "t":
signals.status_prompt.send(
prompt = "Sticky cookie filter",

View File

@ -1,9 +1,9 @@
#!/bin/bash
# Generate a test pattern with pathoc
PATHOD=http://localhost:9999
pathoc localhost:8080 "get:'$PATHOD/p/200:p0,1:b@2048b':b@2048b"
pathoc localhost:8080 "get:'$PATHOD/p/300:p0,1:b@2048b':b@2048b"
pathoc localhost:8080 "get:'$PATHOD/p/400:p0,1:b@2048b':b@2048b"
pathoc localhost:8080 "get:'$PATHOD/p/500:p0,1:b@2048b':b@2048b"
pathoc localhost:8080 "get:'$PATHOD/p/600:p0,1:b@2048b':b@2048b"
PATHOD=localhost:9999
pathoc -s -c $PATHOD localhost:8080 "get:'/p/200:p0,1:b@2048b':b@2048b"
pathoc -s -c $PATHOD localhost:8080 "get:'/p/300:p0,1:b@2048b':b@2048b"
pathoc -s -c $PATHOD localhost:8080 "get:'/p/400:p0,1:b@2048b':b@2048b"
pathoc -s -c $PATHOD localhost:8080 "get:'/p/500:p0,1:b@2048b':b@2048b"
pathoc -s -c $PATHOD localhost:8080 "get:'/p/600:p0,1:b@2048b':b@2048b"