mitmproxy/test/mitmproxy/test_command.py
2020-11-20 19:25:26 +01:00

567 lines
21 KiB
Python

import inspect
import io
import typing
import pytest
import mitmproxy.types
from mitmproxy import command
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy.test import taddons
from mitmproxy.test import tflow
class TAddon:
    """Addon fixture exposing commands with a variety of signatures.

    Command methods carry ``#`` comments rather than docstrings so that the
    only docstring-derived help text stays the one on ``cmd1``.
    """

    @command.command("cmd1")
    def cmd1(self, foo: str) -> str:
        """cmd1 help"""
        return "ret " + foo

    # Returns an int although the annotation promises ``str``; the tests
    # expect calling this command to fail with a CommandError.
    @command.command("cmd2")
    def cmd2(self, foo: str) -> str:
        return 99

    # Echoes back its integer argument.
    @command.command("cmd3")
    def cmd3(self, foo: int) -> int:
        return foo

    # Three differently-typed positional parameters.
    @command.command("cmd4")
    def cmd4(self, a: int, b: str, c: mitmproxy.types.Path) -> str:
        return "ok"

    # Takes another command plus that command's arguments.
    @command.command("subcommand")
    def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.CmdArgs) -> str:
        return "ok"

    # No parameters, no result.
    @command.command("empty")
    def empty(self) -> None:
        return None

    # One required argument followed by a variadic tail; only the tail is returned.
    @command.command("varargs")
    def varargs(self, one: str, *var: str) -> typing.Sequence[str]:
        return [*var]

    def choices(self) -> typing.Sequence[str]:
        return ["one", "two", "three"]

    # NOTE(review): only @command.argument is applied here; no
    # @command.command registration is visible -- confirm this is intended.
    @command.argument("arg", type=mitmproxy.types.Choice("choices"))
    def choose(self, arg: str) -> typing.Sequence[str]:
        return ["one", "two", "three"]

    # Single path-typed parameter.
    @command.command("path")
    def path(self, arg: mitmproxy.types.Path) -> None:
        return None

    # Flow-typed parameter followed by a plain string.
    @command.command("flow")
    def flow(self, f: flow.Flow, s: str) -> None:
        return None
class Unsupported:
    """Empty placeholder type used in annotations that the tests expect to be rejected."""
class TypeErrAddon:
    """Addon fixture whose command signatures have missing or unsupported annotations."""

    # No return annotation at all; test_typecheck builds a Command from it
    # without expecting an error.
    @command.command("noret")
    def noret(self):
        return None

    # Return annotation uses a type the tests expect to be rejected.
    @command.command("invalidret")
    def invalidret(self) -> Unsupported:
        return None

    # Parameter annotation uses a type the tests expect to be rejected.
    @command.command("invalidarg")
    def invalidarg(self, u: Unsupported):
        return None
class TestCommand:
    """Tests for ``command.Command`` construction, calling and partial parsing."""

    def test_typecheck(self):
        """Unsupported annotations are rejected when the Command is built."""
        with taddons.context(loadcore=False) as tctx:
            manager = command.CommandManager(tctx.master)
            addon = TypeErrAddon()

            # A missing return annotation is accepted.
            command.Command(manager, "noret", addon.noret)

            with pytest.raises(exceptions.CommandError):
                command.Command(manager, "invalidret", addon.invalidret)
            with pytest.raises(exceptions.CommandError):
                assert command.Command(manager, "invalidarg", addon.invalidarg)

    def test_varargs(self):
        """Variadic parameters show up in the signature help and are passed through."""
        with taddons.context() as tctx:
            manager = command.CommandManager(tctx.master)
            addon = TAddon()
            cmd = command.Command(manager, "varargs", addon.varargs)
            assert cmd.signature_help() == "varargs one *var -> str[]"
            assert cmd.call(["one", "two", "three"]) == ["two", "three"]

    def test_call(self):
        """Calling converts arguments and checks the declared return type."""
        with taddons.context() as tctx:
            manager = command.CommandManager(tctx.master)
            addon = TAddon()

            cmd = command.Command(manager, "cmd.path", addon.cmd1)
            assert cmd.call(["foo"]) == "ret foo"
            assert cmd.signature_help() == "cmd.path foo -> str"

            # cmd2 returns an int while annotated as str.
            cmd = command.Command(manager, "cmd.two", addon.cmd2)
            with pytest.raises(exceptions.CommandError):
                cmd.call(["foo"])

            # The string "1" is converted to the annotated int.
            cmd = command.Command(manager, "cmd.three", addon.cmd3)
            assert cmd.call(["1"]) == 1

    def test_parse_partial(self):
        """parse_partial yields the expected tokens and remaining parameters.

        Each table row is ``(input, expected tokens, expected remaining
        parameters)``.
        """
        # Short aliases keep the expectation table readable.
        cmd_t = mitmproxy.types.Cmd
        args_t = mitmproxy.types.CmdArgs
        path_t = mitmproxy.types.Path
        space_t = mitmproxy.types.Space
        unknown_t = mitmproxy.types.Unknown
        var_positional = inspect.Parameter.VAR_POSITIONAL
        param = command.CommandParameter

        def tok(value, typ, valid):
            return command.ParseResult(value=value, type=typ, valid=valid)

        def good_cmd(name):
            return tok(name, cmd_t, True)

        def bad_cmd(name):
            return tok(name, cmd_t, False)

        def unknown(value):
            return tok(value, unknown_t, False)

        def bad_flow(value):
            return tok(value, flow.Flow, False)

        def text(value):
            return tok(value, str, True)

        def number(value):
            return tok(value, int, True)

        sp = tok(" ", space_t, True)

        # Parameter lists shared by several rows; they are only compared, never mutated.
        cmd4_params = [param('a', int), param('b', str), param('c', path_t)]
        flow_params = [param('f', flow.Flow), param('s', str)]

        tests = [
            ("foo bar", [bad_cmd("foo"), sp, unknown("bar")], []),
            ("cmd1 'bar", [good_cmd("cmd1"), sp, text("'bar")], []),
            ("a", [bad_cmd("a")], []),
            ("", [], [param("", cmd_t), param("", args_t)]),
            ("cmd3 1", [good_cmd("cmd3"), sp, number("1")], []),
            ("cmd3 ", [good_cmd("cmd3"), sp], [param('foo', int)]),
            (
                "subcommand ",
                [good_cmd("subcommand"), sp],
                [param('cmd', cmd_t), param('args', args_t, kind=var_positional)],
            ),
            (
                "varargs one",
                [good_cmd("varargs"), sp, text("one")],
                [param('var', str, kind=var_positional)],
            ),
            (
                "varargs one two three",
                [good_cmd("varargs"), sp, text("one"), sp, text("two"), sp, text("three")],
                [],
            ),
            (
                "subcommand cmd3 ",
                [good_cmd("subcommand"), sp, good_cmd("cmd3"), sp],
                [param('foo', int)],
            ),
            ("cmd4", [good_cmd("cmd4")], cmd4_params),
            ("cmd4 ", [good_cmd("cmd4"), sp], cmd4_params),
            ("cmd4 1", [good_cmd("cmd4"), sp, number("1")], cmd4_params[1:]),
            ("flow", [good_cmd("flow")], flow_params),
            ("flow ", [good_cmd("flow"), sp], flow_params),
            ("flow x", [good_cmd("flow"), sp, bad_flow("x")], flow_params[1:]),
            ("flow x ", [good_cmd("flow"), sp, bad_flow("x"), sp], flow_params[1:]),
            (
                "flow \"one two",
                [good_cmd("flow"), sp, bad_flow("\"one two")],
                flow_params[1:],
            ),
            (
                "flow \"three four\"",
                [good_cmd("flow"), sp, bad_flow('"three four"')],
                flow_params[1:],
            ),
            ("spaces ' '", [bad_cmd("spaces"), sp, unknown("' '")], []),
            ('spaces2 " "', [bad_cmd("spaces2"), sp, unknown('" "')], []),
            ('"abc"', [bad_cmd('"abc"')], []),
            ("'def'", [bad_cmd("'def'")], []),
            (
                "cmd10 'a' \"b\" c",
                [
                    bad_cmd("cmd10"), sp,
                    unknown("'a'"), sp,
                    unknown('"b"'), sp,
                    unknown("c"),
                ],
                [],
            ),
            (
                "cmd11 'a \"b\" c'",
                [bad_cmd("cmd11"), sp, unknown("'a \"b\" c'")],
                [],
            ),
            (
                'cmd12 "a \'b\' c"',
                [bad_cmd("cmd12"), sp, unknown('"a \'b\' c"')],
                [],
            ),
            (
                r'cmd13 "a \"b\" c"',
                [bad_cmd("cmd13"), sp, unknown(r'"a \"b\" c"')],
                [],
            ),
            (
                r"cmd14 'a \'b\' c'",
                [bad_cmd("cmd14"), sp, unknown(r"'a \'b\' c'")],
                [],
            ),
            # NOTE(review): the names below suggest runs of several spaces, but
            # the inputs contain single spaces -- confirm whitespace was not
            # collapsed somewhere upstream.
            (
                " spaces_at_the_begining_are_not_stripped",
                [sp, bad_cmd("spaces_at_the_begining_are_not_stripped")],
                [],
            ),
            (
                " spaces_at_the_begining_are_not_stripped neither_at_the_end ",
                [
                    sp,
                    bad_cmd("spaces_at_the_begining_are_not_stripped"),
                    sp,
                    unknown("neither_at_the_end"),
                    sp,
                ],
                [],
            ),
        ]

        with taddons.context() as tctx:
            tctx.master.addons.add(TAddon())
            for line, want_tokens, want_remaining in tests:
                got_tokens, got_remaining = tctx.master.commands.parse_partial(line)
                # The input line is part of both tuples so a failure shows which row broke.
                assert (line, got_tokens, want_remaining) == (line, want_tokens, got_remaining)
def test_simple():
    """Exercise CommandManager registration, execution, error reporting and dump."""
    with taddons.context() as tctx:
        c = command.CommandManager(tctx.master)
        a = TAddon()
        c.add("one.two", a.cmd1)

        # Help text comes from the docstring of the registered function.
        assert c.commands["one.two"].help == "cmd1 help"

        # Bare, double-quoted and single-quoted arguments all reach the command unquoted.
        assert c.execute("one.two foo") == "ret foo"
        assert c.execute("one.two \"foo\"") == "ret foo"
        assert c.execute("one.two 'foo bar'") == "ret foo bar"
        assert c.call("one.two", "foo") == "ret foo"

        with pytest.raises(exceptions.CommandError, match="Unknown"):
            c.execute("nonexistent")
        with pytest.raises(exceptions.CommandError, match="Invalid"):
            c.execute("")
        with pytest.raises(exceptions.CommandError, match="argument mismatch"):
            c.execute("one.two too many args")
        with pytest.raises(exceptions.CommandError, match="Unknown"):
            c.call("nonexistent")

        # A lone backslash and escaped quotes are reported as unknown commands.
        # Each input is checked once; the original repeated the last one verbatim.
        for bad_input in ("\\", r"\'", r"\""):
            with pytest.raises(exceptions.CommandError, match="Unknown"):
                c.execute(bad_input)

        c.add("empty", a.empty)
        c.execute("empty")

        # dump() must write something for the registered commands.
        fp = io.StringIO()
        c.dump(fp)
        assert fp.getvalue()
def test_typename():
    """typename() maps supported types to display names and rejects the rest."""
    expected_names = [
        (str, "str"),
        (typing.Sequence[flow.Flow], "flow[]"),
        (mitmproxy.types.Data, "data[][]"),
        (mitmproxy.types.CutSpec, "cut[]"),
        (flow.Flow, "flow"),
        (typing.Sequence[str], "str[]"),
        (mitmproxy.types.Choice("foo"), "choice"),
        (mitmproxy.types.Path, "path"),
        (mitmproxy.types.Cmd, "cmd"),
    ]
    for typ, display in expected_names:
        assert command.typename(typ) == display

    # Each rejected input has its own error message.
    rejected = [
        (inspect._empty, "missing type annotation"),
        (None, "unsupported type"),
    ]
    for typ, message in rejected:
        with pytest.raises(exceptions.CommandError, match=message):
            command.typename(typ)
class DummyConsole:
    """Addon fixture providing minimal ``view.flows.resolve`` and ``cut`` commands."""

    # Interprets the spec as a count and returns that many references to a
    # single test flow (the same object repeated, not separate flows).
    @command.command("view.flows.resolve")
    def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
        count = int(spec)
        template = tflow.tflow(resp=True)
        return [template] * count

    # Ignores the spec and returns a fixed one-by-one table.
    @command.command("cut")
    def cut(self, spec: str) -> mitmproxy.types.Data:
        return [["test"]]
def test_parsearg():
    """parsearg() converts supported types and raises CommandError otherwise."""
    with taddons.context() as tctx:
        tctx.master.addons.add(DummyConsole())
        manager = tctx.master.commands

        assert command.parsearg(manager, "foo", str) == "foo"

        # ``type`` is not a supported argument type.
        with pytest.raises(exceptions.CommandError, match="Unsupported"):
            command.parsearg(manager, "foo", type)

        # "foo" cannot be converted to an int.
        with pytest.raises(exceptions.CommandError):
            command.parsearg(manager, "foo", int)
class TDec:
    """Addon fixture whose commands are discovered through the decorator."""

    @command.command("cmd1")
    def cmd1(self, foo: str) -> str:
        """cmd1 help"""
        return "ret " + foo

    # Returns an int although the annotation promises ``str``.
    @command.command("cmd2")
    def cmd2(self, foo: str) -> str:
        return 99

    # No parameters, no result.
    @command.command("empty")
    def empty(self) -> None:
        return None
class TAttr:
    """Object on which every failed normal attribute lookup raises OSError."""

    def __getattr__(self, item):
        # Only reached when normal lookup fails, so existing attributes still resolve.
        raise OSError()
class TAttr2:
    """Object that answers every failed normal attribute lookup with a new TAttr2."""

    def __getattr__(self, item):
        # Any missing name yields a fresh instance, so attribute chains never fail.
        replacement = TAttr2()
        return replacement
class TCmds(TAttr):
    """Addon fixture mixing one real command with awkward attribute behaviour.

    It inherits TAttr's raising ``__getattr__`` and also holds one TAttr and
    one TAttr2 instance as attributes.
    """

    def __init__(self):
        self.TAttr2 = TAttr2()
        self.TAttr = TAttr()

    # The one genuine command on this addon.
    @command.command("empty")
    def empty(self) -> None:
        return None
@pytest.mark.asyncio
async def test_collect_commands():
    """
    Command collection must cope with attributes whose lookup raises, and with
    __getattr__ implementations that return an object for .command_name.
    """
    with taddons.context() as tctx:
        manager = command.CommandManager(tctx.master)

        # TCmds has misbehaving attributes; its real command must still be found.
        misbehaving = TCmds()
        manager.collect_commands(misbehaving)
        assert "empty" in manager.commands

        # Commands with invalid signatures are reported through the log.
        invalid = TypeErrAddon()
        manager.collect_commands(invalid)
        await tctx.master.await_log("Could not load")
def test_decorator():
    """Decorated methods are collected and can be executed by name."""
    addon = TDec()

    # Collected explicitly into a standalone CommandManager.
    with taddons.context() as tctx:
        manager = command.CommandManager(tctx.master)
        manager.collect_commands(addon)
        assert "cmd1" in manager.commands
        assert manager.execute("cmd1 bar") == "ret bar"
        assert "empty" in manager.commands
        assert manager.execute("empty") is None

    # Registered by adding the addon to the master.
    with taddons.context() as tctx:
        tctx.master.addons.add(addon)
        assert tctx.master.commands.execute("cmd1 bar") == "ret bar"
def test_verify_arg_signature():
    """verify_arg_signature() rejects arguments that do not fit the callable."""
    # Two positional arguments are passed for a callable that takes none.
    # Nothing may follow the call inside this block: it would never run once
    # the expected exception is raised.
    with pytest.raises(exceptions.CommandError):
        command.verify_arg_signature(lambda: None, [1, 2], {})

    # The same arguments fit a two-parameter callable, so no error is expected.
    command.verify_arg_signature(lambda a, b: None, [1, 2], {})