diff --git a/mitmproxy/addons/clientplayback.py b/mitmproxy/addons/clientplayback.py index fcc3209b9..bed06e82d 100644 --- a/mitmproxy/addons/clientplayback.py +++ b/mitmproxy/addons/clientplayback.py @@ -3,6 +3,7 @@ from mitmproxy import ctx from mitmproxy import io from mitmproxy import flow from mitmproxy import command +import mitmproxy.types import typing @@ -37,7 +38,7 @@ class ClientPlayback: ctx.master.addons.trigger("update", []) @command.command("replay.client.file") - def load_file(self, path: command.Path) -> None: + def load_file(self, path: mitmproxy.types.Path) -> None: try: flows = io.read_flows_from_paths([path]) except exceptions.FlowReadException as e: diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py index 4191d4901..2b0b2f141 100644 --- a/mitmproxy/addons/core.py +++ b/mitmproxy/addons/core.py @@ -6,6 +6,7 @@ from mitmproxy import command from mitmproxy import flow from mitmproxy import optmanager from mitmproxy.net.http import status_codes +import mitmproxy.types class Core: @@ -96,7 +97,7 @@ class Core: ] @command.command("flow.set") - @command.argument("spec", type=command.Choice("flow.set.options")) + @command.argument("spec", type=mitmproxy.types.Choice("flow.set.options")) def flow_set( self, flows: typing.Sequence[flow.Flow], @@ -187,7 +188,7 @@ class Core: ctx.log.alert("Toggled encoding on %s flows." % len(updated)) @command.command("flow.encode") - @command.argument("enc", type=command.Choice("flow.encode.options")) + @command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options")) def encode( self, flows: typing.Sequence[flow.Flow], @@ -216,7 +217,7 @@ class Core: return ["gzip", "deflate", "br"] @command.command("options.load") - def options_load(self, path: command.Path) -> None: + def options_load(self, path: mitmproxy.types.Path) -> None: """ Load options from a file. """ @@ -228,7 +229,7 @@ class Core: ) from e @command.command("options.save") - def options_save(self, path: command.Path) -> None: + def options_save(self, path: mitmproxy.types.Path) -> None: """ Save options to a file. """ diff --git a/mitmproxy/addons/cut.py b/mitmproxy/addons/cut.py index efc9e5df4..b90df5497 100644 --- a/mitmproxy/addons/cut.py +++ b/mitmproxy/addons/cut.py @@ -7,6 +7,7 @@ from mitmproxy import flow from mitmproxy import ctx from mitmproxy import certs from mitmproxy.utils import strutils +import mitmproxy.types import pyperclip @@ -51,8 +52,8 @@ class Cut: def cut( self, flows: typing.Sequence[flow.Flow], - cuts: typing.Sequence[command.Cut] - ) -> command.Cuts: + cuts: mitmproxy.types.CutSpec, + ) -> mitmproxy.types.Data: """ Cut data from a set of flows. Cut specifications are attribute paths from the base of the flow object, with a few conveniences - "port" @@ -62,17 +63,17 @@ class Cut: or "false", "bytes" are preserved, and all other values are converted to strings. """ - ret = [] + ret = [] # type:typing.List[typing.List[typing.Union[str, bytes]]] for f in flows: ret.append([extract(c, f) for c in cuts]) - return ret + return ret # type: ignore @command.command("cut.save") def save( self, flows: typing.Sequence[flow.Flow], - cuts: typing.Sequence[command.Cut], - path: command.Path + cuts: mitmproxy.types.CutSpec, + path: mitmproxy.types.Path ) -> None: """ Save cuts to file. If there are multiple flows or cuts, the format @@ -84,7 +85,7 @@ class Cut: append = False if path.startswith("+"): append = True - path = command.Path(path[1:]) + path = mitmproxy.types.Path(path[1:]) if len(cuts) == 1 and len(flows) == 1: with open(path, "ab" if append else "wb") as fp: if fp.tell() > 0: @@ -110,7 +111,7 @@ class Cut: def clip( self, flows: typing.Sequence[flow.Flow], - cuts: typing.Sequence[command.Cut], + cuts: mitmproxy.types.CutSpec, ) -> None: """ Send cuts to the clipboard. If there are multiple flows or cuts, the diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py index 5388a0e88..0169f5b1b 100644 --- a/mitmproxy/addons/export.py +++ b/mitmproxy/addons/export.py @@ -5,6 +5,7 @@ from mitmproxy import flow from mitmproxy import exceptions from mitmproxy.utils import strutils from mitmproxy.net.http.http1 import assemble +import mitmproxy.types import pyperclip @@ -49,7 +50,7 @@ class Export(): return list(sorted(formats.keys())) @command.command("export.file") - def file(self, fmt: str, f: flow.Flow, path: command.Path) -> None: + def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None: """ Export a flow to path. """ diff --git a/mitmproxy/addons/save.py b/mitmproxy/addons/save.py index 40cd6f827..1778855d1 100644 --- a/mitmproxy/addons/save.py +++ b/mitmproxy/addons/save.py @@ -7,6 +7,7 @@ from mitmproxy import flowfilter from mitmproxy import io from mitmproxy import ctx from mitmproxy import flow +import mitmproxy.types class Save: @@ -50,7 +51,7 @@ class Save: self.start_stream_to_path(ctx.options.save_stream_file, self.filt) @command.command("save.file") - def save(self, flows: typing.Sequence[flow.Flow], path: command.Path) -> None: + def save(self, flows: typing.Sequence[flow.Flow], path: mitmproxy.types.Path) -> None: """ Save flows to a file. If the path starts with a +, flows are appended to the file, otherwise it is over-written. diff --git a/mitmproxy/addons/serverplayback.py b/mitmproxy/addons/serverplayback.py index 46968a8d6..20fcfc2a1 100644 --- a/mitmproxy/addons/serverplayback.py +++ b/mitmproxy/addons/serverplayback.py @@ -9,6 +9,7 @@ from mitmproxy import flow from mitmproxy import exceptions from mitmproxy import io from mitmproxy import command +import mitmproxy.types class ServerPlayback: @@ -31,7 +32,7 @@ class ServerPlayback: ctx.master.addons.trigger("update", []) @command.command("replay.server.file") - def load_file(self, path: command.Path) -> None: + def load_file(self, path: mitmproxy.types.Path) -> None: try: flows = io.read_flows_from_paths([path]) except exceptions.FlowReadException as e: diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py index e45f2baf6..3a15fd3eb 100644 --- a/mitmproxy/addons/view.py +++ b/mitmproxy/addons/view.py @@ -351,7 +351,7 @@ class View(collections.Sequence): ctx.master.addons.trigger("update", updated) @command.command("view.load") - def load_file(self, path: command.Path) -> None: + def load_file(self, path: mitmproxy.types.Path) -> None: """ Load flows into the view, without processing them with addons. """ diff --git a/mitmproxy/command.py b/mitmproxy/command.py index c86d97925..a77658fd4 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -12,7 +12,7 @@ import sys from mitmproxy.utils import typecheck from mitmproxy import exceptions -from mitmproxy import flow +import mitmproxy.types def lexer(s): @@ -24,113 +24,14 @@ def lexer(s): return lex -# This is an awkward location for these values, but it's better than having -# the console core import and depend on an addon. FIXME: Add a way for -# addons to add custom types and manage their completion and validation. -valid_flow_prefixes = [ - "@all", - "@focus", - "@shown", - "@hidden", - "@marked", - "@unmarked", - "~q", - "~s", - "~a", - "~hq", - "~hs", - "~b", - "~bq", - "~bs", - "~t", - "~d", - "~m", - "~u", - "~c", -] - - -Cuts = typing.Sequence[ - typing.Sequence[typing.Union[str, bytes]] -] - - -class Cut(str): - # This is an awkward location for these values, but it's better than having - # the console core import and depend on an addon. FIXME: Add a way for - # addons to add custom types and manage their completion and validation. - valid_prefixes = [ - "request.method", - "request.scheme", - "request.host", - "request.http_version", - "request.port", - "request.path", - "request.url", - "request.text", - "request.content", - "request.raw_content", - "request.timestamp_start", - "request.timestamp_end", - "request.header[", - - "response.status_code", - "response.reason", - "response.text", - "response.content", - "response.timestamp_start", - "response.timestamp_end", - "response.raw_content", - "response.header[", - - "client_conn.address.port", - "client_conn.address.host", - "client_conn.tls_version", - "client_conn.sni", - "client_conn.ssl_established", - - "server_conn.address.port", - "server_conn.address.host", - "server_conn.ip_address.host", - "server_conn.tls_version", - "server_conn.sni", - "server_conn.ssl_established", - ] - - -class Path(str): - pass - - -class Cmd(str): - pass - - -class Arg(str): - pass - - def typename(t: type) -> str: """ - Translates a type to an explanatory string. If ret is True, we're - looking at a return type, else we're looking at a parameter type. + Translates a type to an explanatory string. """ - if isinstance(t, Choice): - return "choice" - elif t == typing.Sequence[flow.Flow]: - return "[flow]" - elif t == typing.Sequence[str]: - return "[str]" - elif t == typing.Sequence[Cut]: - return "[cut]" - elif t == Cuts: - return "[cuts]" - elif t == flow.Flow: - return "flow" - elif issubclass(t, (str, int, bool)): - return t.__name__.lower() - else: # pragma: no cover + to = mitmproxy.types.CommandTypes.get(t, None) + if not to: raise NotImplementedError(t) + return to.display class Command: @@ -168,7 +69,7 @@ class Command: ret = " -> " + ret return "%s %s%s" % (self.path, params, ret) - def call(self, args: typing.Sequence[str]): + def call(self, args: typing.Sequence[str]) -> typing.Any: """ Call the command with a list of arguments. At this point, all arguments are strings. @@ -255,13 +156,13 @@ class CommandManager: typ = None # type: typing.Type for i in range(len(parts)): if i == 0: - typ = Cmd + typ = mitmproxy.types.Cmd if parts[i] in self.commands: params.extend(self.commands[parts[i]].paramtypes) elif params: typ = params.pop(0) # FIXME: Do we need to check that Arg is positional? - if typ == Cmd and params and params[0] == Arg: + if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg: if parts[i] in self.commands: params[:] = self.commands[parts[i]].paramtypes else: @@ -269,7 +170,7 @@ class CommandManager: parse.append(ParseResult(value=parts[i], type=typ)) return parse - def call_args(self, path, args): + def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any: """ Call a command using a list of string arguments. May raise CommandError. """ @@ -300,45 +201,13 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any: """ Convert a string to a argument to the appropriate type. """ - if isinstance(argtype, Choice): - cmd = argtype.options_command - opts = manager.call(cmd) - if spec not in opts: - raise exceptions.CommandError( - "Invalid choice: see %s for options" % cmd - ) - return spec - elif issubclass(argtype, str): - return spec - elif argtype == bool: - if spec == "true": - return True - elif spec == "false": - return False - else: - raise exceptions.CommandError( - "Booleans are 'true' or 'false', got %s" % spec - ) - elif issubclass(argtype, int): - try: - return int(spec) - except ValueError as e: - raise exceptions.CommandError("Expected an integer, got %s." % spec) - elif argtype == typing.Sequence[flow.Flow]: - return manager.call_args("view.resolve", [spec]) - elif argtype == Cuts: - return manager.call_args("cut", [spec]) - elif argtype == flow.Flow: - flows = manager.call_args("view.resolve", [spec]) - if len(flows) != 1: - raise exceptions.CommandError( - "Command requires one flow, specification matched %s." % len(flows) - ) - return flows[0] - elif argtype in (typing.Sequence[str], typing.Sequence[Cut]): - return [i.strip() for i in spec.split(",")] - else: + t = mitmproxy.types.CommandTypes.get(argtype, None) + if not t: raise exceptions.CommandError("Unsupported argument type: %s" % argtype) + try: + return t.parse(manager, argtype, spec) # type: ignore + except exceptions.TypeError as e: + raise exceptions.CommandError from e def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None: @@ -360,21 +229,11 @@ def command(path): return decorator -class Choice: - def __init__(self, options_command): - self.options_command = options_command - - def __instancecheck__(self, instance): - # return false here so that arguments are piped through parsearg, - # which does extended validation. - return False - - def argument(name, type): """ - Set the type of a command argument at runtime. - This is useful for more specific types such as command.Choice, which we cannot annotate - directly as mypy does not like that. + Set the type of a command argument at runtime. This is useful for more + specific types such as mitmproxy.types.Choice, which we cannot annotate + directly as mypy does not like that. """ def decorator(f: types.FunctionType) -> types.FunctionType: assert name in f.__annotations__ diff --git a/mitmproxy/exceptions.py b/mitmproxy/exceptions.py index 715174805..d568898be 100644 --- a/mitmproxy/exceptions.py +++ b/mitmproxy/exceptions.py @@ -112,6 +112,10 @@ class AddonHalt(MitmproxyException): pass +class TypeError(MitmproxyException): + pass + + """ Net-layer exceptions """ diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index ef32b9536..13c800920 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -1,6 +1,4 @@ import abc -import glob -import os import typing import urwid @@ -9,6 +7,7 @@ from urwid.text_layout import calc_coords import mitmproxy.flow import mitmproxy.master import mitmproxy.command +import mitmproxy.types class Completer: # pragma: no cover @@ -39,30 +38,6 @@ class ListCompleter(Completer): return ret -# Generates the completion options for a specific starting input -def pathOptions(start: str) -> typing.Sequence[str]: - if not start: - start = "./" - path = os.path.expanduser(start) - ret = [] - if os.path.isdir(path): - files = glob.glob(os.path.join(path, "*")) - prefix = start - else: - files = glob.glob(path + "*") - prefix = os.path.dirname(start) - prefix = prefix or "./" - for f in files: - display = os.path.join(prefix, os.path.normpath(os.path.basename(f))) - if os.path.isdir(f): - display += "/" - ret.append(display) - if not ret: - ret = [start] - ret.sort() - return ret - - CompletionState = typing.NamedTuple( "CompletionState", [ @@ -106,48 +81,12 @@ class CommandBuffer(): if not self.completion: parts = self.master.commands.parse_partial(self.buf[:self.cursor]) last = parts[-1] - if last.type == mitmproxy.command.Cmd: + ct = mitmproxy.types.CommandTypes.get(last.type, None) + if ct: self.completion = CompletionState( completer = ListCompleter( parts[-1].value, - self.master.commands.commands.keys(), - ), - parse = parts, - ) - if last.type == typing.Sequence[mitmproxy.command.Cut]: - spec = parts[-1].value.split(",") - opts = [] - for pref in mitmproxy.command.Cut.valid_prefixes: - spec[-1] = pref - opts.append(",".join(spec)) - self.completion = CompletionState( - completer = ListCompleter( - parts[-1].value, - opts, - ), - parse = parts, - ) - elif isinstance(last.type, mitmproxy.command.Choice): - self.completion = CompletionState( - completer = ListCompleter( - parts[-1].value, - self.master.commands.call(last.type.options_command), - ), - parse = parts, - ) - elif last.type == mitmproxy.command.Path: - self.completion = CompletionState( - completer = ListCompleter( - "", - pathOptions(parts[1].value) - ), - parse = parts, - ) - elif last.type in (typing.Sequence[mitmproxy.flow.Flow], mitmproxy.flow.Flow): - self.completion = CompletionState( - completer = ListCompleter( - "", - mitmproxy.command.valid_flow_prefixes, + ct.completion(self.master.commands, last.type, parts[-1].value) ), parse = parts, ) diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py index 453e9e1c0..37647e60e 100644 --- a/mitmproxy/tools/console/consoleaddons.py +++ b/mitmproxy/tools/console/consoleaddons.py @@ -7,6 +7,8 @@ from mitmproxy import exceptions from mitmproxy import flow from mitmproxy import contentviews from mitmproxy.utils import strutils +import mitmproxy.types + from mitmproxy.tools.console import overlay from mitmproxy.tools.console import signals @@ -218,8 +220,8 @@ class ConsoleAddon: self, prompt: str, choices: typing.Sequence[str], - cmd: command.Cmd, - *args: command.Arg + cmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.Arg ) -> None: """ Prompt the user to choose from a specified list of strings, then @@ -241,7 +243,7 @@ class ConsoleAddon: @command.command("console.choose.cmd") def console_choose_cmd( - self, prompt: str, choicecmd: command.Cmd, *cmd: command.Arg + self, prompt: str, choicecmd: mitmproxy.types.Cmd, *cmd: mitmproxy.types.Arg ) -> None: """ Prompt the user to choose from a list of strings returned by a @@ -352,7 +354,7 @@ class ConsoleAddon: ] @command.command("console.edit.focus") - @command.argument("part", type=command.Choice("console.edit.focus.options")) + @command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options")) def edit_focus(self, part: str) -> None: """ Edit a component of the currently focused flow. @@ -404,14 +406,14 @@ class ConsoleAddon: self._grideditor().cmd_delete() @command.command("console.grideditor.load") - def grideditor_load(self, path: command.Path) -> None: + def grideditor_load(self, path: mitmproxy.types.Path) -> None: """ Read a file into the currrent cell. """ self._grideditor().cmd_read_file(path) @command.command("console.grideditor.load_escaped") - def grideditor_load_escaped(self, path: command.Path) -> None: + def grideditor_load_escaped(self, path: mitmproxy.types.Path) -> None: """ Read a file containing a Python-style escaped string into the currrent cell. @@ -419,7 +421,7 @@ class ConsoleAddon: self._grideditor().cmd_read_file_escaped(path) @command.command("console.grideditor.save") - def grideditor_save(self, path: command.Path) -> None: + def grideditor_save(self, path: mitmproxy.types.Path) -> None: """ Save data to file as a CSV. """ @@ -440,7 +442,7 @@ class ConsoleAddon: self._grideditor().cmd_spawn_editor() @command.command("console.flowview.mode.set") - @command.argument("mode", type=command.Choice("console.flowview.mode.options")) + @command.argument("mode", type=mitmproxy.types.Choice("console.flowview.mode.options")) def flowview_mode_set(self, mode: str) -> None: """ Set the display mode for the current flow view. @@ -498,8 +500,8 @@ class ConsoleAddon: self, contexts: typing.Sequence[str], key: str, - cmd: command.Cmd, - *args: command.Arg + cmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.Arg ) -> None: """ Bind a shortcut key. diff --git a/mitmproxy/types.py b/mitmproxy/types.py new file mode 100644 index 000000000..e1b6f95db --- /dev/null +++ b/mitmproxy/types.py @@ -0,0 +1,330 @@ +import os +import glob +import typing + +from mitmproxy import exceptions +from mitmproxy import flow + + +class Path(str): + pass + + +class Cmd(str): + pass + + +class Arg(str): + pass + + +class CutSpec(typing.Sequence[str]): + pass + + +class Data(typing.Sequence[typing.Sequence[typing.Union[str, bytes]]]): + pass + + +class Choice: + def __init__(self, options_command): + self.options_command = options_command + + def __instancecheck__(self, instance): # pragma: no cover + # return false here so that arguments are piped through parsearg, + # which does extended validation. + return False + + +# One of the many charming things about mypy is that introducing type +# annotations can cause circular dependencies where there were none before. +# Rather than putting types and the CommandManger in the same file, we introduce +# a stub type with the signature we use. +class _CommandStub: + commands = {} # type: typing.Mapping[str, typing.Any] + + def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover + pass + + def call(self, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover + pass + + +class BaseType: + typ = object # type: typing.Type + display = "" # type: str + + def completion( + self, manager: _CommandStub, t: type, s: str + ) -> typing.Sequence[str]: # pragma: no cover + pass + + def parse( + self, manager: _CommandStub, t: type, s: str + ) -> typing.Any: # pragma: no cover + pass + + +class Bool(BaseType): + typ = bool + display = "bool" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return ["false", "true"] + + def parse(self, manager: _CommandStub, t: type, s: str) -> bool: + if s == "true": + return True + elif s == "false": + return False + else: + raise exceptions.TypeError( + "Booleans are 'true' or 'false', got %s" % s + ) + + +class Str(BaseType): + typ = str + display = "str" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandStub, t: type, s: str) -> str: + return s + + +class Int(BaseType): + typ = int + display = "int" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandStub, t: type, s: str) -> int: + try: + return int(s) + except ValueError as e: + raise exceptions.TypeError from e + + +class PathType(BaseType): + typ = Path + display = "path" + + def completion(self, manager: _CommandStub, t: type, start: str) -> typing.Sequence[str]: + if not start: + start = "./" + path = os.path.expanduser(start) + ret = [] + if os.path.isdir(path): + files = glob.glob(os.path.join(path, "*")) + prefix = start + else: + files = glob.glob(path + "*") + prefix = os.path.dirname(start) + prefix = prefix or "./" + for f in files: + display = os.path.join(prefix, os.path.normpath(os.path.basename(f))) + if os.path.isdir(f): + display += "/" + ret.append(display) + if not ret: + ret = [start] + ret.sort() + return ret + + def parse(self, manager: _CommandStub, t: type, s: str) -> str: + return s + + +class CmdType(BaseType): + typ = Cmd + display = "cmd" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return list(manager.commands.keys()) + + def parse(self, manager: _CommandStub, t: type, s: str) -> str: + return s + + +class ArgType(BaseType): + typ = Arg + display = "arg" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandStub, t: type, s: str) -> str: + return s + + +class StrSeq(BaseType): + typ = typing.Sequence[str] + display = "[str]" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [x.strip() for x in s.split(",")] + + +class CutSpecType(BaseType): + typ = CutSpec + display = "[cut]" + valid_prefixes = [ + "request.method", + "request.scheme", + "request.host", + "request.http_version", + "request.port", + "request.path", + "request.url", + "request.text", + "request.content", + "request.raw_content", + "request.timestamp_start", + "request.timestamp_end", + "request.header[", + + "response.status_code", + "response.reason", + "response.text", + "response.content", + "response.timestamp_start", + "response.timestamp_end", + "response.raw_content", + "response.header[", + + "client_conn.address.port", + "client_conn.address.host", + "client_conn.tls_version", + "client_conn.sni", + "client_conn.ssl_established", + + "server_conn.address.port", + "server_conn.address.host", + "server_conn.ip_address.host", + "server_conn.tls_version", + "server_conn.sni", + "server_conn.ssl_established", + ] + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + spec = s.split(",") + opts = [] + for pref in self.valid_prefixes: + spec[-1] = pref + opts.append(",".join(spec)) + return opts + + def parse(self, manager: _CommandStub, t: type, s: str) -> CutSpec: + parts = s.split(",") # type: typing.Any + return parts + + +class BaseFlowType(BaseType): + valid_prefixes = [ + "@all", + "@focus", + "@shown", + "@hidden", + "@marked", + "@unmarked", + "~q", + "~s", + "~a", + "~hq", + "~hs", + "~b", + "~bq", + "~bs", + "~t", + "~d", + "~m", + "~u", + "~c", + ] + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return self.valid_prefixes + + +class FlowType(BaseFlowType): + typ = flow.Flow + display = "flow" + + def parse(self, manager: _CommandStub, t: type, s: str) -> flow.Flow: + flows = manager.call_args("view.resolve", [s]) + if len(flows) != 1: + raise exceptions.TypeError( + "Command requires one flow, specification matched %s." % len(flows) + ) + return flows[0] + + +class FlowsType(BaseFlowType): + typ = typing.Sequence[flow.Flow] + display = "[flow]" + + def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[flow.Flow]: + return manager.call_args("view.resolve", [s]) + + +class DataType: + typ = Data + display = "[data]" + + def completion( + self, manager: _CommandStub, t: type, s: str + ) -> typing.Sequence[str]: # pragma: no cover + raise exceptions.TypeError("data cannot be passed as argument") + + def parse( + self, manager: _CommandStub, t: type, s: str + ) -> typing.Any: # pragma: no cover + raise exceptions.TypeError("data cannot be passed as argument") + + +class ChoiceType: + typ = Choice + display = "choice" + + def completion(self, manager: _CommandStub, t: Choice, s: str) -> typing.Sequence[str]: + return manager.call(t.options_command) + + def parse(self, manager: _CommandStub, t: Choice, s: str) -> str: + opts = manager.call(t.options_command) + if s not in opts: + raise exceptions.TypeError("Invalid choice.") + return s + + +class TypeManager: + def __init__(self, *types): + self.typemap = {} + for t in types: + self.typemap[t.typ] = t() + + def get(self, t: type, default=None) -> BaseType: + if type(t) in self.typemap: + return self.typemap[type(t)] + return self.typemap.get(t, default) + + +CommandTypes = TypeManager( + ArgType, + Bool, + ChoiceType, + CmdType, + CutSpecType, + DataType, + FlowType, + FlowsType, + Int, + PathType, + Str, + StrSeq, +) diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index 50ad3d55f..f9315dd24 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -4,6 +4,7 @@ from mitmproxy import flow from mitmproxy import exceptions from mitmproxy.test import tflow from mitmproxy.test import taddons +import mitmproxy.types import io import pytest @@ -25,7 +26,7 @@ class TAddon: return foo @command.command("subcommand") - def subcommand(self, cmd: command.Cmd, *args: command.Arg) -> str: + def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.Arg) -> str: return "ok" @command.command("empty") @@ -39,12 +40,12 @@ class TAddon: def choices(self) -> typing.Sequence[str]: return ["one", "two", "three"] - @command.argument("arg", type=command.Choice("choices")) + @command.argument("arg", type=mitmproxy.types.Choice("choices")) def choose(self, arg: str) -> typing.Sequence[str]: return ["one", "two", "three"] @command.command("path") - def path(self, arg: command.Path) -> None: + def path(self, arg: mitmproxy.types.Path) -> None: pass @@ -79,45 +80,45 @@ class TestCommand: [ "foo bar", [ - command.ParseResult(value = "foo", type = command.Cmd), + command.ParseResult(value = "foo", type = mitmproxy.types.Cmd), command.ParseResult(value = "bar", type = str) ], ], [ "foo 'bar", [ - command.ParseResult(value = "foo", type = command.Cmd), + command.ParseResult(value = "foo", type = mitmproxy.types.Cmd), command.ParseResult(value = "'bar", type = str) ] ], - ["a", [command.ParseResult(value = "a", type = command.Cmd)]], - ["", [command.ParseResult(value = "", type = command.Cmd)]], + ["a", [command.ParseResult(value = "a", type = mitmproxy.types.Cmd)]], + ["", [command.ParseResult(value = "", type = mitmproxy.types.Cmd)]], [ "cmd3 1", [ - command.ParseResult(value = "cmd3", type = command.Cmd), + command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd), command.ParseResult(value = "1", type = int), ] ], [ "cmd3 ", [ - command.ParseResult(value = "cmd3", type = command.Cmd), + command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd), command.ParseResult(value = "", type = int), ] ], [ "subcommand ", [ - command.ParseResult(value = "subcommand", type = command.Cmd), - command.ParseResult(value = "", type = command.Cmd), + command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd), + command.ParseResult(value = "", type = mitmproxy.types.Cmd), ] ], [ "subcommand cmd3 ", [ - command.ParseResult(value = "subcommand", type = command.Cmd), - command.ParseResult(value = "cmd3", type = command.Cmd), + command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd), + command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd), command.ParseResult(value = "", type = int), ] ], @@ -154,15 +155,15 @@ def test_typename(): assert command.typename(str) == "str" assert command.typename(typing.Sequence[flow.Flow]) == "[flow]" - assert command.typename(command.Cuts) == "[cuts]" - assert command.typename(typing.Sequence[command.Cut]) == "[cut]" + assert command.typename(mitmproxy.types.Data) == "[data]" + assert command.typename(mitmproxy.types.CutSpec) == "[cut]" assert command.typename(flow.Flow) == "flow" assert command.typename(typing.Sequence[str]) == "[str]" - assert command.typename(command.Choice("foo")) == "choice" - assert command.typename(command.Path) == "path" - assert command.typename(command.Cmd) == "cmd" + assert command.typename(mitmproxy.types.Choice("foo")) == "choice" + assert command.typename(mitmproxy.types.Path) == "path" + assert command.typename(mitmproxy.types.Cmd) == "cmd" class DummyConsole: @@ -172,7 +173,7 @@ class DummyConsole: return [tflow.tflow(resp=True)] * n @command.command("cut") - def cut(self, spec: str) -> command.Cuts: + def cut(self, spec: str) -> mitmproxy.types.Data: return [["test"]] @@ -201,10 +202,6 @@ def test_parsearg(): with pytest.raises(exceptions.CommandError): command.parsearg(tctx.master.commands, "foo", Exception) - assert command.parsearg( - tctx.master.commands, "foo", command.Cuts - ) == [["test"]] - assert command.parsearg( tctx.master.commands, "foo", typing.Sequence[str] ) == ["foo"] @@ -215,18 +212,18 @@ def test_parsearg(): a = TAddon() tctx.master.commands.add("choices", a.choices) assert command.parsearg( - tctx.master.commands, "one", command.Choice("choices"), + tctx.master.commands, "one", mitmproxy.types.Choice("choices"), ) == "one" with pytest.raises(exceptions.CommandError): assert command.parsearg( - tctx.master.commands, "invalid", command.Choice("choices"), + tctx.master.commands, "invalid", mitmproxy.types.Choice("choices"), ) assert command.parsearg( - tctx.master.commands, "foo", command.Path + tctx.master.commands, "foo", mitmproxy.types.Path ) == "foo" assert command.parsearg( - tctx.master.commands, "foo", command.Cmd + tctx.master.commands, "foo", mitmproxy.types.Cmd ) == "foo" @@ -272,5 +269,5 @@ def test_choice(): basic typechecking for choices should fail as we cannot verify if strings are a valid choice at this point. """ - c = command.Choice("foo") + c = mitmproxy.types.Choice("foo") assert not typecheck.check_command_type("foo", c) diff --git a/test/mitmproxy/test_typemanager.py b/test/mitmproxy/test_typemanager.py new file mode 100644 index 000000000..e69de29bb diff --git a/test/mitmproxy/test_types.py b/test/mitmproxy/test_types.py new file mode 100644 index 000000000..81aaed749 --- /dev/null +++ b/test/mitmproxy/test_types.py @@ -0,0 +1,175 @@ +import pytest +import os +import typing +import contextlib + +from mitmproxy.test import tutils +import mitmproxy.exceptions +import mitmproxy.types +from mitmproxy.test import taddons +from mitmproxy.test import tflow +from mitmproxy import command +from mitmproxy import flow + +from . import test_command + + +@contextlib.contextmanager +def chdir(path: str): + old_dir = os.getcwd() + os.chdir(path) + yield + os.chdir(old_dir) + + +def test_bool(): + with taddons.context() as tctx: + b = mitmproxy.types.Bool() + assert b.completion(tctx.master.commands, bool, "b") == ["false", "true"] + assert b.parse(tctx.master.commands, bool, "true") is True + assert b.parse(tctx.master.commands, bool, "false") is False + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, bool, "foo") + + +def test_str(): + with taddons.context() as tctx: + b = mitmproxy.types.Str() + assert b.completion(tctx.master.commands, str, "") == [] + assert b.parse(tctx.master.commands, str, "foo") == "foo" + + +def test_int(): + with taddons.context() as tctx: + b = mitmproxy.types.Int() + assert b.completion(tctx.master.commands, int, "b") == [] + assert b.parse(tctx.master.commands, int, "1") == 1 + assert b.parse(tctx.master.commands, int, "999") == 999 + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, int, "foo") + + +def test_path(): + with taddons.context() as tctx: + b = mitmproxy.types.PathType() + assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo" + assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar" + + def normPathOpts(prefix, match): + ret = [] + for s in b.completion(tctx.master.commands, mitmproxy.types.Path, match): + s = s[len(prefix):] + s = s.replace(os.sep, "/") + ret.append(s) + return ret + + cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion")) + assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/'] + assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac'] + with chdir(cd): + assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/'] + assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/'] + assert b.completion( + tctx.master.commands, mitmproxy.types.Path, "nonexistent" + ) == ["nonexistent"] + + +def test_cmd(): + with taddons.context() as tctx: + tctx.master.addons.add(test_command.TAddon()) + b = mitmproxy.types.CmdType() + assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "foo") == "foo" + assert len( + b.completion(tctx.master.commands, mitmproxy.types.Cmd, "") + ) == len(tctx.master.commands.commands.keys()) + + +def test_cutspec(): + with taddons.context() as tctx: + b = mitmproxy.types.CutSpecType() + b.parse(tctx.master.commands, mitmproxy.types.CutSpec, "foo,bar") == ["foo", "bar"] + assert b.completion( + tctx.master.commands, mitmproxy.types.CutSpec, "request.p" + ) == b.valid_prefixes + ret = b.completion(tctx.master.commands, mitmproxy.types.CutSpec, "request.port,f") + assert ret[0].startswith("request.port,") + assert len(ret) == len(b.valid_prefixes) + + +def test_arg(): + with taddons.context() as tctx: + b = mitmproxy.types.ArgType() + assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == [] + assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo" + + +def test_strseq(): + with taddons.context() as tctx: + b = mitmproxy.types.StrSeq() + assert b.completion(tctx.master.commands, typing.Sequence[str], "") == [] + assert b.parse(tctx.master.commands, typing.Sequence[str], "foo") == ["foo"] + assert b.parse(tctx.master.commands, typing.Sequence[str], "foo,bar") == ["foo", "bar"] + + +class DummyConsole: + @command.command("view.resolve") + def resolve(self, spec: str) -> typing.Sequence[flow.Flow]: + n = int(spec) + return [tflow.tflow(resp=True)] * n + + @command.command("cut") + def cut(self, spec: str) -> mitmproxy.types.Data: + return [["test"]] + + @command.command("options") + def options(self) -> typing.Sequence[str]: + return ["one", "two", "three"] + + +def test_flow(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + b = mitmproxy.types.FlowType() + assert len(b.completion(tctx.master.commands, flow.Flow, "")) == len(b.valid_prefixes) + assert b.parse(tctx.master.commands, flow.Flow, "1") + with pytest.raises(mitmproxy.exceptions.TypeError): + assert b.parse(tctx.master.commands, flow.Flow, "0") + with pytest.raises(mitmproxy.exceptions.TypeError): + assert b.parse(tctx.master.commands, flow.Flow, "2") + + +def test_flows(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + b = mitmproxy.types.FlowsType() + assert len( + b.completion(tctx.master.commands, typing.Sequence[flow.Flow], "") + ) == len(b.valid_prefixes) + assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "0")) == 0 + assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "1")) == 1 + assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "2")) == 2 + + +def test_data(): + with taddons.context() as tctx: + b = mitmproxy.types.DataType() + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, mitmproxy.types.Data, "foo") + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, mitmproxy.types.Data, "foo") + + +def test_choice(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + b = mitmproxy.types.ChoiceType() + comp = b.completion(tctx.master.commands, mitmproxy.types.Choice("options"), "") + assert comp == ["one", "two", "three"] + assert b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "one") == "one" + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "invalid") + + +def test_typemanager(): + assert mitmproxy.types.CommandTypes.get(bool, None) + assert mitmproxy.types.CommandTypes.get(mitmproxy.types.Choice("choide"), None) diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index 823af06d2..34062dcb2 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -1,36 +1,6 @@ -import os -import contextlib from mitmproxy.tools.console.commander import commander from mitmproxy.test import taddons -from mitmproxy.test import tutils - - -@contextlib.contextmanager -def chdir(path: str): - old_dir = os.getcwd() - os.chdir(path) - yield - os.chdir(old_dir) - - -def normPathOpts(prefix, match): - ret = [] - for s in commander.pathOptions(match): - s = s[len(prefix):] - s = s.replace(os.sep, "/") - ret.append(s) - return ret - - -def test_pathOptions(): - cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion")) - assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/'] - assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac'] - with chdir(cd): - assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/'] - assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/'] - assert commander.pathOptions("nonexistent") == ["nonexistent"] class TestListCompleter: diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py index 66b1884e8..365509f15 100644 --- a/test/mitmproxy/utils/test_typecheck.py +++ b/test/mitmproxy/utils/test_typecheck.py @@ -4,7 +4,6 @@ from unittest import mock import pytest from mitmproxy.utils import typecheck -from mitmproxy import command class TBase: @@ -95,9 +94,6 @@ def test_check_command_type(): assert(typecheck.check_command_type(None, None)) assert(not typecheck.check_command_type(["foo"], typing.Sequence[int])) assert(not typecheck.check_command_type("foo", typing.Sequence[int])) - assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts)) - assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts)) - assert(not typecheck.check_command_type([["foo", 22]], command.Cuts)) # Python 3.5 only defines __parameters__ m = mock.Mock()