Merge branch 'command-history-file' of github.com:typoon/mitmproxy into command-history-file

This commit is contained in:
Henrique 2019-11-27 08:13:19 -05:00
commit 863d2fbcb2
5 changed files with 98 additions and 214 deletions

View File

@ -1,6 +1,5 @@
import atexit
import collections
import os import os
import pathlib
import typing import typing
from mitmproxy import command from mitmproxy import command
@ -8,137 +7,77 @@ from mitmproxy import ctx
class CommandHistory: class CommandHistory:
def __init__(self, size: int = 300) -> None: VACUUM_SIZE = 1024
self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size)
self.is_configured = False
self.filtered_commands: typing.Deque[str] = collections.deque() def __init__(self) -> None:
self.current_index: int = -1 self.history: typing.List[str] = []
self.filter_str: str = '' self.filtered_history: typing.List[str] = [""]
self.command_history_path: str = '' self.current_index: int = 0
atexit.register(self.cleanup) def load(self, loader):
loader.add_option(
def cleanup(self): "command_history", bool, True,
self._sync_saved_commands() """Persist command history between mitmproxy invocations."""
)
@property @property
def last_filtered_index(self): def history_file(self) -> pathlib.Path:
return len(self.filtered_commands) - 1 return pathlib.Path(os.path.expanduser(ctx.options.confdir)) / "command_history"
@command.command("command_history.clear") def running(self):
def clear_history(self): # FIXME: We have a weird bug where the contract for configure is not followed and it is never called with
self.saved_commands.clear() # confdir or command_history as updated.
self.filtered_commands.clear() self.configure("command_history")
with open(self.command_history_path, 'w') as f: def configure(self, updated):
f.truncate(0) if "command_history" in updated or "confdir" in updated:
f.seek(0) if ctx.options.command_history and self.history_file.is_file():
f.flush() self.history = self.history_file.read_text().splitlines()
f.close()
self.restart() def done(self):
if ctx.options.command_history and len(self.history) > self.VACUUM_SIZE:
# vacuum history so that it doesn't grow indefinitely.
history_str = "\n".join(self.history[-self.VACUUM_SIZE/2:]) + "\n"
self.history_file.write_text(history_str)
@command.command("command_history.cancel") @command.command("commands.history.add")
def restart(self) -> None:
self.filtered_commands = self.saved_commands.copy()
self.current_index = -1
@command.command("command_history.next")
def get_next(self) -> str:
if self.current_index == -1 or self.current_index == self.last_filtered_index:
self.current_index = -1
return ''
elif self.current_index < self.last_filtered_index:
self.current_index += 1
ret = self.filtered_commands[self.current_index]
return ret
@command.command("command_history.prev")
def get_prev(self) -> str:
if self.current_index == -1:
if self.last_filtered_index >= 0:
self.current_index = self.last_filtered_index
else:
return ''
elif self.current_index > 0:
self.current_index -= 1
ret = self.filtered_commands[self.current_index]
return ret
@command.command("command_history.filter")
def set_filter(self, command: str) -> None:
self.filter_str = command
_filtered_commands = [c for c in self.saved_commands if c.startswith(command)]
self.filtered_commands = collections.deque(_filtered_commands)
if command and command not in self.filtered_commands:
self.filtered_commands.append(command)
self.current_index = -1
@command.command("command_history.add")
def add_command(self, command: str) -> None: def add_command(self, command: str) -> None:
if command.strip() == '': if not command.strip():
return return
self._sync_saved_commands() self.history.append(command)
if ctx.options.command_history:
with self.history_file.open("a") as f:
f.write(f"{command}\n")
if command in self.saved_commands: @command.command("commands.history.get")
self.saved_commands.remove(command) def get_history(self) -> typing.Sequence[str]:
self.saved_commands.append(command) """Get the entire command history."""
return self.history.copy()
_history_str = "\n".join(self.saved_commands) @command.command("commands.history.clear")
with open(self.command_history_path, 'w') as f: def clear_history(self):
f.truncate(0) self.history_file.unlink()
f.seek(0) self.history = []
f.write(_history_str)
f.flush()
f.close()
self.restart() # Functionality to provide a filtered list that can be iterated through.
def _sync_saved_commands(self): @command.command("commands.history.filter")
# First read all commands from the file to merge anything that may def set_filter(self, prefix: str) -> None:
# have come from a different instance of the mitmproxy or sister tools self.filtered_history = [
if not os.path.exists(self.command_history_path): cmd
return for cmd in self.history
if cmd.startswith(prefix)
]
self.filtered_history.append(prefix)
self.current_index = len(self.filtered_history) - 1
with open(self.command_history_path, 'r') as f: @command.command("commands.history.next")
_history_lines = f.readlines() def get_next(self) -> str:
f.close() self.current_index = min(self.current_index + 1, len(self.filtered_history) - 1)
return self.filtered_history[self.current_index]
self.saved_commands.clear() @command.command("commands.history.prev")
for l in _history_lines: def get_prev(self) -> str:
l = l.strip() self.current_index = max(0, self.current_index - 1)
if l in self.saved_commands: return self.filtered_history[self.current_index]
self.saved_commands.remove(l)
self.saved_commands.append(l.strip())
def configure(self, updated: typing.Set[str]):
if self.is_configured:
return
_command_history_dir = os.path.expanduser(ctx.options.confdir)
if not os.path.exists(_command_history_dir):
os.makedirs(_command_history_dir)
self.command_history_path = os.path.join(_command_history_dir, 'command_history')
_history_lines: typing.List[str] = []
if os.path.exists(self.command_history_path):
with open(self.command_history_path, 'r') as f:
_history_lines = f.readlines()
f.close()
for l in _history_lines:
self.add_command(l.strip())
self.is_configured = True

View File

@ -9,8 +9,6 @@ import mitmproxy.flow
import mitmproxy.master import mitmproxy.master
import mitmproxy.types import mitmproxy.types
from mitmproxy import command_lexer
class Completer: class Completer:
@abc.abstractmethod @abc.abstractmethod
@ -200,7 +198,7 @@ class CommandEdit(urwid.WidgetWrap):
self.cbuf.backspace() self.cbuf.backspace()
if self.cbuf.text == '': if self.cbuf.text == '':
self.active_filter = False self.active_filter = False
self.master.commands.execute("command_history.filter ''") self.master.commands.call("commands.history.filter", "")
self.filter_str = '' self.filter_str = ''
elif key == "left" or key == "ctrl b": elif key == "left" or key == "ctrl b":
self.cbuf.left() self.cbuf.left()
@ -210,20 +208,19 @@ class CommandEdit(urwid.WidgetWrap):
if self.active_filter is False: if self.active_filter is False:
self.active_filter = True self.active_filter = True
self.filter_str = self.cbuf.text self.filter_str = self.cbuf.text
_cmd = command_lexer.quote(self.cbuf.text) self.master.commands.call("commands.history.filter", self.cbuf.text)
self.master.commands.execute("command_history.filter %s" % _cmd) cmd = self.master.commands.execute("commands.history.prev")
cmd = self.master.commands.execute("command_history.prev")
self.cbuf = CommandBuffer(self.master, cmd) self.cbuf = CommandBuffer(self.master, cmd)
elif key == "down" or key == "ctrl n": elif key == "down" or key == "ctrl n":
prev_cmd = self.cbuf.text prev_cmd = self.cbuf.text
cmd = self.master.commands.execute("command_history.next") cmd = self.master.commands.execute("commands.history.next")
if cmd == '': if cmd == '':
if prev_cmd == self.filter_str: if prev_cmd == self.filter_str:
self.cbuf = CommandBuffer(self.master, prev_cmd) self.cbuf = CommandBuffer(self.master, prev_cmd)
else: else:
self.active_filter = False self.active_filter = False
self.master.commands.execute("command_history.filter ''") self.master.commands.call("commands.history.filter", "")
self.filter_str = '' self.filter_str = ''
self.cbuf = CommandBuffer(self.master, '') self.cbuf = CommandBuffer(self.master, '')
else: else:

View File

@ -132,7 +132,6 @@ class ActionBar(urwid.WidgetWrap):
def keypress(self, size, k): def keypress(self, size, k):
if self.prompting: if self.prompting:
if k == "esc": if k == "esc":
self.master.commands.execute('command_history.cancel')
self.prompt_done() self.prompt_done()
elif self.onekey: elif self.onekey:
if k == "enter": if k == "enter":
@ -140,9 +139,9 @@ class ActionBar(urwid.WidgetWrap):
elif k in self.onekey: elif k in self.onekey:
self.prompt_execute(k) self.prompt_execute(k)
elif k == "enter": elif k == "enter":
cmd = command_lexer.quote(self._w.cbuf.text) text = self._w.get_edit_text()
self.master.commands.execute(f"command_history.add {cmd}") self.prompt_execute(text)
self.prompt_execute(self._w.get_edit_text()) self.master.commands.call("commands.history.add", text)
else: else:
if common.is_keypress(k): if common.is_keypress(k):
self._w.keypress(size, k) self._w.keypress(size, k)

View File

@ -1,81 +1,30 @@
import os import os
import pytest
from mitmproxy import options
from mitmproxy.addons import command_history from mitmproxy.addons import command_history
from mitmproxy.test import taddons from mitmproxy.test import taddons
@pytest.fixture(autouse=True)
def tctx(tmpdir):
# This runs before each test
dir_name = tmpdir.mkdir('mitmproxy').dirname
confdir = dir_name
opts = options.Options()
opts.set(*[f"confdir={confdir}"])
tctx = taddons.context(options=opts)
ch = command_history.CommandHistory()
tctx.master.addons.add(ch)
ch.configure([])
yield tctx
# This runs after each test
ch.cleanup()
class TestCommandHistory: class TestCommandHistory:
def test_existing_command_history(self, tctx): def test_load_from_file(self, tmpdir):
commands = ['cmd1', 'cmd2', 'cmd3'] commands = ['cmd1', 'cmd2', 'cmd3']
confdir = tctx.options.confdir with open(tmpdir.join('command_history'), 'w') as f:
f = open(os.path.join(confdir, 'command_history'), 'w') f.write("\n".join(commands))
f.write("\n".join(commands))
f.close()
ch = command_history.CommandHistory()
with taddons.context(ch) as tctx:
tctx.options.confdir = str(tmpdir)
assert ch.history == commands
def test_add_command(self):
history = command_history.CommandHistory() history = command_history.CommandHistory()
history.configure([])
saved_commands = [cmd for cmd in history.saved_commands]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3']
history.cleanup()
def test_add_command(self, tctx):
history = command_history.CommandHistory(3)
history.configure([])
history.add_command('cmd1') history.add_command('cmd1')
history.add_command('cmd2') history.add_command('cmd2')
saved_commands = [cmd for cmd in history.saved_commands] assert history.history == ['cmd1', 'cmd2']
assert saved_commands == ['cmd1', 'cmd2']
history.add_command('') history.add_command('')
saved_commands = [cmd for cmd in history.saved_commands] assert history.history == ['cmd1', 'cmd2']
assert saved_commands == ['cmd1', 'cmd2']
# The history size is only 3. So, we forget the first
# one command, when adding fourth command
history.add_command('cmd3')
history.add_command('cmd4')
saved_commands = [cmd for cmd in history.saved_commands]
assert saved_commands == ['cmd2', 'cmd3', 'cmd4']
history.add_command('')
saved_commands = [cmd for cmd in history.saved_commands]
assert saved_commands == ['cmd2', 'cmd3', 'cmd4']
# Commands with the same text are not repeated in the history one by one
history.add_command('cmd3')
saved_commands = [cmd for cmd in history.saved_commands]
assert saved_commands == ['cmd2', 'cmd4', 'cmd3']
history.add_command('cmd2')
saved_commands = [cmd for cmd in history.saved_commands]
assert saved_commands == ['cmd4', 'cmd3', 'cmd2']
history.cleanup()
def test_get_next_and_prev(self, tctx): def test_get_next_and_prev(self, tctx):
history = command_history.CommandHistory(5) history = command_history.CommandHistory(5)
@ -161,7 +110,7 @@ class TestCommandHistory:
history.add_command('cmd2') history.add_command('cmd2')
history.clear_history() history.clear_history()
saved_commands = [cmd for cmd in history.saved_commands] saved_commands = [cmd for cmd in history.history]
assert saved_commands == [] assert saved_commands == []
assert history.get_next() == '' assert history.get_next() == ''
@ -215,57 +164,57 @@ class TestCommandHistory:
for i in instances: for i in instances:
i.configure([]) i.configure([])
saved_commands = [cmd for cmd in i.saved_commands] saved_commands = [cmd for cmd in i.history]
assert saved_commands == [] assert saved_commands == []
instances[0].add_command('cmd1') instances[0].add_command('cmd1')
saved_commands = [cmd for cmd in instances[0].saved_commands] saved_commands = [cmd for cmd in instances[0].history]
assert saved_commands == ['cmd1'] assert saved_commands == ['cmd1']
# These instances haven't yet added a new command, so they haven't # These instances haven't yet added a new command, so they haven't
# yet reloaded their commands from the command file. # yet reloaded their commands from the command file.
# This is expected, because if the user is filtering a command on # This is expected, because if the user is filtering a command on
# another window, we don't want to interfere with that # another window, we don't want to interfere with that
saved_commands = [cmd for cmd in instances[1].saved_commands] saved_commands = [cmd for cmd in instances[1].history]
assert saved_commands == [] assert saved_commands == []
saved_commands = [cmd for cmd in instances[2].saved_commands] saved_commands = [cmd for cmd in instances[2].history]
assert saved_commands == [] assert saved_commands == []
# Since the second instanced added a new command, its list of # Since the second instanced added a new command, its list of
# saved commands has been updated to have the commands from the # saved commands has been updated to have the commands from the
# first instance + its own commands # first instance + its own commands
instances[1].add_command('cmd2') instances[1].add_command('cmd2')
saved_commands = [cmd for cmd in instances[1].saved_commands] saved_commands = [cmd for cmd in instances[1].history]
assert saved_commands == ['cmd1', 'cmd2'] assert saved_commands == ['cmd1', 'cmd2']
saved_commands = [cmd for cmd in instances[0].saved_commands] saved_commands = [cmd for cmd in instances[0].history]
assert saved_commands == ['cmd1'] assert saved_commands == ['cmd1']
# Third instance is still empty as it has not yet ran any command # Third instance is still empty as it has not yet ran any command
saved_commands = [cmd for cmd in instances[2].saved_commands] saved_commands = [cmd for cmd in instances[2].history]
assert saved_commands == [] assert saved_commands == []
instances[2].add_command('cmd3') instances[2].add_command('cmd3')
saved_commands = [cmd for cmd in instances[2].saved_commands] saved_commands = [cmd for cmd in instances[2].history]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] assert saved_commands == ['cmd1', 'cmd2', 'cmd3']
instances[0].add_command('cmd4') instances[0].add_command('cmd4')
saved_commands = [cmd for cmd in instances[0].saved_commands] saved_commands = [cmd for cmd in instances[0].history]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4']
instances.append(command_history.CommandHistory(10)) instances.append(command_history.CommandHistory(10))
instances[3].configure([]) instances[3].configure([])
saved_commands = [cmd for cmd in instances[3].saved_commands] saved_commands = [cmd for cmd in instances[3].history]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4']
instances[0].add_command('cmd_before_close') instances[0].add_command('cmd_before_close')
instances.pop(0) instances.pop(0)
saved_commands = [cmd for cmd in instances[0].saved_commands] saved_commands = [cmd for cmd in instances[0].history]
assert saved_commands == ['cmd1', 'cmd2'] assert saved_commands == ['cmd1', 'cmd2']
instances[0].add_command('new_cmd') instances[0].add_command('new_cmd')
saved_commands = [cmd for cmd in instances[0].saved_commands] saved_commands = [cmd for cmd in instances[0].history]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd'] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd']
instances.pop(0) instances.pop(0)
@ -285,7 +234,7 @@ class TestCommandHistory:
for i in instances: for i in instances:
i.configure([]) i.configure([])
i.clear_history() i.clear_history()
saved_commands = [cmd for cmd in i.saved_commands] saved_commands = [cmd for cmd in i.history]
assert saved_commands == [] assert saved_commands == []
instances[0].add_command('cmd1') instances[0].add_command('cmd1')
@ -294,7 +243,7 @@ class TestCommandHistory:
instances[1].add_command('cmd4') instances[1].add_command('cmd4')
instances[1].add_command('cmd5') instances[1].add_command('cmd5')
saved_commands = [cmd for cmd in instances[1].saved_commands] saved_commands = [cmd for cmd in instances[1].history]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd5'] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd5']
instances.pop() instances.pop()

View File

@ -114,8 +114,8 @@ class TestCommandEdit:
def test_up_and_down(self, tctx): def test_up_and_down(self, tctx):
edit = commander.CommandEdit(tctx.master, '') edit = commander.CommandEdit(tctx.master, '')
tctx.master.commands.execute('command_history.clear') tctx.master.commands.execute('commands.history.clear')
tctx.master.commands.execute('command_history.add "cmd1"') tctx.master.commands.execute('commands.history.add "cmd1"')
edit.keypress(1, 'up') edit.keypress(1, 'up')
assert edit.get_edit_text() == 'cmd1' assert edit.get_edit_text() == 'cmd1'
@ -131,9 +131,9 @@ class TestCommandEdit:
edit = commander.CommandEdit(tctx.master, '') edit = commander.CommandEdit(tctx.master, '')
tctx.master.commands.execute('command_history.clear') tctx.master.commands.execute('commands.history.clear')
tctx.master.commands.execute('command_history.add "cmd1"') tctx.master.commands.execute('commands.history.add "cmd1"')
tctx.master.commands.execute('command_history.add "cmd2"') tctx.master.commands.execute('commands.history.add "cmd2"')
edit.keypress(1, 'up') edit.keypress(1, 'up')
assert edit.get_edit_text() == 'cmd2' assert edit.get_edit_text() == 'cmd2'
@ -168,7 +168,7 @@ class TestCommandEdit:
assert edit.get_edit_text() == 'abc' assert edit.get_edit_text() == 'abc'
edit = commander.CommandEdit(tctx.master, '') edit = commander.CommandEdit(tctx.master, '')
tctx.master.commands.execute('command_history.add "cmd3"') tctx.master.commands.execute('commands.history.add "cmd3"')
edit.keypress(1, 'z') edit.keypress(1, 'z')
edit.keypress(1, 'up') edit.keypress(1, 'up')