Merge pull request #2707 from cortesi/addtypes

commands: refactor types
This commit is contained in:
Aldo Cortesi 2017-12-19 07:50:52 +13:00 committed by GitHub
commit 6ef6286d8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 591 additions and 313 deletions

View File

@ -3,6 +3,7 @@ from mitmproxy import ctx
from mitmproxy import io
from mitmproxy import flow
from mitmproxy import command
import mitmproxy.types
import typing
@ -37,7 +38,7 @@ class ClientPlayback:
ctx.master.addons.trigger("update", [])
@command.command("replay.client.file")
def load_file(self, path: command.Path) -> None:
def load_file(self, path: mitmproxy.types.Path) -> None:
try:
flows = io.read_flows_from_paths([path])
except exceptions.FlowReadException as e:

View File

@ -6,6 +6,7 @@ from mitmproxy import command
from mitmproxy import flow
from mitmproxy import optmanager
from mitmproxy.net.http import status_codes
import mitmproxy.types
class Core:
@ -96,7 +97,7 @@ class Core:
]
@command.command("flow.set")
@command.argument("spec", type=command.Choice("flow.set.options"))
@command.argument("spec", type=mitmproxy.types.Choice("flow.set.options"))
def flow_set(
self,
flows: typing.Sequence[flow.Flow],
@ -187,7 +188,7 @@ class Core:
ctx.log.alert("Toggled encoding on %s flows." % len(updated))
@command.command("flow.encode")
@command.argument("enc", type=command.Choice("flow.encode.options"))
@command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options"))
def encode(
self,
flows: typing.Sequence[flow.Flow],
@ -216,7 +217,7 @@ class Core:
return ["gzip", "deflate", "br"]
@command.command("options.load")
def options_load(self, path: command.Path) -> None:
def options_load(self, path: mitmproxy.types.Path) -> None:
"""
Load options from a file.
"""
@ -228,7 +229,7 @@ class Core:
) from e
@command.command("options.save")
def options_save(self, path: command.Path) -> None:
def options_save(self, path: mitmproxy.types.Path) -> None:
"""
Save options to a file.
"""

View File

@ -7,6 +7,7 @@ from mitmproxy import flow
from mitmproxy import ctx
from mitmproxy import certs
from mitmproxy.utils import strutils
import mitmproxy.types
import pyperclip
@ -51,8 +52,8 @@ class Cut:
def cut(
self,
flows: typing.Sequence[flow.Flow],
cuts: typing.Sequence[command.Cut]
) -> command.Cuts:
cuts: mitmproxy.types.CutSpec,
) -> mitmproxy.types.Data:
"""
Cut data from a set of flows. Cut specifications are attribute paths
from the base of the flow object, with a few conveniences - "port"
@ -62,17 +63,17 @@ class Cut:
or "false", "bytes" are preserved, and all other values are
converted to strings.
"""
ret = []
ret = [] # type:typing.List[typing.List[typing.Union[str, bytes]]]
for f in flows:
ret.append([extract(c, f) for c in cuts])
return ret
return ret # type: ignore
@command.command("cut.save")
def save(
self,
flows: typing.Sequence[flow.Flow],
cuts: typing.Sequence[command.Cut],
path: command.Path
cuts: mitmproxy.types.CutSpec,
path: mitmproxy.types.Path
) -> None:
"""
Save cuts to file. If there are multiple flows or cuts, the format
@ -84,7 +85,7 @@ class Cut:
append = False
if path.startswith("+"):
append = True
path = command.Path(path[1:])
path = mitmproxy.types.Path(path[1:])
if len(cuts) == 1 and len(flows) == 1:
with open(path, "ab" if append else "wb") as fp:
if fp.tell() > 0:
@ -110,7 +111,7 @@ class Cut:
def clip(
self,
flows: typing.Sequence[flow.Flow],
cuts: typing.Sequence[command.Cut],
cuts: mitmproxy.types.CutSpec,
) -> None:
"""
Send cuts to the clipboard. If there are multiple flows or cuts, the

View File

@ -5,6 +5,7 @@ from mitmproxy import flow
from mitmproxy import exceptions
from mitmproxy.utils import strutils
from mitmproxy.net.http.http1 import assemble
import mitmproxy.types
import pyperclip
@ -49,7 +50,7 @@ class Export():
return list(sorted(formats.keys()))
@command.command("export.file")
def file(self, fmt: str, f: flow.Flow, path: command.Path) -> None:
def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None:
"""
Export a flow to path.
"""

View File

@ -7,6 +7,7 @@ from mitmproxy import flowfilter
from mitmproxy import io
from mitmproxy import ctx
from mitmproxy import flow
import mitmproxy.types
class Save:
@ -50,7 +51,7 @@ class Save:
self.start_stream_to_path(ctx.options.save_stream_file, self.filt)
@command.command("save.file")
def save(self, flows: typing.Sequence[flow.Flow], path: command.Path) -> None:
def save(self, flows: typing.Sequence[flow.Flow], path: mitmproxy.types.Path) -> None:
"""
Save flows to a file. If the path starts with a +, flows are
appended to the file, otherwise it is over-written.

View File

@ -9,6 +9,7 @@ from mitmproxy import flow
from mitmproxy import exceptions
from mitmproxy import io
from mitmproxy import command
import mitmproxy.types
class ServerPlayback:
@ -31,7 +32,7 @@ class ServerPlayback:
ctx.master.addons.trigger("update", [])
@command.command("replay.server.file")
def load_file(self, path: command.Path) -> None:
def load_file(self, path: mitmproxy.types.Path) -> None:
try:
flows = io.read_flows_from_paths([path])
except exceptions.FlowReadException as e:

View File

@ -351,7 +351,7 @@ class View(collections.Sequence):
ctx.master.addons.trigger("update", updated)
@command.command("view.load")
def load_file(self, path: command.Path) -> None:
def load_file(self, path: mitmproxy.types.Path) -> None:
"""
Load flows into the view, without processing them with addons.
"""

View File

@ -12,7 +12,7 @@ import sys
from mitmproxy.utils import typecheck
from mitmproxy import exceptions
from mitmproxy import flow
import mitmproxy.types
def lexer(s):
@ -24,113 +24,14 @@ def lexer(s):
return lex
# This is an awkward location for these values, but it's better than having
# the console core import and depend on an addon. FIXME: Add a way for
# addons to add custom types and manage their completion and validation.
valid_flow_prefixes = [
"@all",
"@focus",
"@shown",
"@hidden",
"@marked",
"@unmarked",
"~q",
"~s",
"~a",
"~hq",
"~hs",
"~b",
"~bq",
"~bs",
"~t",
"~d",
"~m",
"~u",
"~c",
]
Cuts = typing.Sequence[
typing.Sequence[typing.Union[str, bytes]]
]
class Cut(str):
# This is an awkward location for these values, but it's better than having
# the console core import and depend on an addon. FIXME: Add a way for
# addons to add custom types and manage their completion and validation.
valid_prefixes = [
"request.method",
"request.scheme",
"request.host",
"request.http_version",
"request.port",
"request.path",
"request.url",
"request.text",
"request.content",
"request.raw_content",
"request.timestamp_start",
"request.timestamp_end",
"request.header[",
"response.status_code",
"response.reason",
"response.text",
"response.content",
"response.timestamp_start",
"response.timestamp_end",
"response.raw_content",
"response.header[",
"client_conn.address.port",
"client_conn.address.host",
"client_conn.tls_version",
"client_conn.sni",
"client_conn.ssl_established",
"server_conn.address.port",
"server_conn.address.host",
"server_conn.ip_address.host",
"server_conn.tls_version",
"server_conn.sni",
"server_conn.ssl_established",
]
class Path(str):
pass
class Cmd(str):
pass
class Arg(str):
pass
def typename(t: type) -> str:
"""
Translates a type to an explanatory string. If ret is True, we're
looking at a return type, else we're looking at a parameter type.
Translates a type to an explanatory string.
"""
if isinstance(t, Choice):
return "choice"
elif t == typing.Sequence[flow.Flow]:
return "[flow]"
elif t == typing.Sequence[str]:
return "[str]"
elif t == typing.Sequence[Cut]:
return "[cut]"
elif t == Cuts:
return "[cuts]"
elif t == flow.Flow:
return "flow"
elif issubclass(t, (str, int, bool)):
return t.__name__.lower()
else: # pragma: no cover
to = mitmproxy.types.CommandTypes.get(t, None)
if not to:
raise NotImplementedError(t)
return to.display
class Command:
@ -168,7 +69,7 @@ class Command:
ret = " -> " + ret
return "%s %s%s" % (self.path, params, ret)
def call(self, args: typing.Sequence[str]):
def call(self, args: typing.Sequence[str]) -> typing.Any:
"""
Call the command with a list of arguments. At this point, all
arguments are strings.
@ -255,13 +156,13 @@ class CommandManager:
typ = None # type: typing.Type
for i in range(len(parts)):
if i == 0:
typ = Cmd
typ = mitmproxy.types.Cmd
if parts[i] in self.commands:
params.extend(self.commands[parts[i]].paramtypes)
elif params:
typ = params.pop(0)
# FIXME: Do we need to check that Arg is positional?
if typ == Cmd and params and params[0] == Arg:
if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg:
if parts[i] in self.commands:
params[:] = self.commands[parts[i]].paramtypes
else:
@ -269,7 +170,7 @@ class CommandManager:
parse.append(ParseResult(value=parts[i], type=typ))
return parse
def call_args(self, path, args):
def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any:
"""
Call a command using a list of string arguments. May raise CommandError.
"""
@ -300,45 +201,13 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
Convert a string to a argument to the appropriate type.
"""
if isinstance(argtype, Choice):
cmd = argtype.options_command
opts = manager.call(cmd)
if spec not in opts:
raise exceptions.CommandError(
"Invalid choice: see %s for options" % cmd
)
return spec
elif issubclass(argtype, str):
return spec
elif argtype == bool:
if spec == "true":
return True
elif spec == "false":
return False
else:
raise exceptions.CommandError(
"Booleans are 'true' or 'false', got %s" % spec
)
elif issubclass(argtype, int):
try:
return int(spec)
except ValueError as e:
raise exceptions.CommandError("Expected an integer, got %s." % spec)
elif argtype == typing.Sequence[flow.Flow]:
return manager.call_args("view.resolve", [spec])
elif argtype == Cuts:
return manager.call_args("cut", [spec])
elif argtype == flow.Flow:
flows = manager.call_args("view.resolve", [spec])
if len(flows) != 1:
raise exceptions.CommandError(
"Command requires one flow, specification matched %s." % len(flows)
)
return flows[0]
elif argtype in (typing.Sequence[str], typing.Sequence[Cut]):
return [i.strip() for i in spec.split(",")]
else:
t = mitmproxy.types.CommandTypes.get(argtype, None)
if not t:
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
try:
return t.parse(manager, argtype, spec) # type: ignore
except exceptions.TypeError as e:
raise exceptions.CommandError from e
def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
@ -360,21 +229,11 @@ def command(path):
return decorator
class Choice:
def __init__(self, options_command):
self.options_command = options_command
def __instancecheck__(self, instance):
# return false here so that arguments are piped through parsearg,
# which does extended validation.
return False
def argument(name, type):
"""
Set the type of a command argument at runtime.
This is useful for more specific types such as command.Choice, which we cannot annotate
directly as mypy does not like that.
Set the type of a command argument at runtime. This is useful for more
specific types such as mitmproxy.types.Choice, which we cannot annotate
directly as mypy does not like that.
"""
def decorator(f: types.FunctionType) -> types.FunctionType:
assert name in f.__annotations__

View File

@ -112,6 +112,10 @@ class AddonHalt(MitmproxyException):
pass
class TypeError(MitmproxyException):
pass
"""
Net-layer exceptions
"""

View File

@ -1,6 +1,4 @@
import abc
import glob
import os
import typing
import urwid
@ -9,6 +7,7 @@ from urwid.text_layout import calc_coords
import mitmproxy.flow
import mitmproxy.master
import mitmproxy.command
import mitmproxy.types
class Completer: # pragma: no cover
@ -39,30 +38,6 @@ class ListCompleter(Completer):
return ret
# Generates the completion options for a specific starting input
def pathOptions(start: str) -> typing.Sequence[str]:
if not start:
start = "./"
path = os.path.expanduser(start)
ret = []
if os.path.isdir(path):
files = glob.glob(os.path.join(path, "*"))
prefix = start
else:
files = glob.glob(path + "*")
prefix = os.path.dirname(start)
prefix = prefix or "./"
for f in files:
display = os.path.join(prefix, os.path.normpath(os.path.basename(f)))
if os.path.isdir(f):
display += "/"
ret.append(display)
if not ret:
ret = [start]
ret.sort()
return ret
CompletionState = typing.NamedTuple(
"CompletionState",
[
@ -106,48 +81,12 @@ class CommandBuffer():
if not self.completion:
parts = self.master.commands.parse_partial(self.buf[:self.cursor])
last = parts[-1]
if last.type == mitmproxy.command.Cmd:
ct = mitmproxy.types.CommandTypes.get(last.type, None)
if ct:
self.completion = CompletionState(
completer = ListCompleter(
parts[-1].value,
self.master.commands.commands.keys(),
),
parse = parts,
)
if last.type == typing.Sequence[mitmproxy.command.Cut]:
spec = parts[-1].value.split(",")
opts = []
for pref in mitmproxy.command.Cut.valid_prefixes:
spec[-1] = pref
opts.append(",".join(spec))
self.completion = CompletionState(
completer = ListCompleter(
parts[-1].value,
opts,
),
parse = parts,
)
elif isinstance(last.type, mitmproxy.command.Choice):
self.completion = CompletionState(
completer = ListCompleter(
parts[-1].value,
self.master.commands.call(last.type.options_command),
),
parse = parts,
)
elif last.type == mitmproxy.command.Path:
self.completion = CompletionState(
completer = ListCompleter(
"",
pathOptions(parts[1].value)
),
parse = parts,
)
elif last.type in (typing.Sequence[mitmproxy.flow.Flow], mitmproxy.flow.Flow):
self.completion = CompletionState(
completer = ListCompleter(
"",
mitmproxy.command.valid_flow_prefixes,
ct.completion(self.master.commands, last.type, parts[-1].value)
),
parse = parts,
)

View File

@ -7,6 +7,8 @@ from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import contentviews
from mitmproxy.utils import strutils
import mitmproxy.types
from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import signals
@ -218,8 +220,8 @@ class ConsoleAddon:
self,
prompt: str,
choices: typing.Sequence[str],
cmd: command.Cmd,
*args: command.Arg
cmd: mitmproxy.types.Cmd,
*args: mitmproxy.types.Arg
) -> None:
"""
Prompt the user to choose from a specified list of strings, then
@ -241,7 +243,7 @@ class ConsoleAddon:
@command.command("console.choose.cmd")
def console_choose_cmd(
self, prompt: str, choicecmd: command.Cmd, *cmd: command.Arg
self, prompt: str, choicecmd: mitmproxy.types.Cmd, *cmd: mitmproxy.types.Arg
) -> None:
"""
Prompt the user to choose from a list of strings returned by a
@ -352,7 +354,7 @@ class ConsoleAddon:
]
@command.command("console.edit.focus")
@command.argument("part", type=command.Choice("console.edit.focus.options"))
@command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options"))
def edit_focus(self, part: str) -> None:
"""
Edit a component of the currently focused flow.
@ -404,14 +406,14 @@ class ConsoleAddon:
self._grideditor().cmd_delete()
@command.command("console.grideditor.load")
def grideditor_load(self, path: command.Path) -> None:
def grideditor_load(self, path: mitmproxy.types.Path) -> None:
"""
Read a file into the currrent cell.
"""
self._grideditor().cmd_read_file(path)
@command.command("console.grideditor.load_escaped")
def grideditor_load_escaped(self, path: command.Path) -> None:
def grideditor_load_escaped(self, path: mitmproxy.types.Path) -> None:
"""
Read a file containing a Python-style escaped string into the
currrent cell.
@ -419,7 +421,7 @@ class ConsoleAddon:
self._grideditor().cmd_read_file_escaped(path)
@command.command("console.grideditor.save")
def grideditor_save(self, path: command.Path) -> None:
def grideditor_save(self, path: mitmproxy.types.Path) -> None:
"""
Save data to file as a CSV.
"""
@ -440,7 +442,7 @@ class ConsoleAddon:
self._grideditor().cmd_spawn_editor()
@command.command("console.flowview.mode.set")
@command.argument("mode", type=command.Choice("console.flowview.mode.options"))
@command.argument("mode", type=mitmproxy.types.Choice("console.flowview.mode.options"))
def flowview_mode_set(self, mode: str) -> None:
"""
Set the display mode for the current flow view.
@ -498,8 +500,8 @@ class ConsoleAddon:
self,
contexts: typing.Sequence[str],
key: str,
cmd: command.Cmd,
*args: command.Arg
cmd: mitmproxy.types.Cmd,
*args: mitmproxy.types.Arg
) -> None:
"""
Bind a shortcut key.

330
mitmproxy/types.py Normal file
View File

@ -0,0 +1,330 @@
import os
import glob
import typing
from mitmproxy import exceptions
from mitmproxy import flow
class Path(str):
pass
class Cmd(str):
pass
class Arg(str):
pass
class CutSpec(typing.Sequence[str]):
pass
class Data(typing.Sequence[typing.Sequence[typing.Union[str, bytes]]]):
pass
class Choice:
def __init__(self, options_command):
self.options_command = options_command
def __instancecheck__(self, instance): # pragma: no cover
# return false here so that arguments are piped through parsearg,
# which does extended validation.
return False
# One of the many charming things about mypy is that introducing type
# annotations can cause circular dependencies where there were none before.
# Rather than putting types and the CommandManger in the same file, we introduce
# a stub type with the signature we use.
class _CommandStub:
commands = {} # type: typing.Mapping[str, typing.Any]
def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover
pass
def call(self, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover
pass
class BaseType:
typ = object # type: typing.Type
display = "" # type: str
def completion(
self, manager: _CommandStub, t: type, s: str
) -> typing.Sequence[str]: # pragma: no cover
pass
def parse(
self, manager: _CommandStub, t: type, s: str
) -> typing.Any: # pragma: no cover
pass
class Bool(BaseType):
typ = bool
display = "bool"
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
return ["false", "true"]
def parse(self, manager: _CommandStub, t: type, s: str) -> bool:
if s == "true":
return True
elif s == "false":
return False
else:
raise exceptions.TypeError(
"Booleans are 'true' or 'false', got %s" % s
)
class Str(BaseType):
typ = str
display = "str"
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
return []
def parse(self, manager: _CommandStub, t: type, s: str) -> str:
return s
class Int(BaseType):
typ = int
display = "int"
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
return []
def parse(self, manager: _CommandStub, t: type, s: str) -> int:
try:
return int(s)
except ValueError as e:
raise exceptions.TypeError from e
class PathType(BaseType):
typ = Path
display = "path"
def completion(self, manager: _CommandStub, t: type, start: str) -> typing.Sequence[str]:
if not start:
start = "./"
path = os.path.expanduser(start)
ret = []
if os.path.isdir(path):
files = glob.glob(os.path.join(path, "*"))
prefix = start
else:
files = glob.glob(path + "*")
prefix = os.path.dirname(start)
prefix = prefix or "./"
for f in files:
display = os.path.join(prefix, os.path.normpath(os.path.basename(f)))
if os.path.isdir(f):
display += "/"
ret.append(display)
if not ret:
ret = [start]
ret.sort()
return ret
def parse(self, manager: _CommandStub, t: type, s: str) -> str:
return s
class CmdType(BaseType):
typ = Cmd
display = "cmd"
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
return list(manager.commands.keys())
def parse(self, manager: _CommandStub, t: type, s: str) -> str:
return s
class ArgType(BaseType):
typ = Arg
display = "arg"
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
return []
def parse(self, manager: _CommandStub, t: type, s: str) -> str:
return s
class StrSeq(BaseType):
typ = typing.Sequence[str]
display = "[str]"
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
return []
def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
return [x.strip() for x in s.split(",")]
class CutSpecType(BaseType):
typ = CutSpec
display = "[cut]"
valid_prefixes = [
"request.method",
"request.scheme",
"request.host",
"request.http_version",
"request.port",
"request.path",
"request.url",
"request.text",
"request.content",
"request.raw_content",
"request.timestamp_start",
"request.timestamp_end",
"request.header[",
"response.status_code",
"response.reason",
"response.text",
"response.content",
"response.timestamp_start",
"response.timestamp_end",
"response.raw_content",
"response.header[",
"client_conn.address.port",
"client_conn.address.host",
"client_conn.tls_version",
"client_conn.sni",
"client_conn.ssl_established",
"server_conn.address.port",
"server_conn.address.host",
"server_conn.ip_address.host",
"server_conn.tls_version",
"server_conn.sni",
"server_conn.ssl_established",
]
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
spec = s.split(",")
opts = []
for pref in self.valid_prefixes:
spec[-1] = pref
opts.append(",".join(spec))
return opts
def parse(self, manager: _CommandStub, t: type, s: str) -> CutSpec:
parts = s.split(",") # type: typing.Any
return parts
class BaseFlowType(BaseType):
valid_prefixes = [
"@all",
"@focus",
"@shown",
"@hidden",
"@marked",
"@unmarked",
"~q",
"~s",
"~a",
"~hq",
"~hs",
"~b",
"~bq",
"~bs",
"~t",
"~d",
"~m",
"~u",
"~c",
]
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
return self.valid_prefixes
class FlowType(BaseFlowType):
typ = flow.Flow
display = "flow"
def parse(self, manager: _CommandStub, t: type, s: str) -> flow.Flow:
flows = manager.call_args("view.resolve", [s])
if len(flows) != 1:
raise exceptions.TypeError(
"Command requires one flow, specification matched %s." % len(flows)
)
return flows[0]
class FlowsType(BaseFlowType):
typ = typing.Sequence[flow.Flow]
display = "[flow]"
def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[flow.Flow]:
return manager.call_args("view.resolve", [s])
class DataType:
typ = Data
display = "[data]"
def completion(
self, manager: _CommandStub, t: type, s: str
) -> typing.Sequence[str]: # pragma: no cover
raise exceptions.TypeError("data cannot be passed as argument")
def parse(
self, manager: _CommandStub, t: type, s: str
) -> typing.Any: # pragma: no cover
raise exceptions.TypeError("data cannot be passed as argument")
class ChoiceType:
typ = Choice
display = "choice"
def completion(self, manager: _CommandStub, t: Choice, s: str) -> typing.Sequence[str]:
return manager.call(t.options_command)
def parse(self, manager: _CommandStub, t: Choice, s: str) -> str:
opts = manager.call(t.options_command)
if s not in opts:
raise exceptions.TypeError("Invalid choice.")
return s
class TypeManager:
def __init__(self, *types):
self.typemap = {}
for t in types:
self.typemap[t.typ] = t()
def get(self, t: type, default=None) -> BaseType:
if type(t) in self.typemap:
return self.typemap[type(t)]
return self.typemap.get(t, default)
CommandTypes = TypeManager(
ArgType,
Bool,
ChoiceType,
CmdType,
CutSpecType,
DataType,
FlowType,
FlowsType,
Int,
PathType,
Str,
StrSeq,
)

View File

@ -4,6 +4,7 @@ from mitmproxy import flow
from mitmproxy import exceptions
from mitmproxy.test import tflow
from mitmproxy.test import taddons
import mitmproxy.types
import io
import pytest
@ -25,7 +26,7 @@ class TAddon:
return foo
@command.command("subcommand")
def subcommand(self, cmd: command.Cmd, *args: command.Arg) -> str:
def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.Arg) -> str:
return "ok"
@command.command("empty")
@ -39,12 +40,12 @@ class TAddon:
def choices(self) -> typing.Sequence[str]:
return ["one", "two", "three"]
@command.argument("arg", type=command.Choice("choices"))
@command.argument("arg", type=mitmproxy.types.Choice("choices"))
def choose(self, arg: str) -> typing.Sequence[str]:
return ["one", "two", "three"]
@command.command("path")
def path(self, arg: command.Path) -> None:
def path(self, arg: mitmproxy.types.Path) -> None:
pass
@ -79,45 +80,45 @@ class TestCommand:
[
"foo bar",
[
command.ParseResult(value = "foo", type = command.Cmd),
command.ParseResult(value = "foo", type = mitmproxy.types.Cmd),
command.ParseResult(value = "bar", type = str)
],
],
[
"foo 'bar",
[
command.ParseResult(value = "foo", type = command.Cmd),
command.ParseResult(value = "foo", type = mitmproxy.types.Cmd),
command.ParseResult(value = "'bar", type = str)
]
],
["a", [command.ParseResult(value = "a", type = command.Cmd)]],
["", [command.ParseResult(value = "", type = command.Cmd)]],
["a", [command.ParseResult(value = "a", type = mitmproxy.types.Cmd)]],
["", [command.ParseResult(value = "", type = mitmproxy.types.Cmd)]],
[
"cmd3 1",
[
command.ParseResult(value = "cmd3", type = command.Cmd),
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd),
command.ParseResult(value = "1", type = int),
]
],
[
"cmd3 ",
[
command.ParseResult(value = "cmd3", type = command.Cmd),
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd),
command.ParseResult(value = "", type = int),
]
],
[
"subcommand ",
[
command.ParseResult(value = "subcommand", type = command.Cmd),
command.ParseResult(value = "", type = command.Cmd),
command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd),
command.ParseResult(value = "", type = mitmproxy.types.Cmd),
]
],
[
"subcommand cmd3 ",
[
command.ParseResult(value = "subcommand", type = command.Cmd),
command.ParseResult(value = "cmd3", type = command.Cmd),
command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd),
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd),
command.ParseResult(value = "", type = int),
]
],
@ -154,15 +155,15 @@ def test_typename():
assert command.typename(str) == "str"
assert command.typename(typing.Sequence[flow.Flow]) == "[flow]"
assert command.typename(command.Cuts) == "[cuts]"
assert command.typename(typing.Sequence[command.Cut]) == "[cut]"
assert command.typename(mitmproxy.types.Data) == "[data]"
assert command.typename(mitmproxy.types.CutSpec) == "[cut]"
assert command.typename(flow.Flow) == "flow"
assert command.typename(typing.Sequence[str]) == "[str]"
assert command.typename(command.Choice("foo")) == "choice"
assert command.typename(command.Path) == "path"
assert command.typename(command.Cmd) == "cmd"
assert command.typename(mitmproxy.types.Choice("foo")) == "choice"
assert command.typename(mitmproxy.types.Path) == "path"
assert command.typename(mitmproxy.types.Cmd) == "cmd"
class DummyConsole:
@ -172,7 +173,7 @@ class DummyConsole:
return [tflow.tflow(resp=True)] * n
@command.command("cut")
def cut(self, spec: str) -> command.Cuts:
def cut(self, spec: str) -> mitmproxy.types.Data:
return [["test"]]
@ -201,10 +202,6 @@ def test_parsearg():
with pytest.raises(exceptions.CommandError):
command.parsearg(tctx.master.commands, "foo", Exception)
assert command.parsearg(
tctx.master.commands, "foo", command.Cuts
) == [["test"]]
assert command.parsearg(
tctx.master.commands, "foo", typing.Sequence[str]
) == ["foo"]
@ -215,18 +212,18 @@ def test_parsearg():
a = TAddon()
tctx.master.commands.add("choices", a.choices)
assert command.parsearg(
tctx.master.commands, "one", command.Choice("choices"),
tctx.master.commands, "one", mitmproxy.types.Choice("choices"),
) == "one"
with pytest.raises(exceptions.CommandError):
assert command.parsearg(
tctx.master.commands, "invalid", command.Choice("choices"),
tctx.master.commands, "invalid", mitmproxy.types.Choice("choices"),
)
assert command.parsearg(
tctx.master.commands, "foo", command.Path
tctx.master.commands, "foo", mitmproxy.types.Path
) == "foo"
assert command.parsearg(
tctx.master.commands, "foo", command.Cmd
tctx.master.commands, "foo", mitmproxy.types.Cmd
) == "foo"
@ -272,5 +269,5 @@ def test_choice():
basic typechecking for choices should fail as we cannot verify if strings are a valid choice
at this point.
"""
c = command.Choice("foo")
c = mitmproxy.types.Choice("foo")
assert not typecheck.check_command_type("foo", c)

View File

View File

@ -0,0 +1,175 @@
import pytest
import os
import typing
import contextlib
from mitmproxy.test import tutils
import mitmproxy.exceptions
import mitmproxy.types
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy import command
from mitmproxy import flow
from . import test_command
@contextlib.contextmanager
def chdir(path: str):
old_dir = os.getcwd()
os.chdir(path)
yield
os.chdir(old_dir)
def test_bool():
with taddons.context() as tctx:
b = mitmproxy.types.Bool()
assert b.completion(tctx.master.commands, bool, "b") == ["false", "true"]
assert b.parse(tctx.master.commands, bool, "true") is True
assert b.parse(tctx.master.commands, bool, "false") is False
with pytest.raises(mitmproxy.exceptions.TypeError):
b.parse(tctx.master.commands, bool, "foo")
def test_str():
with taddons.context() as tctx:
b = mitmproxy.types.Str()
assert b.completion(tctx.master.commands, str, "") == []
assert b.parse(tctx.master.commands, str, "foo") == "foo"
def test_int():
with taddons.context() as tctx:
b = mitmproxy.types.Int()
assert b.completion(tctx.master.commands, int, "b") == []
assert b.parse(tctx.master.commands, int, "1") == 1
assert b.parse(tctx.master.commands, int, "999") == 999
with pytest.raises(mitmproxy.exceptions.TypeError):
b.parse(tctx.master.commands, int, "foo")
def test_path():
with taddons.context() as tctx:
b = mitmproxy.types.PathType()
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo"
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar"
def normPathOpts(prefix, match):
ret = []
for s in b.completion(tctx.master.commands, mitmproxy.types.Path, match):
s = s[len(prefix):]
s = s.replace(os.sep, "/")
ret.append(s)
return ret
cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion"))
assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/']
assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac']
with chdir(cd):
assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/']
assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/']
assert b.completion(
tctx.master.commands, mitmproxy.types.Path, "nonexistent"
) == ["nonexistent"]
def test_cmd():
with taddons.context() as tctx:
tctx.master.addons.add(test_command.TAddon())
b = mitmproxy.types.CmdType()
assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "foo") == "foo"
assert len(
b.completion(tctx.master.commands, mitmproxy.types.Cmd, "")
) == len(tctx.master.commands.commands.keys())
def test_cutspec():
with taddons.context() as tctx:
b = mitmproxy.types.CutSpecType()
b.parse(tctx.master.commands, mitmproxy.types.CutSpec, "foo,bar") == ["foo", "bar"]
assert b.completion(
tctx.master.commands, mitmproxy.types.CutSpec, "request.p"
) == b.valid_prefixes
ret = b.completion(tctx.master.commands, mitmproxy.types.CutSpec, "request.port,f")
assert ret[0].startswith("request.port,")
assert len(ret) == len(b.valid_prefixes)
def test_arg():
with taddons.context() as tctx:
b = mitmproxy.types.ArgType()
assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == []
assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo"
def test_strseq():
with taddons.context() as tctx:
b = mitmproxy.types.StrSeq()
assert b.completion(tctx.master.commands, typing.Sequence[str], "") == []
assert b.parse(tctx.master.commands, typing.Sequence[str], "foo") == ["foo"]
assert b.parse(tctx.master.commands, typing.Sequence[str], "foo,bar") == ["foo", "bar"]
class DummyConsole:
@command.command("view.resolve")
def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
n = int(spec)
return [tflow.tflow(resp=True)] * n
@command.command("cut")
def cut(self, spec: str) -> mitmproxy.types.Data:
return [["test"]]
@command.command("options")
def options(self) -> typing.Sequence[str]:
return ["one", "two", "three"]
def test_flow():
with taddons.context() as tctx:
tctx.master.addons.add(DummyConsole())
b = mitmproxy.types.FlowType()
assert len(b.completion(tctx.master.commands, flow.Flow, "")) == len(b.valid_prefixes)
assert b.parse(tctx.master.commands, flow.Flow, "1")
with pytest.raises(mitmproxy.exceptions.TypeError):
assert b.parse(tctx.master.commands, flow.Flow, "0")
with pytest.raises(mitmproxy.exceptions.TypeError):
assert b.parse(tctx.master.commands, flow.Flow, "2")
def test_flows():
with taddons.context() as tctx:
tctx.master.addons.add(DummyConsole())
b = mitmproxy.types.FlowsType()
assert len(
b.completion(tctx.master.commands, typing.Sequence[flow.Flow], "")
) == len(b.valid_prefixes)
assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "0")) == 0
assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "1")) == 1
assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "2")) == 2
def test_data():
with taddons.context() as tctx:
b = mitmproxy.types.DataType()
with pytest.raises(mitmproxy.exceptions.TypeError):
b.parse(tctx.master.commands, mitmproxy.types.Data, "foo")
with pytest.raises(mitmproxy.exceptions.TypeError):
b.parse(tctx.master.commands, mitmproxy.types.Data, "foo")
def test_choice():
with taddons.context() as tctx:
tctx.master.addons.add(DummyConsole())
b = mitmproxy.types.ChoiceType()
comp = b.completion(tctx.master.commands, mitmproxy.types.Choice("options"), "")
assert comp == ["one", "two", "three"]
assert b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "one") == "one"
with pytest.raises(mitmproxy.exceptions.TypeError):
b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "invalid")
def test_typemanager():
assert mitmproxy.types.CommandTypes.get(bool, None)
assert mitmproxy.types.CommandTypes.get(mitmproxy.types.Choice("choide"), None)

View File

@ -1,36 +1,6 @@
import os
import contextlib
from mitmproxy.tools.console.commander import commander
from mitmproxy.test import taddons
from mitmproxy.test import tutils
@contextlib.contextmanager
def chdir(path: str):
old_dir = os.getcwd()
os.chdir(path)
yield
os.chdir(old_dir)
def normPathOpts(prefix, match):
ret = []
for s in commander.pathOptions(match):
s = s[len(prefix):]
s = s.replace(os.sep, "/")
ret.append(s)
return ret
def test_pathOptions():
cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion"))
assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/']
assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac']
with chdir(cd):
assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/']
assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/']
assert commander.pathOptions("nonexistent") == ["nonexistent"]
class TestListCompleter:

View File

@ -4,7 +4,6 @@ from unittest import mock
import pytest
from mitmproxy.utils import typecheck
from mitmproxy import command
class TBase:
@ -95,9 +94,6 @@ def test_check_command_type():
assert(typecheck.check_command_type(None, None))
assert(not typecheck.check_command_type(["foo"], typing.Sequence[int]))
assert(not typecheck.check_command_type("foo", typing.Sequence[int]))
assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts))
assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts))
assert(not typecheck.check_command_type([["foo", 22]], command.Cuts))
# Python 3.5 only defines __parameters__
m = mock.Mock()