Merge pull request #2283 from cortesi/cmdall

Commands, core update event
This commit is contained in:
Aldo Cortesi 2017-04-29 09:14:22 +12:00 committed by GitHub
commit 139c4e6db3
11 changed files with 251 additions and 158 deletions

View File

@ -1,6 +1,9 @@
import typing
from mitmproxy import ctx from mitmproxy import ctx
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy import command from mitmproxy import command
from mitmproxy import flow
class Core: class Core:
@ -16,3 +19,13 @@ class Core:
ctx.options.set(spec) ctx.options.set(spec)
except exceptions.OptionsError as e: except exceptions.OptionsError as e:
raise exceptions.CommandError(e) from e raise exceptions.CommandError(e) from e
@command.command("flow.resume")
def resume(self, flows: typing.Sequence[flow.Flow]) -> None:
"""
Resume flows if they are intercepted.
"""
intercepted = [i for i in flows if i.intercepted]
for f in intercepted:
f.resume()
ctx.master.addons.trigger("update", intercepted)

View File

@ -18,6 +18,7 @@ import sortedcontainers
import mitmproxy.flow import mitmproxy.flow
from mitmproxy import flowfilter from mitmproxy import flowfilter
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy import command
from mitmproxy import ctx from mitmproxy import ctx
from mitmproxy import http # noqa from mitmproxy import http # noqa
@ -223,7 +224,7 @@ class View(collections.Sequence):
self.filter = flt or matchall self.filter = flt or matchall
self._refilter() self._refilter()
def clear(self): def clear(self) -> None:
""" """
Clears both the store and view. Clears both the store and view.
""" """
@ -243,11 +244,12 @@ class View(collections.Sequence):
self._refilter() self._refilter()
self.sig_store_refresh.send(self) self.sig_store_refresh.send(self)
def add(self, f: mitmproxy.flow.Flow) -> None: def add(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
""" """
Adds a flow to the state. If the flow already exists, it is Adds a flow to the state. If the flow already exists, it is
ignored. ignored.
""" """
for f in flows:
if f.id not in self._store: if f.id not in self._store:
self._store[f.id] = f self._store[f.id] = f
if self.filter(f): if self.filter(f):
@ -256,43 +258,6 @@ class View(collections.Sequence):
self.focus.flow = f self.focus.flow = f
self.sig_view_add.send(self, flow=f) self.sig_view_add.send(self, flow=f)
def remove(self, f: mitmproxy.flow.Flow):
"""
Removes the flow from the underlying store and the view.
"""
if f.id in self._store:
if f in self._view:
self._view.remove(f)
self.sig_view_remove.send(self, flow=f)
del self._store[f.id]
self.sig_store_remove.send(self, flow=f)
def update(self, f: mitmproxy.flow.Flow):
"""
Updates a flow. If the flow is not in the state, it's ignored.
"""
if f.id in self._store:
if self.filter(f):
if f not in self._view:
self._base_add(f)
if self.focus_follow:
self.focus.flow = f
self.sig_view_add.send(self, flow=f)
else:
# This is a tad complicated. The sortedcontainers
# implementation assumes that the order key is stable. If
# it changes mid-way Very Bad Things happen. We detect when
# this happens, and re-fresh the item.
self.order_key.refresh(f)
self.sig_view_update.send(self, flow=f)
else:
try:
self._view.remove(f)
self.sig_view_remove.send(self, flow=f)
except ValueError:
# The value was not in the view
pass
def get_by_id(self, flow_id: str) -> typing.Optional[mitmproxy.flow.Flow]: def get_by_id(self, flow_id: str) -> typing.Optional[mitmproxy.flow.Flow]:
""" """
Get flow with the given id from the store. Get flow with the given id from the store.
@ -300,6 +265,70 @@ class View(collections.Sequence):
""" """
return self._store.get(flow_id) return self._store.get(flow_id)
@command.command("view.go")
def go(self, dst: int) -> None:
"""
Go to a specified offset. Positive offests are from the beginning of
the view, negative from the end of the view, so that 0 is the first
flow, -1 is the last flow.
"""
if dst < 0:
dst = len(self) + dst
if dst < 0:
dst = 0
if dst > len(self) - 1:
dst = len(self) - 1
self.focus.flow = self[dst]
@command.command("view.duplicate")
def duplicate(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
"""
Duplicates the specified flows, and sets the focus to the first
duplicate.
"""
dups = [f.copy() for f in flows]
if dups:
self.add(dups)
self.focus.flow = dups[0]
@command.command("view.remove")
def remove(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
"""
Removes the flow from the underlying store and the view.
"""
for f in flows:
if f.id in self._store:
if f.killable:
f.kill()
if f in self._view:
self._view.remove(f)
self.sig_view_remove.send(self, flow=f)
del self._store[f.id]
self.sig_store_remove.send(self, flow=f)
@command.command("view.resolve")
def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
"""
Resolve a flow list specification to an actual list of flows.
"""
if spec == "@all":
return [i for i in self._store.values()]
if spec == "@focus":
return [self.focus.flow] if self.focus.flow else []
elif spec == "@shown":
return [i for i in self]
elif spec == "@hidden":
return [i for i in self._store.values() if i not in self._view]
elif spec == "@marked":
return [i for i in self._store.values() if i.marked]
elif spec == "@unmarked":
return [i for i in self._store.values() if not i.marked]
else:
filt = flowfilter.parse(spec)
if not filt:
raise exceptions.CommandError("Invalid flow filter: %s" % spec)
return [i for i in self._store.values() if filt(i)]
# Event handlers # Event handlers
def configure(self, updated): def configure(self, updated):
if "view_filter" in updated: if "view_filter" in updated:
@ -322,46 +351,50 @@ class View(collections.Sequence):
if "console_focus_follow" in updated: if "console_focus_follow" in updated:
self.focus_follow = ctx.options.console_focus_follow self.focus_follow = ctx.options.console_focus_follow
def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
"""
Resolve a flow list specification to an actual list of flows.
"""
if spec == "@focus":
return [self.focus.flow] if self.focus.flow else []
elif spec == "@shown":
return [i for i in self]
elif spec == "@hidden":
return [i for i in self._store.values() if i not in self._view]
elif spec == "@marked":
return [i for i in self._store.values() if i.marked]
elif spec == "@unmarked":
return [i for i in self._store.values() if not i.marked]
else:
filt = flowfilter.parse(spec)
if not filt:
raise exceptions.CommandError("Invalid flow filter: %s" % spec)
return [i for i in self._store.values() if filt(i)]
def load(self, l):
l.add_command("console.resolve", self.resolve)
def request(self, f): def request(self, f):
self.add(f) self.add([f])
def error(self, f): def error(self, f):
self.update(f) self.update([f])
def response(self, f): def response(self, f):
self.update(f) self.update([f])
def intercept(self, f): def intercept(self, f):
self.update(f) self.update([f])
def resume(self, f): def resume(self, f):
self.update(f) self.update([f])
def kill(self, f): def kill(self, f):
self.update(f) self.update([f])
def update(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
"""
Updates a list of flows. If flow is not in the state, it's ignored.
"""
for f in flows:
if f.id in self._store:
if self.filter(f):
if f not in self._view:
self._base_add(f)
if self.focus_follow:
self.focus.flow = f
self.sig_view_add.send(self, flow=f)
else:
# This is a tad complicated. The sortedcontainers
# implementation assumes that the order key is stable. If
# it changes mid-way Very Bad Things happen. We detect when
# this happens, and re-fresh the item.
self.order_key.refresh(f)
self.sig_view_update.send(self, flow=f)
else:
try:
self._view.remove(f)
self.sig_view_remove.send(self, flow=f)
except ValueError:
# The value was not in the view
pass
class Focus: class Focus:

View File

@ -109,10 +109,15 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
""" """
if argtype == str: if argtype == str:
return spec return spec
if argtype == int:
try:
return int(spec)
except ValueError as e:
raise exceptions.CommandError("Expected an integer, got %s." % spec)
elif argtype == typing.Sequence[flow.Flow]: elif argtype == typing.Sequence[flow.Flow]:
return manager.call_args("console.resolve", [spec]) return manager.call_args("view.resolve", [spec])
elif argtype == flow.Flow: elif argtype == flow.Flow:
flows = manager.call_args("console.resolve", [spec]) flows = manager.call_args("view.resolve", [spec])
if len(flows) != 1: if len(flows) != 1:
raise exceptions.CommandError( raise exceptions.CommandError(
"Command requires one flow, specification matched %s." % len(flows) "Command requires one flow, specification matched %s." % len(flows)

View File

@ -35,6 +35,7 @@ Events = frozenset([
"load", "load",
"running", "running",
"tick", "tick",
"update",
]) ])

View File

@ -150,18 +150,7 @@ class FlowItem(urwid.WidgetWrap):
def keypress(self, xxx_todo_changeme, key): def keypress(self, xxx_todo_changeme, key):
(maxcol,) = xxx_todo_changeme (maxcol,) = xxx_todo_changeme
key = common.shortcuts(key) key = common.shortcuts(key)
if key == "a": if key == "m":
self.flow.resume()
self.master.view.update(self.flow)
elif key == "d":
if self.flow.killable:
self.flow.kill()
self.master.view.remove(self.flow)
elif key == "D":
cp = self.flow.copy()
self.master.view.add(cp)
self.master.view.focus.flow = cp
elif key == "m":
self.flow.marked = not self.flow.marked self.flow.marked = not self.flow.marked
signals.flowlist_change.send(self) signals.flowlist_change.send(self)
elif key == "r": elif key == "r":
@ -222,14 +211,14 @@ class FlowItem(urwid.WidgetWrap):
callback = common.export_to_clip_or_file, callback = common.export_to_clip_or_file,
args = (None, self.flow, common.ask_save_path) args = (None, self.flow, common.ask_save_path)
) )
elif key == "C": # elif key == "C":
signals.status_prompt_onekey.send( # signals.status_prompt_onekey.send(
self, # self,
prompt = "Export to clipboard", # prompt = "Export to clipboard",
keys = [(e[0], e[1]) for e in export.EXPORTERS], # keys = [(e[0], e[1]) for e in export.EXPORTERS],
callback = common.export_to_clip_or_file, # callback = common.export_to_clip_or_file,
args = (None, self.flow, common.copy_to_clipboard_or_prompt) # args = (None, self.flow, common.copy_to_clipboard_or_prompt)
) # )
elif key == "b": elif key == "b":
common.ask_save_body(None, self.flow) common.ask_save_body(None, self.flow)
else: else:
@ -321,21 +310,8 @@ class FlowListBox(urwid.ListBox):
def keypress(self, size, key): def keypress(self, size, key):
key = common.shortcuts(key) key = common.shortcuts(key)
if key == "A": if key == "Z":
for f in self.master.view:
if f.intercepted:
f.resume()
self.master.view.update(f)
elif key == "z":
self.master.view.clear()
elif key == "Z":
self.master.view.clear_not_marked() self.master.view.clear_not_marked()
elif key == "g":
if len(self.master.view):
self.master.view.focus.index = 0
elif key == "G":
if len(self.master.view):
self.master.view.focus.index = len(self.master.view) - 1
elif key == "L": elif key == "L":
signals.status_prompt_path.send( signals.status_prompt_path.send(
self, self,

View File

@ -147,11 +147,18 @@ def default_keymap(km):
km.add("i", "console.command 'set intercept='") km.add("i", "console.command 'set intercept='")
km.add("W", "console.command 'set save_stream_file='") km.add("W", "console.command 'set save_stream_file='")
km.add("A", "flow.resume @all", context="flowlist")
km.add("a", "flow.resume @focus", context="flowlist")
km.add("d", "view.remove @focus", context="flowlist")
km.add("D", "view.duplicate @focus", context="flowlist")
km.add("F", "set console_focus_follow=toggle", context="flowlist") km.add("F", "set console_focus_follow=toggle", context="flowlist")
km.add("g", "view.go 0", context="flowlist")
km.add("G", "view.go -1", context="flowlist")
km.add("v", "set console_order_reversed=toggle", context="flowlist") km.add("v", "set console_order_reversed=toggle", context="flowlist")
km.add("f", "console.command 'set view_filter='", context="flowlist") km.add("f", "console.command 'set view_filter='", context="flowlist")
km.add("e", "set console_eventlog=toggle", context="flowlist") km.add("e", "set console_eventlog=toggle", context="flowlist")
km.add("w", "console.command 'save.file @shown '", context="flowlist") km.add("w", "console.command 'save.file @shown '", context="flowlist")
km.add("z", "view.remove @all", context="flowlist")
km.add("enter", "console.view.flow @focus", context="flowlist") km.add("enter", "console.view.flow @focus", context="flowlist")

View File

@ -246,7 +246,7 @@ class ResumeFlows(RequestHandler):
def post(self): def post(self):
for f in self.view: for f in self.view:
f.resume() f.resume()
self.view.update(f) self.view.update([f])
class KillFlows(RequestHandler): class KillFlows(RequestHandler):
@ -254,27 +254,27 @@ class KillFlows(RequestHandler):
for f in self.view: for f in self.view:
if f.killable: if f.killable:
f.kill() f.kill()
self.view.update(f) self.view.update([f])
class ResumeFlow(RequestHandler): class ResumeFlow(RequestHandler):
def post(self, flow_id): def post(self, flow_id):
self.flow.resume() self.flow.resume()
self.view.update(self.flow) self.view.update([self.flow])
class KillFlow(RequestHandler): class KillFlow(RequestHandler):
def post(self, flow_id): def post(self, flow_id):
if self.flow.killable: if self.flow.killable:
self.flow.kill() self.flow.kill()
self.view.update(self.flow) self.view.update([self.flow])
class FlowHandler(RequestHandler): class FlowHandler(RequestHandler):
def delete(self, flow_id): def delete(self, flow_id):
if self.flow.killable: if self.flow.killable:
self.flow.kill() self.flow.kill()
self.view.remove(self.flow) self.view.remove([self.flow])
def put(self, flow_id): def put(self, flow_id):
flow = self.flow flow = self.flow
@ -317,13 +317,13 @@ class FlowHandler(RequestHandler):
except APIError: except APIError:
flow.revert() flow.revert()
raise raise
self.view.update(flow) self.view.update([flow])
class DuplicateFlow(RequestHandler): class DuplicateFlow(RequestHandler):
def post(self, flow_id): def post(self, flow_id):
f = self.flow.copy() f = self.flow.copy()
self.view.add(f) self.view.add([f])
self.write(f.id) self.write(f.id)
@ -331,14 +331,14 @@ class RevertFlow(RequestHandler):
def post(self, flow_id): def post(self, flow_id):
if self.flow.modified(): if self.flow.modified():
self.flow.revert() self.flow.revert()
self.view.update(self.flow) self.view.update([self.flow])
class ReplayFlow(RequestHandler): class ReplayFlow(RequestHandler):
def post(self, flow_id): def post(self, flow_id):
self.flow.backup() self.flow.backup()
self.flow.response = None self.flow.response = None
self.view.update(self.flow) self.view.update([self.flow])
try: try:
self.master.replay_request(self.flow) self.master.replay_request(self.flow)
@ -351,7 +351,7 @@ class FlowContent(RequestHandler):
self.flow.backup() self.flow.backup()
message = getattr(self.flow, message) message = getattr(self.flow, message)
message.content = self.filecontents message.content = self.filecontents
self.view.update(self.flow) self.view.update([self.flow])
def get(self, flow_id, message): def get(self, flow_id, message):
message = getattr(self.flow, message) message = getattr(self.flow, message)

View File

@ -1,5 +1,6 @@
from mitmproxy.addons import core from mitmproxy.addons import core
from mitmproxy.test import taddons from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy import exceptions from mitmproxy import exceptions
import pytest import pytest
@ -15,3 +16,13 @@ def test_set():
with pytest.raises(exceptions.CommandError): with pytest.raises(exceptions.CommandError):
tctx.command(sa.set, "nonexistent") tctx.command(sa.set, "nonexistent")
def test_resume():
sa = core.Core()
with taddons.context():
f = tflow.tflow()
assert not sa.resume([f])
f.intercept()
sa.resume([f])
assert not f.reply.state == "taken"

View File

@ -4,7 +4,6 @@ from mitmproxy.test import tflow
from mitmproxy.addons import view from mitmproxy.addons import view
from mitmproxy import flowfilter from mitmproxy import flowfilter
from mitmproxy import options
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy.test import taddons from mitmproxy.test import taddons
@ -26,12 +25,12 @@ def test_order_refresh():
v.sig_view_refresh.connect(save) v.sig_view_refresh.connect(save)
tf = tflow.tflow(resp=True) tf = tflow.tflow(resp=True)
with taddons.context(options=options.Options()) as tctx: with taddons.context() as tctx:
tctx.configure(v, console_order="time") tctx.configure(v, console_order="time")
v.add(tf) v.add([tf])
tf.request.timestamp_start = 1 tf.request.timestamp_start = 1
assert not sargs assert not sargs
v.update(tf) v.update([tf])
assert sargs assert sargs
@ -133,13 +132,14 @@ def test_filter():
def test_load(): def test_load():
v = view.View() v = view.View()
with taddons.context(options=options.Options()) as tctx: with taddons.context() as tctx:
tctx.master.addons.add(v) tctx.master.addons.add(v)
def test_resolve(): def test_resolve():
v = view.View() v = view.View()
with taddons.context(options=options.Options()) as tctx: with taddons.context() as tctx:
assert tctx.command(v.resolve, "@all") == []
assert tctx.command(v.resolve, "@focus") == [] assert tctx.command(v.resolve, "@focus") == []
assert tctx.command(v.resolve, "@shown") == [] assert tctx.command(v.resolve, "@shown") == []
assert tctx.command(v.resolve, "@hidden") == [] assert tctx.command(v.resolve, "@hidden") == []
@ -149,6 +149,7 @@ def test_resolve():
v.request(tft(method="get")) v.request(tft(method="get"))
assert len(tctx.command(v.resolve, "~m get")) == 1 assert len(tctx.command(v.resolve, "~m get")) == 1
assert len(tctx.command(v.resolve, "@focus")) == 1 assert len(tctx.command(v.resolve, "@focus")) == 1
assert len(tctx.command(v.resolve, "@all")) == 1
assert len(tctx.command(v.resolve, "@shown")) == 1 assert len(tctx.command(v.resolve, "@shown")) == 1
assert len(tctx.command(v.resolve, "@unmarked")) == 1 assert len(tctx.command(v.resolve, "@unmarked")) == 1
assert tctx.command(v.resolve, "@hidden") == [] assert tctx.command(v.resolve, "@hidden") == []
@ -156,6 +157,7 @@ def test_resolve():
v.request(tft(method="put")) v.request(tft(method="put"))
assert len(tctx.command(v.resolve, "@focus")) == 1 assert len(tctx.command(v.resolve, "@focus")) == 1
assert len(tctx.command(v.resolve, "@shown")) == 2 assert len(tctx.command(v.resolve, "@shown")) == 2
assert len(tctx.command(v.resolve, "@all")) == 2
assert tctx.command(v.resolve, "@hidden") == [] assert tctx.command(v.resolve, "@hidden") == []
assert tctx.command(v.resolve, "@marked") == [] assert tctx.command(v.resolve, "@marked") == []
@ -175,14 +177,52 @@ def test_resolve():
assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"] assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"]
assert m(tctx.command(v.resolve, "@marked")) == ["GET"] assert m(tctx.command(v.resolve, "@marked")) == ["GET"]
assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"] assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"]
assert m(tctx.command(v.resolve, "@all")) == ["GET", "PUT", "GET", "PUT"]
with pytest.raises(exceptions.CommandError, match="Invalid flow filter"): with pytest.raises(exceptions.CommandError, match="Invalid flow filter"):
tctx.command(v.resolve, "~") tctx.command(v.resolve, "~")
def test_go():
v = view.View()
with taddons.context():
v.add([
tflow.tflow(),
tflow.tflow(),
tflow.tflow(),
tflow.tflow(),
tflow.tflow(),
])
assert v.focus.index == 0
v.go(-1)
assert v.focus.index == 4
v.go(0)
assert v.focus.index == 0
v.go(1)
assert v.focus.index == 1
v.go(999)
assert v.focus.index == 4
v.go(-999)
assert v.focus.index == 0
def test_duplicate():
v = view.View()
with taddons.context():
f = [
tflow.tflow(),
tflow.tflow(),
]
v.add(f)
assert len(v) == 2
v.duplicate(f)
assert len(v) == 4
assert v.focus.index == 2
def test_order(): def test_order():
v = view.View() v = view.View()
with taddons.context(options=options.Options()) as tctx: with taddons.context() as tctx:
v.request(tft(method="get", start=1)) v.request(tft(method="get", start=1))
v.request(tft(method="put", start=2)) v.request(tft(method="put", start=2))
v.request(tft(method="get", start=3)) v.request(tft(method="get", start=3))
@ -230,14 +270,14 @@ def test_update():
assert f in v assert f in v
f.request.method = "put" f.request.method = "put"
v.update(f) v.update([f])
assert f not in v assert f not in v
f.request.method = "get" f.request.method = "get"
v.update(f) v.update([f])
assert f in v assert f in v
v.update(f) v.update([f])
assert f in v assert f in v
@ -276,7 +316,7 @@ def test_signals():
assert not any([rec_add, rec_update, rec_remove, rec_refresh]) assert not any([rec_add, rec_update, rec_remove, rec_refresh])
# Simple add # Simple add
v.add(tft()) v.add([tft()])
assert rec_add assert rec_add
assert not any([rec_update, rec_remove, rec_refresh]) assert not any([rec_update, rec_remove, rec_refresh])
@ -291,14 +331,14 @@ def test_signals():
# An update that results in a flow being added to the view # An update that results in a flow being added to the view
clearrec() clearrec()
v[0].request.method = "PUT" v[0].request.method = "PUT"
v.update(v[0]) v.update([v[0]])
assert rec_remove assert rec_remove
assert not any([rec_update, rec_refresh, rec_add]) assert not any([rec_update, rec_refresh, rec_add])
# An update that does not affect the view just sends update # An update that does not affect the view just sends update
v.set_filter(flowfilter.parse("~m put")) v.set_filter(flowfilter.parse("~m put"))
clearrec() clearrec()
v.update(v[0]) v.update([v[0]])
assert rec_update assert rec_update
assert not any([rec_remove, rec_refresh, rec_add]) assert not any([rec_remove, rec_refresh, rec_add])
@ -307,33 +347,33 @@ def test_signals():
v.set_filter(flowfilter.parse("~m get")) v.set_filter(flowfilter.parse("~m get"))
assert not len(v) assert not len(v)
clearrec() clearrec()
v.update(f) v.update([f])
assert not any([rec_add, rec_update, rec_remove, rec_refresh]) assert not any([rec_add, rec_update, rec_remove, rec_refresh])
def test_focus_follow(): def test_focus_follow():
v = view.View() v = view.View()
with taddons.context(options=options.Options()) as tctx: with taddons.context() as tctx:
tctx.configure(v, console_focus_follow=True, view_filter="~m get") tctx.configure(v, console_focus_follow=True, view_filter="~m get")
v.add(tft(start=5)) v.add([tft(start=5)])
assert v.focus.index == 0 assert v.focus.index == 0
v.add(tft(start=4)) v.add([tft(start=4)])
assert v.focus.index == 0 assert v.focus.index == 0
assert v.focus.flow.request.timestamp_start == 4 assert v.focus.flow.request.timestamp_start == 4
v.add(tft(start=7)) v.add([tft(start=7)])
assert v.focus.index == 2 assert v.focus.index == 2
assert v.focus.flow.request.timestamp_start == 7 assert v.focus.flow.request.timestamp_start == 7
mod = tft(method="put", start=6) mod = tft(method="put", start=6)
v.add(mod) v.add([mod])
assert v.focus.index == 2 assert v.focus.index == 2
assert v.focus.flow.request.timestamp_start == 7 assert v.focus.flow.request.timestamp_start == 7
mod.request.method = "GET" mod.request.method = "GET"
v.update(mod) v.update([mod])
assert v.focus.index == 2 assert v.focus.index == 2
assert v.focus.flow.request.timestamp_start == 6 assert v.focus.flow.request.timestamp_start == 6
@ -341,7 +381,7 @@ def test_focus_follow():
def test_focus(): def test_focus():
# Special case - initialising with a view that already contains data # Special case - initialising with a view that already contains data
v = view.View() v = view.View()
v.add(tft()) v.add([tft()])
f = view.Focus(v) f = view.Focus(v)
assert f.index is 0 assert f.index is 0
assert f.flow is v[0] assert f.flow is v[0]
@ -352,7 +392,7 @@ def test_focus():
assert f.index is None assert f.index is None
assert f.flow is None assert f.flow is None
v.add(tft(start=1)) v.add([tft(start=1)])
assert f.index == 0 assert f.index == 0
assert f.flow is v[0] assert f.flow is v[0]
@ -362,11 +402,11 @@ def test_focus():
with pytest.raises(ValueError): with pytest.raises(ValueError):
f.__setattr__("index", 99) f.__setattr__("index", 99)
v.add(tft(start=0)) v.add([tft(start=0)])
assert f.index == 1 assert f.index == 1
assert f.flow is v[1] assert f.flow is v[1]
v.add(tft(start=2)) v.add([tft(start=2)])
assert f.index == 1 assert f.index == 1
assert f.flow is v[1] assert f.flow is v[1]
@ -374,22 +414,25 @@ def test_focus():
assert f.index == 0 assert f.index == 0
f.index = 1 f.index = 1
v.remove(v[1]) v.remove([v[1]])
v[1].intercept()
assert f.index == 1 assert f.index == 1
assert f.flow is v[1] assert f.flow is v[1]
v.remove(v[1]) v.remove([v[1]])
assert f.index == 0 assert f.index == 0
assert f.flow is v[0] assert f.flow is v[0]
v.remove(v[0]) v.remove([v[0]])
assert f.index is None assert f.index is None
assert f.flow is None assert f.flow is None
v.add(tft(method="get", start=0)) v.add([
v.add(tft(method="get", start=1)) tft(method="get", start=0),
v.add(tft(method="put", start=2)) tft(method="get", start=1),
v.add(tft(method="get", start=3)) tft(method="put", start=2),
tft(method="get", start=3),
])
f.flow = v[2] f.flow = v[2]
assert f.flow.request.method == "PUT" assert f.flow.request.method == "PUT"
@ -409,16 +452,16 @@ def test_settings():
with pytest.raises(KeyError): with pytest.raises(KeyError):
v.settings[f] v.settings[f]
v.add(f) v.add([f])
v.settings[f]["foo"] = "bar" v.settings[f]["foo"] = "bar"
assert v.settings[f]["foo"] == "bar" assert v.settings[f]["foo"] == "bar"
assert len(list(v.settings)) == 1 assert len(list(v.settings)) == 1
v.remove(f) v.remove([f])
with pytest.raises(KeyError): with pytest.raises(KeyError):
v.settings[f] v.settings[f]
assert not v.settings.keys() assert not v.settings.keys()
v.add(f) v.add([f])
v.settings[f]["foo"] = "bar" v.settings[f]["foo"] = "bar"
assert v.settings.keys() assert v.settings.keys()
v.clear() v.clear()
@ -427,7 +470,7 @@ def test_settings():
def test_configure(): def test_configure():
v = view.View() v = view.View()
with taddons.context(options=options.Options()) as tctx: with taddons.context() as tctx:
tctx.configure(v, view_filter="~q") tctx.configure(v, view_filter="~q")
with pytest.raises(Exception, match="Invalid interception filter"): with pytest.raises(Exception, match="Invalid interception filter"):
tctx.configure(v, view_filter="~~") tctx.configure(v, view_filter="~~")

View File

@ -65,7 +65,7 @@ def test_typename():
class DummyConsole: class DummyConsole:
def load(self, l): def load(self, l):
l.add_command("console.resolve", self.resolve) l.add_command("view.resolve", self.resolve)
def resolve(self, spec: str) -> typing.Sequence[flow.Flow]: def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
n = int(spec) n = int(spec)
@ -76,6 +76,10 @@ def test_parsearg():
with taddons.context() as tctx: with taddons.context() as tctx:
tctx.master.addons.add(DummyConsole()) tctx.master.addons.add(DummyConsole())
assert command.parsearg(tctx.master.commands, "foo", str) == "foo" assert command.parsearg(tctx.master.commands, "foo", str) == "foo"
assert command.parsearg(tctx.master.commands, "1", int) == 1
with pytest.raises(exceptions.CommandError):
command.parsearg(tctx.master.commands, "foo", int)
assert len(command.parsearg( assert len(command.parsearg(
tctx.master.commands, "2", typing.Sequence[flow.Flow] tctx.master.commands, "2", typing.Sequence[flow.Flow]
)) == 2 )) == 2

View File

@ -23,8 +23,8 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
m = webmaster.WebMaster(o, proxy.DummyServer(), with_termlog=False) m = webmaster.WebMaster(o, proxy.DummyServer(), with_termlog=False)
f = tflow.tflow(resp=True) f = tflow.tflow(resp=True)
f.id = "42" f.id = "42"
m.view.add(f) m.view.add([f])
m.view.add(tflow.tflow(err=True)) m.view.add([tflow.tflow(err=True)])
m.add_log("test log", "info") m.add_log("test log", "info")
self.master = m self.master = m
self.view = m.view self.view = m.view
@ -78,7 +78,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
# restore # restore
for f in flows: for f in flows:
self.view.add(f) self.view.add([f])
self.events.data = events self.events.data = events
def test_resume(self): def test_resume(self):
@ -110,7 +110,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
assert self.fetch("/flows/42", method="DELETE").code == 200 assert self.fetch("/flows/42", method="DELETE").code == 200
assert not self.view.get_by_id("42") assert not self.view.get_by_id("42")
self.view.add(f) self.view.add([f])
assert self.fetch("/flows/1234", method="DELETE").code == 404 assert self.fetch("/flows/1234", method="DELETE").code == 404
@ -162,7 +162,7 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
f = self.view.get_by_id(resp.body.decode()) f = self.view.get_by_id(resp.body.decode())
assert f assert f
assert f.id != "42" assert f.id != "42"
self.view.remove(f) self.view.remove([f])
def test_flow_revert(self): def test_flow_revert(self):
f = self.view.get_by_id("42") f = self.view.get_by_id("42")