Merge pull request #193 from droope/search-functionality

Search functionality
This commit is contained in:
Aldo Cortesi 2014-01-04 14:34:20 -08:00
commit 7d37e0ce10
5 changed files with 354 additions and 44 deletions

View File

@ -12,6 +12,7 @@ def _mkhelp():
("e", "toggle eventlog"),
("F", "toggle follow flow list"),
("l", "set limit filter pattern"),
("/", "same as above"),
("L", "load saved flows"),
("r", "replay request"),
("V", "revert changes to request"),
@ -244,7 +245,7 @@ class FlowListBox(urwid.ListBox):
self.master.clear_flows()
elif key == "e":
self.master.toggle_eventlog()
elif key == "l":
elif key == "l" or key == "/":
self.master.prompt("Limit: ", self.master.state.limit_txt, self.master.set_limit)
elif key == "L":
self.master.path_prompt(

View File

@ -63,6 +63,8 @@ def _mkhelp():
("tab", "toggle request/response view"),
("space", "next flow"),
("|", "run script on this flow"),
("/", "search in response body (case sensitive)"),
("n", "repeat previous search"),
]
text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
return text
@ -85,7 +87,9 @@ class FlowViewHeader(common.WWrap):
class CallbackCache:
@utils.LRUCache(200)
#commented decorator because it was breaking search functionality (caching after
# searches.) If it can be made to only cache the first time, it'd be great.
#@utils.LRUCache(200)
def _callback(self, method, *args, **kwargs):
return getattr(self.obj, method)(*args, **kwargs)
@ -109,8 +113,12 @@ class FlowView(common.WWrap):
("options", "o"),
("edit raw", "e"),
]
highlight_color = "focusfield"
def __init__(self, master, state, flow):
self.master, self.state, self.flow = master, state, flow
self.last_displayed_body = None
if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
self.view_response()
else:
@ -129,7 +137,8 @@ class FlowView(common.WWrap):
limit = sys.maxint
else:
limit = contentview.VIEW_CUTOFF
return cache.callback(
description, text_objects = cache.callback(
self, "_cached_content_view",
viewmode,
tuple(tuple(i) for i in conn.headers.lst),
@ -137,49 +146,84 @@ class FlowView(common.WWrap):
limit
)
def conn_text(self, conn):
txt = common.format_keyvals(
return (description, text_objects)
def cont_view_handle_missing(self, conn, viewmode):
if conn.content == flow.CONTENT_MISSING:
msg, body = "", [urwid.Text([("error", "[content missing]")])], 0
else:
msg, body = self.content_view(viewmode, conn)
return (msg, body)
def viewmode_get(self, override):
return self.state.default_body_view if override is None else override
def override_get(self):
return self.state.get_flow_setting(self.flow,
(self.state.view_flow_mode, "prettyview"))
def conn_text_raw(self, conn):
"""
Based on a request/response, conn, returns the elements for
display.
"""
headers = common.format_keyvals(
[(h+":", v) for (h, v) in conn.headers.lst],
key = "header",
val = "text"
)
if conn.content is not None:
override = self.state.get_flow_setting(
self.flow,
(self.state.view_flow_mode, "prettyview"),
)
viewmode = self.state.default_body_view if override is None else override
if conn.content == flow.CONTENT_MISSING:
msg, body = "", [urwid.Text([("error", "[content missing]")])]
else:
msg, body = self.content_view(viewmode, conn)
cols = [
urwid.Text(
[
("heading", msg),
]
)
]
if override is not None:
cols.append(
urwid.Text(
[
" ",
('heading', "["),
('heading_key', "m"),
('heading', (":%s]"%viewmode.name)),
],
align="right"
)
)
title = urwid.AttrWrap(urwid.Columns(cols), "heading")
txt.append(title)
txt.extend(body)
override = self.override_get()
viewmode = self.viewmode_get(override)
msg, body = self.cont_view_handle_missing(conn, viewmode)
elif conn.content == flow.CONTENT_MISSING:
pass
return urwid.ListBox(txt)
return headers, msg, body
def conn_text_merge(self, headers, msg, body):
"""
Grabs what is returned by conn_text_raw and merges them all
toghether, mainly used by conn_text and search
"""
override = self.override_get()
viewmode = self.viewmode_get(override)
cols = [urwid.Text(
[
("heading", msg),
]
)
]
if override is not None:
cols.append(urwid.Text([
" ",
('heading', "["),
('heading_key', "m"),
('heading', (":%s]"%viewmode.name)),
],
align="right"
)
)
title = urwid.AttrWrap(urwid.Columns(cols), "heading")
headers.append(title)
headers.extend(body)
return headers
def conn_text(self, conn):
"""
Same as conn_text_raw, but returns result wrapped in a listbox ready for usage.
"""
headers, msg, body = self.conn_text_raw(conn)
merged = self.conn_text_merge(headers, msg, body)
return urwid.ListBox(merged)
def _tab(self, content, attr):
p = urwid.Text(content)
@ -215,6 +259,140 @@ class FlowView(common.WWrap):
)
return f
def search_wrapped_around(self, last_find_line, last_search_index):
"""
returns true if search wrapped around the bottom.
"""
current_find_line = self.state.get_flow_setting(self.flow,
"last_find_line")
current_search_index = self.state.get_flow_setting(self.flow,
"last_search_index")
if current_find_line <= last_find_line:
return True
elif current_find_line == last_find_line:
if current_search_index <= last_search_index:
return True
return False
def search(self, search_string):
"""
similar to view_response or view_request, but instead of just
displaying the conn, it highlights a word that the user is
searching for and handles all the logic surrounding that.
"""
if search_string == "":
search_string = self.state.get_flow_setting(self.flow,
"last_search_string")
if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
text = self.flow.request
const = common.VIEW_FLOW_REQUEST
else:
text = self.flow.response
const = common.VIEW_FLOW_RESPONSE
if not self.flow.response:
return "no response to search in"
last_find_line = self.state.get_flow_setting(self.flow,
"last_find_line")
last_search_index = self.state.get_flow_setting(self.flow,
"last_search_index")
# generate the body, highlight the words and get focus
headers, msg, body = self.conn_text_raw(text)
body, focus_position = self.search_highlight_text(body, search_string)
if focus_position == None:
# no results found.
return "no matches for '%s'" % search_string
# UI stuff.
merged = self.conn_text_merge(headers, msg, body)
list_box = urwid.ListBox(merged)
list_box.set_focus(focus_position + 2)
self.w = self.wrap_body(const, list_box)
self.master.statusbar.redraw()
self.last_displayed_body = list_box
if self.search_wrapped_around(last_find_line, last_search_index):
return "search hit BOTTOM, continuing at TOP"
def search_get_start(self, search_string):
start_line = 0
start_index = 0
last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
if search_string == last_search_string:
start_line = self.state.get_flow_setting(self.flow, "last_find_line")
start_index = self.state.get_flow_setting(self.flow,
"last_search_index")
if start_index == None:
start_index = 0
else:
start_index += len(search_string)
if start_line == None:
start_line = 0
else:
self.state.add_flow_setting(self.flow, "last_search_string",
search_string)
return (start_line, start_index)
def search_highlight_text(self, text_objects, search_string, looping = False):
start_line, start_index = self.search_get_start(search_string)
i = start_line
found = False
for text_object in text_objects[start_line:]:
if i != start_line:
start_index = 0
text, style = text_object.get_text()
find_index = text.find(search_string, start_index)
if find_index != -1:
before = text[:find_index]
after = text[find_index+len(search_string):]
new_text = urwid.Text(
[
before,
(self.highlight_color, search_string),
after,
]
)
self.state.add_flow_setting(self.flow, "last_search_index",
find_index)
self.state.add_flow_setting(self.flow, "last_find_line", i)
text_objects[i] = new_text
found = True
break
i += 1
if found:
focus_pos = i
else :
# loop from the beginning, but not forever.
if (start_line == 0 and start_index == 0) or looping:
focus_pos = None
else:
self.state.add_flow_setting(self.flow, "last_search_index", 0)
self.state.add_flow_setting(self.flow, "last_find_line", 0)
text_objects, focus_pos = self.search_highlight_text(text_objects, search_string, True)
return text_objects, focus_pos
def view_request(self):
self.state.view_flow_mode = common.VIEW_FLOW_REQUEST
body = self.conn_text(self.flow.request)
@ -574,6 +752,20 @@ class FlowView(common.WWrap):
conn
)
self.master.refresh_flow(self.flow)
elif key == "/":
last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
search_prompt = "Search body ["+last_search_string+"]: " if last_search_string else "Search body: "
self.master.prompt(search_prompt,
None,
self.search)
elif key == "n":
last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
if last_search_string:
message = self.search(last_search_string)
if message:
self.master.statusbar.message(message)
else:
self.master.statusbar.message("no previous searches have been made")
else:
return key

View File

@ -1463,7 +1463,7 @@ class FlowMaster(controller.Master):
def run_script_hook(self, name, *args, **kwargs):
for script in self.scripts:
self.run_single_script_hook(script, name, *args, **kwargs)
def set_stickycookie(self, txt):
if txt:
flt = filt.parse(txt)

View File

@ -275,3 +275,101 @@ if cv.ViewProtobuf.is_available():
def test_get_by_shortcut():
assert cv.get_by_shortcut("h")
def test_search_highlights():
# Default text in requests is content. We will search for nt once, and
# expect the first bit to be highlighted. We will do it again and expect the
# second to be.
f = tutils.tflowview()
f.search("nt")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('content', [(None, 2), (f.highlight_color, 2)])
f.search("nt")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('content', [(None, 5), (f.highlight_color, 2)])
def test_search_returns_useful_messages():
f = tutils.tflowview()
# original string is content. this string should not be in there.
response = f.search("oranges and other fruit.")
assert response == "no matches for 'oranges and other fruit.'"
def test_search_highlights_clears_prev():
f = tutils.tflowview(request_contents="this is string\nstring is string")
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
# search again, it should not be highlighted again.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() != ('this is string', [(None, 8), (f.highlight_color, 6)])
def test_search_highlights_multi_line():
f = tutils.tflowview(request_contents="this is string\nstring is string")
# should highlight the first line.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
# should highlight second line, first appearance of string.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('string is string', [(None, 0), (f.highlight_color, 6)])
# should highlight third line, second appearance of string.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('string is string', [(None, 10), (f.highlight_color, 6)])
def test_search_loops():
f = tutils.tflowview(request_contents="this is string\nstring is string")
# get to the end.
f.search("string")
f.search("string")
f.search("string")
# should highlight the first line.
message = f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
assert message == "search hit BOTTOM, continuing at TOP"
def test_search_focuses():
f = tutils.tflowview(request_contents="this is string\nstring is string")
# should highlight the first line.
f.search("string")
# should be focusing on the 2nd text line.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert f.last_displayed_body.focus == text_object
def test_search_does_not_crash_on_bad():
"""
this used to crash, kept for reference.
"""
f = tutils.tflowview(request_contents="this is string\nstring is string\n"+("A" * cv.VIEW_CUTOFF)+"AFTERCUTOFF")
f.search("AFTERCUTOFF")
# pretend F
f.state.add_flow_setting(
f.flow,
(f.state.view_flow_mode, "fullcontents"),
True
)
f.master.refresh_flow(f.flow)
# text changed, now this string will exist. can happen when user presses F
# for full text view
f.search("AFTERCUTOFF")

View File

@ -1,8 +1,11 @@
import os, shutil, tempfile
from contextlib import contextmanager
from libmproxy import flow, utils, controller
from libmproxy.console.flowview import FlowView
from libmproxy.console import ConsoleState
from netlib import certutils
from nose.plugins.skip import SkipTest
from mock import Mock
def _SkipWindows():
raise SkipTest("Skipped on Windows.")
@ -12,13 +15,14 @@ def SkipWindows(fn):
else:
return fn
def treq(conn=None):
def treq(conn=None, content="content"):
if not conn:
conn = flow.ClientConnect(("address", 22))
conn.reply = controller.DummyReply()
headers = flow.ODictCaseless()
headers["header"] = ["qvalue"]
r = flow.Request(conn, (1, 1), "host", 80, "http", "GET", "/path", headers, "content")
r = flow.Request(conn, (1, 1), "host", 80, "http", "GET", "/path", headers,
content)
r.reply = controller.DummyReply()
return r
@ -41,8 +45,9 @@ def terr(req=None):
return err
def tflow():
r = treq()
def tflow(r=None):
if r == None:
r = treq()
return flow.Flow(r)
@ -57,6 +62,20 @@ def tflow_err():
f.error = terr(f.request)
return f
def tflowview(request_contents=None):
m = Mock()
cs = ConsoleState()
if request_contents == None:
flow = tflow()
else:
req = treq(None, request_contents)
flow = tflow(req)
fv = FlowView(m, cs, flow)
return fv
def get_body_line(last_displayed_body, line_nb):
return last_displayed_body.contents()[line_nb + 2]
@contextmanager
def tmpdir(*args, **kwargs):