mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
Merge pull request #2714 from cortesi/commander1
commander: highlight commands
This commit is contained in:
commit
4f32b835f7
@ -133,9 +133,12 @@ class CommandManager(mitmproxy.types._CommandBase):
|
||||
def add(self, path: str, func: typing.Callable):
|
||||
self.commands[path] = Command(self, path, func)
|
||||
|
||||
def parse_partial(self, cmdstr: str) -> typing.Sequence[ParseResult]:
|
||||
def parse_partial(
|
||||
self,
|
||||
cmdstr: str
|
||||
) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[str]]:
|
||||
"""
|
||||
Parse a possibly partial command. Return a sequence of (part, type) tuples.
|
||||
Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
|
||||
"""
|
||||
buf = io.StringIO(cmdstr)
|
||||
parts = [] # type: typing.List[str]
|
||||
@ -188,7 +191,13 @@ class CommandManager(mitmproxy.types._CommandBase):
|
||||
valid=valid,
|
||||
)
|
||||
)
|
||||
return parse
|
||||
|
||||
remhelp = [] # type: typing.List[str]
|
||||
for x in params:
|
||||
remt = mitmproxy.types.CommandTypes.get(x, None)
|
||||
remhelp.append(remt.display)
|
||||
|
||||
return parse, remhelp
|
||||
|
||||
def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any:
|
||||
"""
|
||||
|
@ -50,9 +50,9 @@ CompletionState = typing.NamedTuple(
|
||||
class CommandBuffer():
|
||||
def __init__(self, master: mitmproxy.master.Master, start: str = "") -> None:
|
||||
self.master = master
|
||||
self.buf = start
|
||||
self.text = self.flatten(start)
|
||||
# Cursor is always within the range [0:len(buffer)].
|
||||
self._cursor = len(self.buf)
|
||||
self._cursor = len(self.text)
|
||||
self.completion = None # type: CompletionState
|
||||
|
||||
@property
|
||||
@ -63,13 +63,25 @@ class CommandBuffer():
|
||||
def cursor(self, x) -> None:
|
||||
if x < 0:
|
||||
self._cursor = 0
|
||||
elif x > len(self.buf):
|
||||
self._cursor = len(self.buf)
|
||||
elif x > len(self.text):
|
||||
self._cursor = len(self.text)
|
||||
else:
|
||||
self._cursor = x
|
||||
|
||||
def render(self):
|
||||
return self.buf
|
||||
parts, _ = self.master.commands.parse_partial(self.text)
|
||||
ret = []
|
||||
for p in parts:
|
||||
if p.type == mitmproxy.types.Cmd and p.valid:
|
||||
ret.append(("title", p.value))
|
||||
else:
|
||||
ret.append(("text", p.value))
|
||||
ret.append(("text", " "))
|
||||
return ret
|
||||
|
||||
def flatten(self, txt):
|
||||
parts, _ = self.master.commands.parse_partial(txt)
|
||||
return " ".join([x.value for x in parts])
|
||||
|
||||
def left(self) -> None:
|
||||
self.cursor = self.cursor - 1
|
||||
@ -79,7 +91,7 @@ class CommandBuffer():
|
||||
|
||||
def cycle_completion(self) -> None:
|
||||
if not self.completion:
|
||||
parts = self.master.commands.parse_partial(self.buf[:self.cursor])
|
||||
parts, remainhelp = self.master.commands.parse_partial(self.text[:self.cursor])
|
||||
last = parts[-1]
|
||||
ct = mitmproxy.types.CommandTypes.get(last.type, None)
|
||||
if ct:
|
||||
@ -94,13 +106,13 @@ class CommandBuffer():
|
||||
nxt = self.completion.completer.cycle()
|
||||
buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt
|
||||
buf = buf.strip()
|
||||
self.buf = buf
|
||||
self.cursor = len(self.buf)
|
||||
self.text = self.flatten(buf)
|
||||
self.cursor = len(self.text)
|
||||
|
||||
def backspace(self) -> None:
|
||||
if self.cursor == 0:
|
||||
return
|
||||
self.buf = self.buf[:self.cursor - 1] + self.buf[self.cursor:]
|
||||
self.text = self.flatten(self.text[:self.cursor - 1] + self.text[self.cursor:])
|
||||
self.cursor = self.cursor - 1
|
||||
self.completion = None
|
||||
|
||||
@ -108,7 +120,7 @@ class CommandBuffer():
|
||||
"""
|
||||
Inserts text at the cursor.
|
||||
"""
|
||||
self.buf = self.buf = self.buf[:self.cursor] + k + self.buf[self.cursor:]
|
||||
self.text = self.flatten(self.text[:self.cursor] + k + self.text[self.cursor:])
|
||||
self.cursor += 1
|
||||
self.completion = None
|
||||
|
||||
@ -152,4 +164,4 @@ class CommandEdit(urwid.WidgetWrap):
|
||||
return x, y
|
||||
|
||||
def get_value(self):
|
||||
return self.cbuf.buf
|
||||
return self.cbuf.text
|
||||
|
@ -174,6 +174,8 @@ class _CmdType(_BaseType):
|
||||
return list(manager.commands.keys())
|
||||
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> str:
|
||||
if s not in manager.commands:
|
||||
raise exceptions.TypeError("Unknown command: %s" % s)
|
||||
return s
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
@ -316,7 +318,10 @@ class _FlowType(_BaseFlowType):
|
||||
display = "flow"
|
||||
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> flow.Flow:
|
||||
flows = manager.call_args("view.resolve", [s])
|
||||
try:
|
||||
flows = manager.call_args("view.resolve", [s])
|
||||
except exceptions.CommandError as e:
|
||||
raise exceptions.TypeError from e
|
||||
if len(flows) != 1:
|
||||
raise exceptions.TypeError(
|
||||
"Command requires one flow, specification matched %s." % len(flows)
|
||||
@ -332,7 +337,10 @@ class _FlowsType(_BaseFlowType):
|
||||
display = "[flow]"
|
||||
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[flow.Flow]:
|
||||
return manager.call_args("view.resolve", [s])
|
||||
try:
|
||||
return manager.call_args("view.resolve", [s])
|
||||
except exceptions.CommandError as e:
|
||||
raise exceptions.TypeError from e
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
try:
|
||||
|
@ -78,39 +78,54 @@ class TestCommand:
|
||||
[
|
||||
"foo bar",
|
||||
[
|
||||
command.ParseResult(value = "foo", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "foo", type = mitmproxy.types.Cmd, valid = False),
|
||||
command.ParseResult(value = "bar", type = str, valid = True)
|
||||
],
|
||||
[],
|
||||
],
|
||||
[
|
||||
"foo 'bar",
|
||||
"cmd1 'bar",
|
||||
[
|
||||
command.ParseResult(value = "foo", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "cmd1", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "'bar", type = str, valid = True)
|
||||
]
|
||||
],
|
||||
[],
|
||||
],
|
||||
[
|
||||
"a",
|
||||
[command.ParseResult(value = "a", type = mitmproxy.types.Cmd, valid = False)],
|
||||
[],
|
||||
],
|
||||
[
|
||||
"",
|
||||
[command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False)],
|
||||
[]
|
||||
],
|
||||
["a", [command.ParseResult(value = "a", type = mitmproxy.types.Cmd, valid = True)]],
|
||||
["", [command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = True)]],
|
||||
[
|
||||
"cmd3 1",
|
||||
[
|
||||
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "1", type = int, valid = True),
|
||||
]
|
||||
],
|
||||
[]
|
||||
],
|
||||
[
|
||||
"cmd3 ",
|
||||
[
|
||||
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "", type = int, valid = False),
|
||||
]
|
||||
],
|
||||
[]
|
||||
],
|
||||
[
|
||||
"subcommand ",
|
||||
[
|
||||
command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = True),
|
||||
]
|
||||
command.ParseResult(
|
||||
value = "subcommand", type = mitmproxy.types.Cmd, valid = True,
|
||||
),
|
||||
command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False),
|
||||
],
|
||||
["arg"],
|
||||
],
|
||||
[
|
||||
"subcommand cmd3 ",
|
||||
@ -118,13 +133,16 @@ class TestCommand:
|
||||
command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "", type = int, valid = False),
|
||||
]
|
||||
],
|
||||
[]
|
||||
],
|
||||
]
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(TAddon())
|
||||
for s, expected in tests:
|
||||
assert tctx.master.commands.parse_partial(s) == expected
|
||||
for s, expected, expectedremain in tests:
|
||||
current, remain = tctx.master.commands.parse_partial(s)
|
||||
assert current == expected
|
||||
assert expectedremain == remain
|
||||
|
||||
|
||||
def test_simple():
|
||||
@ -179,51 +197,11 @@ def test_parsearg():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(DummyConsole())
|
||||
assert command.parsearg(tctx.master.commands, "foo", str) == "foo"
|
||||
|
||||
assert command.parsearg(tctx.master.commands, "1", int) == 1
|
||||
with pytest.raises(exceptions.CommandError, match="Unsupported"):
|
||||
command.parsearg(tctx.master.commands, "foo", type)
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
command.parsearg(tctx.master.commands, "foo", int)
|
||||
|
||||
assert command.parsearg(tctx.master.commands, "true", bool) is True
|
||||
assert command.parsearg(tctx.master.commands, "false", bool) is False
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
command.parsearg(tctx.master.commands, "flobble", bool)
|
||||
|
||||
assert len(command.parsearg(
|
||||
tctx.master.commands, "2", typing.Sequence[flow.Flow]
|
||||
)) == 2
|
||||
assert command.parsearg(tctx.master.commands, "1", flow.Flow)
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
command.parsearg(tctx.master.commands, "2", flow.Flow)
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
command.parsearg(tctx.master.commands, "0", flow.Flow)
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
command.parsearg(tctx.master.commands, "foo", Exception)
|
||||
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "foo", typing.Sequence[str]
|
||||
) == ["foo"]
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "foo, bar", typing.Sequence[str]
|
||||
) == ["foo", "bar"]
|
||||
|
||||
a = TAddon()
|
||||
tctx.master.commands.add("choices", a.choices)
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "one", mitmproxy.types.Choice("choices"),
|
||||
) == "one"
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "invalid", mitmproxy.types.Choice("choices"),
|
||||
)
|
||||
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "foo", mitmproxy.types.Path
|
||||
) == "foo"
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "foo", mitmproxy.types.Cmd
|
||||
) == "foo"
|
||||
|
||||
|
||||
class TDec:
|
||||
@command.command("cmd1")
|
||||
|
@ -88,7 +88,9 @@ def test_cmd():
|
||||
b = mitmproxy.types._CmdType()
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Cmd, "foo") is False
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Cmd, "cmd1") is True
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "foo") == "foo"
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "cmd1") == "cmd1"
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "foo")
|
||||
assert len(
|
||||
b.completion(tctx.master.commands, mitmproxy.types.Cmd, "")
|
||||
) == len(tctx.master.commands.commands.keys())
|
||||
@ -134,6 +136,8 @@ def test_strseq():
|
||||
class DummyConsole:
|
||||
@command.command("view.resolve")
|
||||
def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
|
||||
if spec == "err":
|
||||
raise mitmproxy.exceptions.CommandError()
|
||||
n = int(spec)
|
||||
return [tflow.tflow(resp=True)] * n
|
||||
|
||||
@ -155,9 +159,11 @@ def test_flow():
|
||||
assert b.is_valid(tctx.master.commands, flow.Flow, tflow.tflow()) is True
|
||||
assert b.is_valid(tctx.master.commands, flow.Flow, "xx") is False
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
assert b.parse(tctx.master.commands, flow.Flow, "0")
|
||||
b.parse(tctx.master.commands, flow.Flow, "0")
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
assert b.parse(tctx.master.commands, flow.Flow, "2")
|
||||
b.parse(tctx.master.commands, flow.Flow, "2")
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
b.parse(tctx.master.commands, flow.Flow, "err")
|
||||
|
||||
|
||||
def test_flows():
|
||||
@ -173,6 +179,8 @@ def test_flows():
|
||||
assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "0")) == 0
|
||||
assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "1")) == 1
|
||||
assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "2")) == 2
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "err")
|
||||
|
||||
|
||||
def test_data():
|
||||
|
@ -42,16 +42,16 @@ class TestCommandBuffer:
|
||||
with taddons.context() as tctx:
|
||||
for start, output in tests:
|
||||
cb = commander.CommandBuffer(tctx.master)
|
||||
cb.buf, cb.cursor = start[0], start[1]
|
||||
cb.text, cb.cursor = start[0], start[1]
|
||||
cb.backspace()
|
||||
assert cb.buf == output[0]
|
||||
assert cb.text == output[0]
|
||||
assert cb.cursor == output[1]
|
||||
|
||||
def test_left(self):
|
||||
cursors = [3, 2, 1, 0, 0]
|
||||
with taddons.context() as tctx:
|
||||
cb = commander.CommandBuffer(tctx.master)
|
||||
cb.buf, cb.cursor = "abcd", 4
|
||||
cb.text, cb.cursor = "abcd", 4
|
||||
for c in cursors:
|
||||
cb.left()
|
||||
assert cb.cursor == c
|
||||
@ -60,7 +60,7 @@ class TestCommandBuffer:
|
||||
cursors = [1, 2, 3, 4, 4]
|
||||
with taddons.context() as tctx:
|
||||
cb = commander.CommandBuffer(tctx.master)
|
||||
cb.buf, cb.cursor = "abcd", 0
|
||||
cb.text, cb.cursor = "abcd", 0
|
||||
for c in cursors:
|
||||
cb.right()
|
||||
assert cb.cursor == c
|
||||
@ -74,20 +74,25 @@ class TestCommandBuffer:
|
||||
with taddons.context() as tctx:
|
||||
for start, output in tests:
|
||||
cb = commander.CommandBuffer(tctx.master)
|
||||
cb.buf, cb.cursor = start[0], start[1]
|
||||
cb.text, cb.cursor = start[0], start[1]
|
||||
cb.insert("x")
|
||||
assert cb.buf == output[0]
|
||||
assert cb.text == output[0]
|
||||
assert cb.cursor == output[1]
|
||||
|
||||
def test_cycle_completion(self):
|
||||
with taddons.context() as tctx:
|
||||
cb = commander.CommandBuffer(tctx.master)
|
||||
cb.buf = "foo bar"
|
||||
cb.cursor = len(cb.buf)
|
||||
cb.text = "foo bar"
|
||||
cb.cursor = len(cb.text)
|
||||
cb.cycle_completion()
|
||||
|
||||
def test_render(self):
|
||||
with taddons.context() as tctx:
|
||||
cb = commander.CommandBuffer(tctx.master)
|
||||
cb.buf = "foo"
|
||||
assert cb.render() == "foo"
|
||||
cb.text = "foo"
|
||||
assert cb.render()
|
||||
|
||||
def test_flatten(self):
|
||||
with taddons.context() as tctx:
|
||||
cb = commander.CommandBuffer(tctx.master)
|
||||
assert cb.flatten("foo bar") == "foo bar"
|
||||
|
Loading…
Reference in New Issue
Block a user