diff --git a/mitmproxy/command.py b/mitmproxy/command.py index cf345c22c..625e87e5e 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -11,49 +11,9 @@ import functools import sys from mitmproxy import exceptions +from mitmproxy import lexer import mitmproxy.types -def escape_and_quote(value): - """ - This function takes the output from the lexer and puts it between quotes - in the following cases: - * There is a space in the string: The only way a token from the lexer can have a space in it is if it was between quotes - * There is one or more quotes in the middle of the string: The only way for a token to have a quote in it that is not escaped is if it was escaped prior to being processed by the lexer. For example, the string `"s1 \" s2"` would come back from the lexer as `s1 " s2`. - - Any quotes that are in the middle of the string and that are not escaped will also be escaped (by placing a \ in front of it). - This function only deals with double quotes and they are the only ones that should be used. - """ - - new_value = "" - last_pos = len(value) - 1 - - for pos, char in enumerate(value): - if pos == 0: - new_value += char - continue - - # if pos == last_pos: - # new_value += char - # break - - if char in " \n\r\t": - new_value += char - continue - - if char == '"': - if value[pos-1] != '\\': - new_value += '\\' - - new_value += char - - value = new_value - - if ((" " in value) or ('"' in value)) and not (value.startswith("\"") or value.startswith("'")): - return "\"%s\"" % value - - return value - - def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None: sig = inspect.signature(f) try: @@ -62,13 +22,8 @@ def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None: raise exceptions.CommandError("command argument mismatch: %s" % v.args[0]) -def lexer(s): - # mypy mis-identifies shlex.shlex as abstract - lex = shlex.shlex(s, posix=True) # type: ignore - lex.wordchars += "." - lex.whitespace_split = True - lex.commenters = '' - return lex +def get_lexer(s): + return lexer.Lexer(s) def typename(t: type) -> str: @@ -199,7 +154,7 @@ class CommandManager(mitmproxy.types._CommandBase): """ buf = io.StringIO(cmdstr) parts: typing.List[str] = [] - lex = lexer(buf) + lex = get_lexer(buf) while 1: remainder = cmdstr[buf.tell():] try: @@ -245,7 +200,7 @@ class CommandManager(mitmproxy.types._CommandBase): # ctx.log.info('[gilga] before parse.append. value = %s' % parts[i]) parse.append( ParseResult( - value=escape_and_quote(parts[i]), + value=parts[i], type=typ, valid=valid, ) diff --git a/mitmproxy/lexer.py b/mitmproxy/lexer.py new file mode 100644 index 000000000..5187a718f --- /dev/null +++ b/mitmproxy/lexer.py @@ -0,0 +1,154 @@ +from enum import Enum +import io +from typing import Union +import pdb + + +class State(Enum): + QUOTE = 1 + ESCAPE = 2 + TEXT = 3 + + +class Lexer: + + def __init__(self, text: Union[str, io.StringIO]): + self._tokens = [] + self._count = 0 + self._parsed = False + + self._state = State.TEXT + self._states = [] + self._text_pos = 0 + self._quote_start_pos = 0 + + if isinstance(text, str): + self.text = io.StringIO(text) + else: + self.text = text + + def __iter__(self): + return self + + def __next__(self): + t = self.get_token() + + if t == '': + raise StopIteration + + return t + + def get_token(self): + + try: + return self.parse() + except ValueError as e: + raise + + if len(self._tokens) > 0: + ret = self._tokens[0] + self._tokens = self._tokens[1:] + else: + ret = None + return ret + + #def get_remainder(self): + # try: + # self.parse() + # except ValueError as e: + # return self.text + # + + # return ' '.join(self._tokens) + + def parse(self): + acc = '' + quote = '' # used by the parser + tokens = [] + self._state = State.TEXT + text = self.text + i = 0 + + #self.text.seek(self._text_pos) + + while True: + ch = self.text.read(1) + self._text_pos += 1 + + #pdb.set_trace() + + + # If this is the last char of the string, let's save the token + if ch == '' or ch is None: + break + + if self._state == State.QUOTE: + if ch == '\\': + self._states.append(self._state) + self._state = State.ESCAPE + acc += ch + elif ch == quote: + self._state = self._states.pop() + acc += ch + else: + acc += ch + + elif self._state == State.ESCAPE: + acc += ch + self._state = self._states.pop() + + elif self._state == State.TEXT: + if ch == ' ': + if acc != '': + break + elif ch == '"' or ch == "'": + quote = ch + self._quote_start_pos = self._text_pos + self._states.append(self._state) + self._state = State.QUOTE + acc += ch + elif ch == '\\': + # TODO: Does it make sense to go to State.ESCAPE from State.TEXT? + self._states.append(self._state) + self._state = State.ESCAPE + acc += ch + else: + acc += ch + else: + print("This shouldn't have happened") + exit(-1) + + self._token = acc + + if self._state == State.QUOTE: + raise ValueError("No closing quotation for quote in position %d" % self._quote_start_pos) + + return self._token + + +if __name__ == '__main__': + + cases = [] + cases.append(r'abc') + cases.append(r'Hello World') + cases.append(r'"Hello \" World"') + cases.append(r"'Hello \' World'") + cases.append(r'"\""') + cases.append(r'abc "def\" \x bla \z \\ \e \ " xpto') + cases.append(r'') + cases.append(r' ') + cases.append(r' ') + cases.append(r' ') + cases.append(r' ') + cases.append(r'Hello World ') + + for s in cases: + lex = Lexer(s) + tokens = list(lex) + + if len(tokens) == 1: + print('%s = %d token' % (str(tokens), len(tokens))) + else: + print('%s = %d tokens' % (str(tokens), len(tokens))) + +