mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 10:16:27 +00:00
Addressing comments from review
This commit is contained in:
parent
0f0fc15c16
commit
68b016e180
@ -10,31 +10,17 @@ from mitmproxy import ctx
|
|||||||
class CommandHistory:
|
class CommandHistory:
|
||||||
def __init__(self, size: int = 300) -> None:
|
def __init__(self, size: int = 300) -> None:
|
||||||
self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size)
|
self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size)
|
||||||
|
self.is_configured = False
|
||||||
|
|
||||||
self.filtered_commands: typing.Deque[str] = collections.deque()
|
self.filtered_commands: typing.Deque[str] = collections.deque()
|
||||||
self.current_index: int = -1
|
self.current_index: int = -1
|
||||||
self.filter_str: str = ''
|
self.filter_str: str = ''
|
||||||
|
self.command_history_path: str = ''
|
||||||
_command_history_dir = os.path.expanduser(ctx.options.confdir)
|
|
||||||
if not os.path.exists(_command_history_dir):
|
|
||||||
os.makedirs(_command_history_dir)
|
|
||||||
|
|
||||||
self.command_history_path = os.path.join(_command_history_dir, 'command_history')
|
|
||||||
_history_lines: typing.List[str] = []
|
|
||||||
if os.path.exists(self.command_history_path):
|
|
||||||
_history_lines = open(self.command_history_path, 'r').readlines()
|
|
||||||
|
|
||||||
self.command_history_file = open(self.command_history_path, 'w')
|
|
||||||
|
|
||||||
for l in _history_lines:
|
|
||||||
self.add_command(l.strip())
|
|
||||||
|
|
||||||
atexit.register(self.cleanup)
|
atexit.register(self.cleanup)
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self._reload_saved_commands()
|
self._sync_saved_commands()
|
||||||
if self.command_history_file and not self.command_history_file.closed:
|
|
||||||
self.command_history_file.close()
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def last_filtered_index(self):
|
def last_filtered_index(self):
|
||||||
@ -44,9 +30,13 @@ class CommandHistory:
|
|||||||
def clear_history(self):
|
def clear_history(self):
|
||||||
self.saved_commands.clear()
|
self.saved_commands.clear()
|
||||||
self.filtered_commands.clear()
|
self.filtered_commands.clear()
|
||||||
self.command_history_file.truncate(0)
|
|
||||||
self.command_history_file.seek(0)
|
with open(self.command_history_path, 'w') as f:
|
||||||
self.command_history_file.flush()
|
f.truncate(0)
|
||||||
|
f.seek(0)
|
||||||
|
f.flush()
|
||||||
|
f.close()
|
||||||
|
|
||||||
self.restart()
|
self.restart()
|
||||||
|
|
||||||
@command.command("command_history.cancel")
|
@command.command("command_history.cancel")
|
||||||
@ -100,30 +90,55 @@ class CommandHistory:
|
|||||||
if command.strip() == '':
|
if command.strip() == '':
|
||||||
return
|
return
|
||||||
|
|
||||||
self._reload_saved_commands()
|
self._sync_saved_commands()
|
||||||
|
|
||||||
if command in self.saved_commands:
|
if command in self.saved_commands:
|
||||||
self.saved_commands.remove(command)
|
self.saved_commands.remove(command)
|
||||||
self.saved_commands.append(command)
|
self.saved_commands.append(command)
|
||||||
|
|
||||||
_history_str = "\n".join(self.saved_commands)
|
_history_str = "\n".join(self.saved_commands)
|
||||||
self.command_history_file.truncate(0)
|
with open(self.command_history_path, 'w') as f:
|
||||||
self.command_history_file.seek(0)
|
f.truncate(0)
|
||||||
self.command_history_file.write(_history_str)
|
f.seek(0)
|
||||||
self.command_history_file.flush()
|
f.write(_history_str)
|
||||||
|
f.flush()
|
||||||
|
f.close()
|
||||||
|
|
||||||
self.restart()
|
self.restart()
|
||||||
|
|
||||||
def _reload_saved_commands(self):
|
def _sync_saved_commands(self):
|
||||||
# First read all commands from the file to merge anything that may
|
# First read all commands from the file to merge anything that may
|
||||||
# have come from a different instance of the mitmproxy or sister tools
|
# have come from a different instance of the mitmproxy or sister tools
|
||||||
if not os.path.exists(self.command_history_path):
|
if not os.path.exists(self.command_history_path):
|
||||||
return
|
return
|
||||||
|
|
||||||
_history_lines = open(self.command_history_path, 'r').readlines()
|
with open(self.command_history_path, 'r') as f:
|
||||||
|
_history_lines = f.readlines()
|
||||||
|
f.close()
|
||||||
|
|
||||||
self.saved_commands.clear()
|
self.saved_commands.clear()
|
||||||
for l in _history_lines:
|
for l in _history_lines:
|
||||||
l = l.strip()
|
l = l.strip()
|
||||||
if l in self.saved_commands:
|
if l in self.saved_commands:
|
||||||
self.saved_commands.remove(l)
|
self.saved_commands.remove(l)
|
||||||
self.saved_commands.append(l.strip())
|
self.saved_commands.append(l.strip())
|
||||||
|
|
||||||
|
def configure(self, updated: typing.Set[str]):
|
||||||
|
if self.is_configured:
|
||||||
|
return
|
||||||
|
|
||||||
|
_command_history_dir = os.path.expanduser(ctx.options.confdir)
|
||||||
|
if not os.path.exists(_command_history_dir):
|
||||||
|
os.makedirs(_command_history_dir)
|
||||||
|
|
||||||
|
self.command_history_path = os.path.join(_command_history_dir, 'command_history')
|
||||||
|
_history_lines: typing.List[str] = []
|
||||||
|
if os.path.exists(self.command_history_path):
|
||||||
|
with open(self.command_history_path, 'r') as f:
|
||||||
|
_history_lines = f.readlines()
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
for l in _history_lines:
|
||||||
|
self.add_command(l.strip())
|
||||||
|
|
||||||
|
self.is_configured = True
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
import os
|
import os
|
||||||
import pytest
|
import pytest
|
||||||
import shutil
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
from mitmproxy import options
|
from mitmproxy import options
|
||||||
from mitmproxy.addons import command_history
|
from mitmproxy.addons import command_history
|
||||||
@ -9,24 +7,22 @@ from mitmproxy.test import taddons
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def tctx():
|
def tctx(tmpdir):
|
||||||
# This runs before each test
|
# This runs before each test
|
||||||
dir_id = str(uuid.uuid4())
|
dir_name = tmpdir.mkdir('mitmproxy').dirname
|
||||||
confdir = os.path.expanduser(f"~/.mitmproxy-test-suite-{dir_id}")
|
confdir = dir_name
|
||||||
if not os.path.exists(confdir):
|
|
||||||
os.makedirs(confdir)
|
|
||||||
|
|
||||||
opts = options.Options()
|
opts = options.Options()
|
||||||
opts.set(*[f"confdir={confdir}"])
|
opts.set(*[f"confdir={confdir}"])
|
||||||
tctx = taddons.context(options=opts)
|
tctx = taddons.context(options=opts)
|
||||||
ch = command_history.CommandHistory()
|
ch = command_history.CommandHistory()
|
||||||
tctx.master.addons.add(ch)
|
tctx.master.addons.add(ch)
|
||||||
|
ch.configure([])
|
||||||
|
|
||||||
yield tctx
|
yield tctx
|
||||||
|
|
||||||
# This runs after each test
|
# This runs after each test
|
||||||
ch.cleanup()
|
ch.cleanup()
|
||||||
shutil.rmtree(confdir)
|
|
||||||
|
|
||||||
|
|
||||||
class TestCommandHistory:
|
class TestCommandHistory:
|
||||||
@ -38,6 +34,7 @@ class TestCommandHistory:
|
|||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
history = command_history.CommandHistory()
|
history = command_history.CommandHistory()
|
||||||
|
history.configure([])
|
||||||
|
|
||||||
saved_commands = [cmd for cmd in history.saved_commands]
|
saved_commands = [cmd for cmd in history.saved_commands]
|
||||||
assert saved_commands == ['cmd1', 'cmd2', 'cmd3']
|
assert saved_commands == ['cmd1', 'cmd2', 'cmd3']
|
||||||
@ -46,6 +43,7 @@ class TestCommandHistory:
|
|||||||
|
|
||||||
def test_add_command(self, tctx):
|
def test_add_command(self, tctx):
|
||||||
history = command_history.CommandHistory(3)
|
history = command_history.CommandHistory(3)
|
||||||
|
history.configure([])
|
||||||
|
|
||||||
history.add_command('cmd1')
|
history.add_command('cmd1')
|
||||||
history.add_command('cmd2')
|
history.add_command('cmd2')
|
||||||
@ -81,6 +79,7 @@ class TestCommandHistory:
|
|||||||
|
|
||||||
def test_get_next_and_prev(self, tctx):
|
def test_get_next_and_prev(self, tctx):
|
||||||
history = command_history.CommandHistory(5)
|
history = command_history.CommandHistory(5)
|
||||||
|
history.configure([])
|
||||||
|
|
||||||
history.add_command('cmd1')
|
history.add_command('cmd1')
|
||||||
|
|
||||||
@ -156,6 +155,7 @@ class TestCommandHistory:
|
|||||||
|
|
||||||
def test_clear(self, tctx):
|
def test_clear(self, tctx):
|
||||||
history = command_history.CommandHistory(3)
|
history = command_history.CommandHistory(3)
|
||||||
|
history.configure([])
|
||||||
|
|
||||||
history.add_command('cmd1')
|
history.add_command('cmd1')
|
||||||
history.add_command('cmd2')
|
history.add_command('cmd2')
|
||||||
@ -173,6 +173,7 @@ class TestCommandHistory:
|
|||||||
|
|
||||||
def test_filter(self, tctx):
|
def test_filter(self, tctx):
|
||||||
history = command_history.CommandHistory(3)
|
history = command_history.CommandHistory(3)
|
||||||
|
history.configure([])
|
||||||
|
|
||||||
history.add_command('cmd1')
|
history.add_command('cmd1')
|
||||||
history.add_command('cmd2')
|
history.add_command('cmd2')
|
||||||
@ -206,7 +207,6 @@ class TestCommandHistory:
|
|||||||
history.cleanup()
|
history.cleanup()
|
||||||
|
|
||||||
def test_multiple_instances(self, tctx):
|
def test_multiple_instances(self, tctx):
|
||||||
|
|
||||||
instances = [
|
instances = [
|
||||||
command_history.CommandHistory(10),
|
command_history.CommandHistory(10),
|
||||||
command_history.CommandHistory(10),
|
command_history.CommandHistory(10),
|
||||||
@ -214,6 +214,7 @@ class TestCommandHistory:
|
|||||||
]
|
]
|
||||||
|
|
||||||
for i in instances:
|
for i in instances:
|
||||||
|
i.configure([])
|
||||||
saved_commands = [cmd for cmd in i.saved_commands]
|
saved_commands = [cmd for cmd in i.saved_commands]
|
||||||
assert saved_commands == []
|
assert saved_commands == []
|
||||||
|
|
||||||
@ -253,6 +254,7 @@ class TestCommandHistory:
|
|||||||
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4']
|
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4']
|
||||||
|
|
||||||
instances.append(command_history.CommandHistory(10))
|
instances.append(command_history.CommandHistory(10))
|
||||||
|
instances[3].configure([])
|
||||||
saved_commands = [cmd for cmd in instances[3].saved_commands]
|
saved_commands = [cmd for cmd in instances[3].saved_commands]
|
||||||
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4']
|
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4']
|
||||||
|
|
||||||
@ -274,3 +276,31 @@ class TestCommandHistory:
|
|||||||
lines = open(_path, 'r').readlines()
|
lines = open(_path, 'r').readlines()
|
||||||
saved_commands = [cmd.strip() for cmd in lines]
|
saved_commands = [cmd.strip() for cmd in lines]
|
||||||
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd']
|
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd']
|
||||||
|
|
||||||
|
instances = [
|
||||||
|
command_history.CommandHistory(10),
|
||||||
|
command_history.CommandHistory(10)
|
||||||
|
]
|
||||||
|
|
||||||
|
for i in instances:
|
||||||
|
i.configure([])
|
||||||
|
i.clear_history()
|
||||||
|
saved_commands = [cmd for cmd in i.saved_commands]
|
||||||
|
assert saved_commands == []
|
||||||
|
|
||||||
|
instances[0].add_command('cmd1')
|
||||||
|
instances[0].add_command('cmd2')
|
||||||
|
instances[1].add_command('cmd3')
|
||||||
|
instances[1].add_command('cmd4')
|
||||||
|
instances[1].add_command('cmd5')
|
||||||
|
|
||||||
|
saved_commands = [cmd for cmd in instances[1].saved_commands]
|
||||||
|
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd5']
|
||||||
|
|
||||||
|
instances.pop()
|
||||||
|
instances.pop()
|
||||||
|
|
||||||
|
_path = os.path.join(tctx.options.confdir, 'command_history')
|
||||||
|
lines = open(_path, 'r').readlines()
|
||||||
|
saved_commands = [cmd.strip() for cmd in lines]
|
||||||
|
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd5']
|
||||||
|
@ -1,7 +1,4 @@
|
|||||||
import os
|
|
||||||
import pytest
|
import pytest
|
||||||
import shutil
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
from mitmproxy import options
|
from mitmproxy import options
|
||||||
from mitmproxy.addons import command_history
|
from mitmproxy.addons import command_history
|
||||||
@ -10,24 +7,22 @@ from mitmproxy.tools.console.commander import commander
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
@pytest.fixture(autouse=True)
|
||||||
def tctx():
|
def tctx(tmpdir):
|
||||||
# This runs before each test
|
# This runs before each test
|
||||||
dir_id = str(uuid.uuid4())
|
dir_name = tmpdir.mkdir('mitmproxy').dirname
|
||||||
confdir = os.path.expanduser(f"~/.mitmproxy-test-suite-{dir_id}")
|
confdir = dir_name
|
||||||
if not os.path.exists(confdir):
|
|
||||||
os.makedirs(confdir)
|
|
||||||
|
|
||||||
opts = options.Options()
|
opts = options.Options()
|
||||||
opts.set(*[f"confdir={confdir}"])
|
opts.set(*[f"confdir={confdir}"])
|
||||||
tctx = taddons.context(options=opts)
|
tctx = taddons.context(options=opts)
|
||||||
ch = command_history.CommandHistory()
|
ch = command_history.CommandHistory()
|
||||||
tctx.master.addons.add(ch)
|
tctx.master.addons.add(ch)
|
||||||
|
ch.configure([])
|
||||||
|
|
||||||
yield tctx
|
yield tctx
|
||||||
|
|
||||||
# This runs after each test
|
# This runs after each test
|
||||||
ch.command_history_file.close()
|
ch.cleanup()
|
||||||
shutil.rmtree(confdir)
|
|
||||||
|
|
||||||
|
|
||||||
class TestListCompleter:
|
class TestListCompleter:
|
||||||
|
Loading…
Reference in New Issue
Block a user