From 4464648c38021bb3fcbac6f978bf40e9636dad4c Mon Sep 17 00:00:00 2001 From: Henrique Date: Mon, 25 Nov 2019 13:08:09 -0500 Subject: [PATCH] Logic to handle multiple instances using CommandHistory. --- mitmproxy/addons/command_history.py | 30 +++++-- test/mitmproxy/addons/test_command_history.py | 82 +++++++++++++++++-- 2 files changed, 99 insertions(+), 13 deletions(-) diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index 8c1ef052e..fb27e8051 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -1,7 +1,7 @@ +import atexit import collections import os import typing -import atexit from mitmproxy import command from mitmproxy import ctx @@ -19,12 +19,12 @@ class CommandHistory: if not os.path.exists(_command_history_dir): os.makedirs(_command_history_dir) - _command_history_path = os.path.join(_command_history_dir, 'command_history') + self.command_history_path = os.path.join(_command_history_dir, 'command_history') _history_lines: typing.List[str] = [] - if os.path.exists(_command_history_path): - _history_lines = open(_command_history_path, 'r').readlines() + if os.path.exists(self.command_history_path): + _history_lines = open(self.command_history_path, 'r').readlines() - self.command_history_file = open(_command_history_path, 'w') + self.command_history_file = open(self.command_history_path, 'w') for l in _history_lines: self.add_command(l.strip()) @@ -32,7 +32,8 @@ class CommandHistory: atexit.register(self.cleanup) def cleanup(self): - if self.command_history_file: + self._reload_saved_commands() + if self.command_history_file and not self.command_history_file.closed: self.command_history_file.close() @property @@ -99,9 +100,10 @@ class CommandHistory: if command.strip() == '': return + self._reload_saved_commands() + if command in self.saved_commands: self.saved_commands.remove(command) - self.saved_commands.append(command) _history_str = "\n".join(self.saved_commands) @@ -111,3 +113,17 @@ class CommandHistory: self.command_history_file.flush() self.restart() + + def _reload_saved_commands(self): + # First read all commands from the file to merge anything that may + # have come from a different instance of the mitmproxy or sister tools + if not os.path.exists(self.command_history_path): + return + + _history_lines = open(self.command_history_path, 'r').readlines() + self.saved_commands.clear() + for l in _history_lines: + l = l.strip() + if l in self.saved_commands: + self.saved_commands.remove(l) + self.saved_commands.append(l.strip()) diff --git a/test/mitmproxy/addons/test_command_history.py b/test/mitmproxy/addons/test_command_history.py index 405b83235..d22e1fb19 100644 --- a/test/mitmproxy/addons/test_command_history.py +++ b/test/mitmproxy/addons/test_command_history.py @@ -25,7 +25,7 @@ def tctx(): yield tctx # This runs after each test - ch.command_history_file.close() # Makes windows happy + ch.cleanup() shutil.rmtree(confdir) @@ -42,7 +42,7 @@ class TestCommandHistory: saved_commands = [cmd for cmd in history.saved_commands] assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] - history.command_history_file.close() + history.cleanup() def test_add_command(self, tctx): history = command_history.CommandHistory(3) @@ -77,7 +77,7 @@ class TestCommandHistory: saved_commands = [cmd for cmd in history.saved_commands] assert saved_commands == ['cmd4', 'cmd3', 'cmd2'] - history.command_history_file.close() + history.cleanup() def test_get_next_and_prev(self, tctx): history = command_history.CommandHistory(5) @@ -152,7 +152,7 @@ class TestCommandHistory: assert history.get_next() == '' assert history.get_next() == '' - history.command_history_file.close() + history.cleanup() def test_clear(self, tctx): history = command_history.CommandHistory(3) @@ -169,7 +169,7 @@ class TestCommandHistory: assert history.get_prev() == '' assert history.get_prev() == '' - history.command_history_file.close() + history.cleanup() def test_filter(self, tctx): history = command_history.CommandHistory(3) @@ -203,4 +203,74 @@ class TestCommandHistory: assert history.get_next() == '' assert history.get_next() == '' - history.command_history_file.close() + history.cleanup() + + def test_multiple_instances(self, tctx): + + instances = [ + command_history.CommandHistory(10), + command_history.CommandHistory(10), + command_history.CommandHistory(10) + ] + + for i in instances: + saved_commands = [cmd for cmd in i.saved_commands] + assert saved_commands == [] + + instances[0].add_command('cmd1') + saved_commands = [cmd for cmd in instances[0].saved_commands] + assert saved_commands == ['cmd1'] + + # These instances haven't yet added a new command, so they haven't + # yet reloaded their commands from the command file. + # This is expected, because if the user is filtering a command on + # another window, we don't want to interfere with that + saved_commands = [cmd for cmd in instances[1].saved_commands] + assert saved_commands == [] + saved_commands = [cmd for cmd in instances[2].saved_commands] + assert saved_commands == [] + + # Since the second instanced added a new command, its list of + # saved commands has been updated to have the commands from the + # first instance + its own commands + instances[1].add_command('cmd2') + saved_commands = [cmd for cmd in instances[1].saved_commands] + assert saved_commands == ['cmd1', 'cmd2'] + + saved_commands = [cmd for cmd in instances[0].saved_commands] + assert saved_commands == ['cmd1'] + + # Third instance is still empty as it has not yet ran any command + saved_commands = [cmd for cmd in instances[2].saved_commands] + assert saved_commands == [] + + instances[2].add_command('cmd3') + saved_commands = [cmd for cmd in instances[2].saved_commands] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] + + instances[0].add_command('cmd4') + saved_commands = [cmd for cmd in instances[0].saved_commands] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] + + instances.append(command_history.CommandHistory(10)) + saved_commands = [cmd for cmd in instances[3].saved_commands] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] + + instances[0].add_command('cmd_before_close') + instances.pop(0) + + saved_commands = [cmd for cmd in instances[0].saved_commands] + assert saved_commands == ['cmd1', 'cmd2'] + + instances[0].add_command('new_cmd') + saved_commands = [cmd for cmd in instances[0].saved_commands] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd'] + + instances.pop(0) + instances.pop(0) + instances.pop(0) + + _path = os.path.join(tctx.options.confdir, 'command_history') + lines = open(_path, 'r').readlines() + saved_commands = [cmd.strip() for cmd in lines] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd']