Logic to handle multiple instances using CommandHistory.

This commit is contained in:
Henrique 2019-11-25 13:08:09 -05:00
parent 5b582a76a8
commit 4464648c38
2 changed files with 99 additions and 13 deletions

View File

@ -1,7 +1,7 @@
import atexit
import collections
import os
import typing
import atexit
from mitmproxy import command
from mitmproxy import ctx
@ -19,12 +19,12 @@ class CommandHistory:
if not os.path.exists(_command_history_dir):
os.makedirs(_command_history_dir)
_command_history_path = os.path.join(_command_history_dir, 'command_history')
self.command_history_path = os.path.join(_command_history_dir, 'command_history')
_history_lines: typing.List[str] = []
if os.path.exists(_command_history_path):
_history_lines = open(_command_history_path, 'r').readlines()
if os.path.exists(self.command_history_path):
_history_lines = open(self.command_history_path, 'r').readlines()
self.command_history_file = open(_command_history_path, 'w')
self.command_history_file = open(self.command_history_path, 'w')
for l in _history_lines:
self.add_command(l.strip())
@ -32,7 +32,8 @@ class CommandHistory:
atexit.register(self.cleanup)
def cleanup(self):
if self.command_history_file:
self._reload_saved_commands()
if self.command_history_file and not self.command_history_file.closed:
self.command_history_file.close()
@property
@ -99,9 +100,10 @@ class CommandHistory:
if command.strip() == '':
return
self._reload_saved_commands()
if command in self.saved_commands:
self.saved_commands.remove(command)
self.saved_commands.append(command)
_history_str = "\n".join(self.saved_commands)
@ -111,3 +113,17 @@ class CommandHistory:
self.command_history_file.flush()
self.restart()
def _reload_saved_commands(self):
# First read all commands from the file to merge anything that may
# have come from a different instance of the mitmproxy or sister tools
if not os.path.exists(self.command_history_path):
return
_history_lines = open(self.command_history_path, 'r').readlines()
self.saved_commands.clear()
for l in _history_lines:
l = l.strip()
if l in self.saved_commands:
self.saved_commands.remove(l)
self.saved_commands.append(l.strip())

View File

@ -25,7 +25,7 @@ def tctx():
yield tctx
# This runs after each test
ch.command_history_file.close() # Makes windows happy
ch.cleanup()
shutil.rmtree(confdir)
@ -42,7 +42,7 @@ class TestCommandHistory:
saved_commands = [cmd for cmd in history.saved_commands]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3']
history.command_history_file.close()
history.cleanup()
def test_add_command(self, tctx):
history = command_history.CommandHistory(3)
@ -77,7 +77,7 @@ class TestCommandHistory:
saved_commands = [cmd for cmd in history.saved_commands]
assert saved_commands == ['cmd4', 'cmd3', 'cmd2']
history.command_history_file.close()
history.cleanup()
def test_get_next_and_prev(self, tctx):
history = command_history.CommandHistory(5)
@ -152,7 +152,7 @@ class TestCommandHistory:
assert history.get_next() == ''
assert history.get_next() == ''
history.command_history_file.close()
history.cleanup()
def test_clear(self, tctx):
history = command_history.CommandHistory(3)
@ -169,7 +169,7 @@ class TestCommandHistory:
assert history.get_prev() == ''
assert history.get_prev() == ''
history.command_history_file.close()
history.cleanup()
def test_filter(self, tctx):
history = command_history.CommandHistory(3)
@ -203,4 +203,74 @@ class TestCommandHistory:
assert history.get_next() == ''
assert history.get_next() == ''
history.command_history_file.close()
history.cleanup()
def test_multiple_instances(self, tctx):
instances = [
command_history.CommandHistory(10),
command_history.CommandHistory(10),
command_history.CommandHistory(10)
]
for i in instances:
saved_commands = [cmd for cmd in i.saved_commands]
assert saved_commands == []
instances[0].add_command('cmd1')
saved_commands = [cmd for cmd in instances[0].saved_commands]
assert saved_commands == ['cmd1']
# These instances haven't yet added a new command, so they haven't
# yet reloaded their commands from the command file.
# This is expected, because if the user is filtering a command on
# another window, we don't want to interfere with that
saved_commands = [cmd for cmd in instances[1].saved_commands]
assert saved_commands == []
saved_commands = [cmd for cmd in instances[2].saved_commands]
assert saved_commands == []
# Since the second instanced added a new command, its list of
# saved commands has been updated to have the commands from the
# first instance + its own commands
instances[1].add_command('cmd2')
saved_commands = [cmd for cmd in instances[1].saved_commands]
assert saved_commands == ['cmd1', 'cmd2']
saved_commands = [cmd for cmd in instances[0].saved_commands]
assert saved_commands == ['cmd1']
# Third instance is still empty as it has not yet ran any command
saved_commands = [cmd for cmd in instances[2].saved_commands]
assert saved_commands == []
instances[2].add_command('cmd3')
saved_commands = [cmd for cmd in instances[2].saved_commands]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3']
instances[0].add_command('cmd4')
saved_commands = [cmd for cmd in instances[0].saved_commands]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4']
instances.append(command_history.CommandHistory(10))
saved_commands = [cmd for cmd in instances[3].saved_commands]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4']
instances[0].add_command('cmd_before_close')
instances.pop(0)
saved_commands = [cmd for cmd in instances[0].saved_commands]
assert saved_commands == ['cmd1', 'cmd2']
instances[0].add_command('new_cmd')
saved_commands = [cmd for cmd in instances[0].saved_commands]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd']
instances.pop(0)
instances.pop(0)
instances.pop(0)
_path = os.path.join(tctx.options.confdir, 'command_history')
lines = open(_path, 'r').readlines()
saved_commands = [cmd.strip() for cmd in lines]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd']