mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-01-30 14:58:38 +00:00
commands: view.go
bind G to "view.go -1" bind g to "view.go 0"
This commit is contained in:
parent
0b090f7ae1
commit
217addbf31
@ -244,18 +244,52 @@ class View(collections.Sequence):
|
||||
self._refilter()
|
||||
self.sig_store_refresh.send(self)
|
||||
|
||||
def add(self, f: mitmproxy.flow.Flow) -> None:
|
||||
def add(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
|
||||
"""
|
||||
Adds a flow to the state. If the flow already exists, it is
|
||||
ignored.
|
||||
"""
|
||||
if f.id not in self._store:
|
||||
self._store[f.id] = f
|
||||
if self.filter(f):
|
||||
self._base_add(f)
|
||||
if self.focus_follow:
|
||||
self.focus.flow = f
|
||||
self.sig_view_add.send(self, flow=f)
|
||||
for f in flows:
|
||||
if f.id not in self._store:
|
||||
self._store[f.id] = f
|
||||
if self.filter(f):
|
||||
self._base_add(f)
|
||||
if self.focus_follow:
|
||||
self.focus.flow = f
|
||||
self.sig_view_add.send(self, flow=f)
|
||||
|
||||
def get_by_id(self, flow_id: str) -> typing.Optional[mitmproxy.flow.Flow]:
|
||||
"""
|
||||
Get flow with the given id from the store.
|
||||
Returns None if the flow is not found.
|
||||
"""
|
||||
return self._store.get(flow_id)
|
||||
|
||||
@command.command("view.go")
|
||||
def go(self, dst: int) -> None:
|
||||
"""
|
||||
Go to a specified offset. Positive offests are from the beginning of
|
||||
the view, negative from the end of the view, so that 0 is the first
|
||||
flow, -1 is the last flow.
|
||||
"""
|
||||
if dst < 0:
|
||||
dst = len(self) + dst
|
||||
if dst < 0:
|
||||
dst = 0
|
||||
if dst > len(self) - 1:
|
||||
dst = len(self) - 1
|
||||
self.focus.flow = self[dst]
|
||||
|
||||
@command.command("view.duplicate")
|
||||
def duplicate(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
|
||||
"""
|
||||
Duplicates the specified flows, and sets the focus to the first
|
||||
duplicate.
|
||||
"""
|
||||
dups = [f.copy() for f in flows]
|
||||
if dups:
|
||||
self.add(dups)
|
||||
self.focus.flow = dups[0]
|
||||
|
||||
@command.command("view.remove")
|
||||
def remove(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
|
||||
@ -272,35 +306,6 @@ class View(collections.Sequence):
|
||||
del self._store[f.id]
|
||||
self.sig_store_remove.send(self, flow=f)
|
||||
|
||||
def get_by_id(self, flow_id: str) -> typing.Optional[mitmproxy.flow.Flow]:
|
||||
"""
|
||||
Get flow with the given id from the store.
|
||||
Returns None if the flow is not found.
|
||||
"""
|
||||
return self._store.get(flow_id)
|
||||
|
||||
# Event handlers
|
||||
def configure(self, updated):
|
||||
if "view_filter" in updated:
|
||||
filt = None
|
||||
if ctx.options.view_filter:
|
||||
filt = flowfilter.parse(ctx.options.view_filter)
|
||||
if not filt:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid interception filter: %s" % ctx.options.view_filter
|
||||
)
|
||||
self.set_filter(filt)
|
||||
if "console_order" in updated:
|
||||
if ctx.options.console_order not in self.orders:
|
||||
raise exceptions.OptionsError(
|
||||
"Unknown flow order: %s" % ctx.options.console_order
|
||||
)
|
||||
self.set_order(self.orders[ctx.options.console_order])
|
||||
if "console_order_reversed" in updated:
|
||||
self.set_reversed(ctx.options.console_order_reversed)
|
||||
if "console_focus_follow" in updated:
|
||||
self.focus_follow = ctx.options.console_focus_follow
|
||||
|
||||
@command.command("view.resolve")
|
||||
def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
|
||||
"""
|
||||
@ -324,8 +329,30 @@ class View(collections.Sequence):
|
||||
raise exceptions.CommandError("Invalid flow filter: %s" % spec)
|
||||
return [i for i in self._store.values() if filt(i)]
|
||||
|
||||
# Event handlers
|
||||
def configure(self, updated):
|
||||
if "view_filter" in updated:
|
||||
filt = None
|
||||
if ctx.options.view_filter:
|
||||
filt = flowfilter.parse(ctx.options.view_filter)
|
||||
if not filt:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid interception filter: %s" % ctx.options.view_filter
|
||||
)
|
||||
self.set_filter(filt)
|
||||
if "console_order" in updated:
|
||||
if ctx.options.console_order not in self.orders:
|
||||
raise exceptions.OptionsError(
|
||||
"Unknown flow order: %s" % ctx.options.console_order
|
||||
)
|
||||
self.set_order(self.orders[ctx.options.console_order])
|
||||
if "console_order_reversed" in updated:
|
||||
self.set_reversed(ctx.options.console_order_reversed)
|
||||
if "console_focus_follow" in updated:
|
||||
self.focus_follow = ctx.options.console_focus_follow
|
||||
|
||||
def request(self, f):
|
||||
self.add(f)
|
||||
self.add([f])
|
||||
|
||||
def error(self, f):
|
||||
self.update([f])
|
||||
|
@ -109,6 +109,11 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
|
||||
"""
|
||||
if argtype == str:
|
||||
return spec
|
||||
if argtype == int:
|
||||
try:
|
||||
return int(spec)
|
||||
except ValueError as e:
|
||||
raise exceptions.CommandError("Expected an integer, got %s." % spec)
|
||||
elif argtype == typing.Sequence[flow.Flow]:
|
||||
return manager.call_args("view.resolve", [spec])
|
||||
elif argtype == flow.Flow:
|
||||
|
@ -150,11 +150,7 @@ class FlowItem(urwid.WidgetWrap):
|
||||
def keypress(self, xxx_todo_changeme, key):
|
||||
(maxcol,) = xxx_todo_changeme
|
||||
key = common.shortcuts(key)
|
||||
if key == "D":
|
||||
cp = self.flow.copy()
|
||||
self.master.view.add(cp)
|
||||
self.master.view.focus.flow = cp
|
||||
elif key == "m":
|
||||
if key == "m":
|
||||
self.flow.marked = not self.flow.marked
|
||||
signals.flowlist_change.send(self)
|
||||
elif key == "r":
|
||||
@ -316,12 +312,6 @@ class FlowListBox(urwid.ListBox):
|
||||
key = common.shortcuts(key)
|
||||
if key == "Z":
|
||||
self.master.view.clear_not_marked()
|
||||
elif key == "g":
|
||||
if len(self.master.view):
|
||||
self.master.view.focus.index = 0
|
||||
elif key == "G":
|
||||
if len(self.master.view):
|
||||
self.master.view.focus.index = len(self.master.view) - 1
|
||||
elif key == "L":
|
||||
signals.status_prompt_path.send(
|
||||
self,
|
||||
|
@ -150,7 +150,10 @@ def default_keymap(km):
|
||||
km.add("A", "flow.resume @all", context="flowlist")
|
||||
km.add("a", "flow.resume @focus", context="flowlist")
|
||||
km.add("d", "view.remove @focus", context="flowlist")
|
||||
km.add("D", "view.duplicate @focus", context="flowlist")
|
||||
km.add("F", "set console_focus_follow=toggle", context="flowlist")
|
||||
km.add("g", "view.go 0", context="flowlist")
|
||||
km.add("G", "view.go -1", context="flowlist")
|
||||
km.add("v", "set console_order_reversed=toggle", context="flowlist")
|
||||
km.add("f", "console.command 'set view_filter='", context="flowlist")
|
||||
km.add("e", "set console_eventlog=toggle", context="flowlist")
|
||||
|
@ -323,7 +323,7 @@ class FlowHandler(RequestHandler):
|
||||
class DuplicateFlow(RequestHandler):
|
||||
def post(self, flow_id):
|
||||
f = self.flow.copy()
|
||||
self.view.add(f)
|
||||
self.view.add([f])
|
||||
self.write(f.id)
|
||||
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
from mitmproxy.addons import core
|
||||
from mitmproxy.test import taddons
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy import exceptions
|
||||
import pytest
|
||||
|
||||
@ -15,3 +16,13 @@ def test_set():
|
||||
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
tctx.command(sa.set, "nonexistent")
|
||||
|
||||
|
||||
def test_resume():
|
||||
sa = core.Core()
|
||||
with taddons.context():
|
||||
f = tflow.tflow()
|
||||
assert not sa.resume([f])
|
||||
f.intercept()
|
||||
sa.resume([f])
|
||||
assert not f.reply.state == "taken"
|
||||
|
@ -4,7 +4,6 @@ from mitmproxy.test import tflow
|
||||
|
||||
from mitmproxy.addons import view
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import options
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.test import taddons
|
||||
|
||||
@ -26,9 +25,9 @@ def test_order_refresh():
|
||||
v.sig_view_refresh.connect(save)
|
||||
|
||||
tf = tflow.tflow(resp=True)
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(v, console_order="time")
|
||||
v.add(tf)
|
||||
v.add([tf])
|
||||
tf.request.timestamp_start = 1
|
||||
assert not sargs
|
||||
v.update([tf])
|
||||
@ -133,13 +132,13 @@ def test_filter():
|
||||
|
||||
def test_load():
|
||||
v = view.View()
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(v)
|
||||
|
||||
|
||||
def test_resolve():
|
||||
v = view.View()
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
with taddons.context() as tctx:
|
||||
assert tctx.command(v.resolve, "@all") == []
|
||||
assert tctx.command(v.resolve, "@focus") == []
|
||||
assert tctx.command(v.resolve, "@shown") == []
|
||||
@ -184,9 +183,46 @@ def test_resolve():
|
||||
tctx.command(v.resolve, "~")
|
||||
|
||||
|
||||
def test_go():
|
||||
v = view.View()
|
||||
with taddons.context():
|
||||
v.add([
|
||||
tflow.tflow(),
|
||||
tflow.tflow(),
|
||||
tflow.tflow(),
|
||||
tflow.tflow(),
|
||||
tflow.tflow(),
|
||||
])
|
||||
assert v.focus.index == 0
|
||||
v.go(-1)
|
||||
assert v.focus.index == 4
|
||||
v.go(0)
|
||||
assert v.focus.index == 0
|
||||
v.go(1)
|
||||
assert v.focus.index == 1
|
||||
v.go(999)
|
||||
assert v.focus.index == 4
|
||||
v.go(-999)
|
||||
assert v.focus.index == 0
|
||||
|
||||
|
||||
def test_duplicate():
|
||||
v = view.View()
|
||||
with taddons.context():
|
||||
f = [
|
||||
tflow.tflow(),
|
||||
tflow.tflow(),
|
||||
]
|
||||
v.add(f)
|
||||
assert len(v) == 2
|
||||
v.duplicate(f)
|
||||
assert len(v) == 4
|
||||
assert v.focus.index == 2
|
||||
|
||||
|
||||
def test_order():
|
||||
v = view.View()
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
with taddons.context() as tctx:
|
||||
v.request(tft(method="get", start=1))
|
||||
v.request(tft(method="put", start=2))
|
||||
v.request(tft(method="get", start=3))
|
||||
@ -280,7 +316,7 @@ def test_signals():
|
||||
assert not any([rec_add, rec_update, rec_remove, rec_refresh])
|
||||
|
||||
# Simple add
|
||||
v.add(tft())
|
||||
v.add([tft()])
|
||||
assert rec_add
|
||||
assert not any([rec_update, rec_remove, rec_refresh])
|
||||
|
||||
@ -317,22 +353,22 @@ def test_signals():
|
||||
|
||||
def test_focus_follow():
|
||||
v = view.View()
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(v, console_focus_follow=True, view_filter="~m get")
|
||||
|
||||
v.add(tft(start=5))
|
||||
v.add([tft(start=5)])
|
||||
assert v.focus.index == 0
|
||||
|
||||
v.add(tft(start=4))
|
||||
v.add([tft(start=4)])
|
||||
assert v.focus.index == 0
|
||||
assert v.focus.flow.request.timestamp_start == 4
|
||||
|
||||
v.add(tft(start=7))
|
||||
v.add([tft(start=7)])
|
||||
assert v.focus.index == 2
|
||||
assert v.focus.flow.request.timestamp_start == 7
|
||||
|
||||
mod = tft(method="put", start=6)
|
||||
v.add(mod)
|
||||
v.add([mod])
|
||||
assert v.focus.index == 2
|
||||
assert v.focus.flow.request.timestamp_start == 7
|
||||
|
||||
@ -345,7 +381,7 @@ def test_focus_follow():
|
||||
def test_focus():
|
||||
# Special case - initialising with a view that already contains data
|
||||
v = view.View()
|
||||
v.add(tft())
|
||||
v.add([tft()])
|
||||
f = view.Focus(v)
|
||||
assert f.index is 0
|
||||
assert f.flow is v[0]
|
||||
@ -356,7 +392,7 @@ def test_focus():
|
||||
assert f.index is None
|
||||
assert f.flow is None
|
||||
|
||||
v.add(tft(start=1))
|
||||
v.add([tft(start=1)])
|
||||
assert f.index == 0
|
||||
assert f.flow is v[0]
|
||||
|
||||
@ -366,11 +402,11 @@ def test_focus():
|
||||
with pytest.raises(ValueError):
|
||||
f.__setattr__("index", 99)
|
||||
|
||||
v.add(tft(start=0))
|
||||
v.add([tft(start=0)])
|
||||
assert f.index == 1
|
||||
assert f.flow is v[1]
|
||||
|
||||
v.add(tft(start=2))
|
||||
v.add([tft(start=2)])
|
||||
assert f.index == 1
|
||||
assert f.flow is v[1]
|
||||
|
||||
@ -391,10 +427,12 @@ def test_focus():
|
||||
assert f.index is None
|
||||
assert f.flow is None
|
||||
|
||||
v.add(tft(method="get", start=0))
|
||||
v.add(tft(method="get", start=1))
|
||||
v.add(tft(method="put", start=2))
|
||||
v.add(tft(method="get", start=3))
|
||||
v.add([
|
||||
tft(method="get", start=0),
|
||||
tft(method="get", start=1),
|
||||
tft(method="put", start=2),
|
||||
tft(method="get", start=3),
|
||||
])
|
||||
|
||||
f.flow = v[2]
|
||||
assert f.flow.request.method == "PUT"
|
||||
@ -414,7 +452,7 @@ def test_settings():
|
||||
|
||||
with pytest.raises(KeyError):
|
||||
v.settings[f]
|
||||
v.add(f)
|
||||
v.add([f])
|
||||
v.settings[f]["foo"] = "bar"
|
||||
assert v.settings[f]["foo"] == "bar"
|
||||
assert len(list(v.settings)) == 1
|
||||
@ -423,7 +461,7 @@ def test_settings():
|
||||
v.settings[f]
|
||||
assert not v.settings.keys()
|
||||
|
||||
v.add(f)
|
||||
v.add([f])
|
||||
v.settings[f]["foo"] = "bar"
|
||||
assert v.settings.keys()
|
||||
v.clear()
|
||||
@ -432,7 +470,7 @@ def test_settings():
|
||||
|
||||
def test_configure():
|
||||
v = view.View()
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(v, view_filter="~q")
|
||||
with pytest.raises(Exception, match="Invalid interception filter"):
|
||||
tctx.configure(v, view_filter="~~")
|
||||
|
@ -76,6 +76,10 @@ def test_parsearg():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(DummyConsole())
|
||||
assert command.parsearg(tctx.master.commands, "foo", str) == "foo"
|
||||
assert command.parsearg(tctx.master.commands, "1", int) == 1
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
command.parsearg(tctx.master.commands, "foo", int)
|
||||
|
||||
assert len(command.parsearg(
|
||||
tctx.master.commands, "2", typing.Sequence[flow.Flow]
|
||||
)) == 2
|
||||
|
@ -23,8 +23,8 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
|
||||
m = webmaster.WebMaster(o, proxy.DummyServer(), with_termlog=False)
|
||||
f = tflow.tflow(resp=True)
|
||||
f.id = "42"
|
||||
m.view.add(f)
|
||||
m.view.add(tflow.tflow(err=True))
|
||||
m.view.add([f])
|
||||
m.view.add([tflow.tflow(err=True)])
|
||||
m.add_log("test log", "info")
|
||||
self.master = m
|
||||
self.view = m.view
|
||||
@ -78,7 +78,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
|
||||
|
||||
# restore
|
||||
for f in flows:
|
||||
self.view.add(f)
|
||||
self.view.add([f])
|
||||
self.events.data = events
|
||||
|
||||
def test_resume(self):
|
||||
@ -110,7 +110,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
|
||||
assert self.fetch("/flows/42", method="DELETE").code == 200
|
||||
|
||||
assert not self.view.get_by_id("42")
|
||||
self.view.add(f)
|
||||
self.view.add([f])
|
||||
|
||||
assert self.fetch("/flows/1234", method="DELETE").code == 404
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user