mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-02-02 00:05:27 +00:00
Implemented feature to save command history to a file. This allows users
to reuse their commands the next time they open mitmproxy
This commit is contained in:
parent
bbb7eb692f
commit
16b55f9476
@ -2,6 +2,7 @@ import abc
|
||||
import collections
|
||||
import copy
|
||||
import typing
|
||||
import os
|
||||
|
||||
import urwid
|
||||
from urwid.text_layout import calc_coords
|
||||
@ -147,29 +148,55 @@ class CommandBuffer:
|
||||
self.completion = None
|
||||
|
||||
|
||||
# TODO: This class should be a Singleton
|
||||
class CommandHistory:
|
||||
def __init__(self, master: mitmproxy.master.Master, size: int = 30) -> None:
|
||||
def __init__(self, master: mitmproxy.master.Master, size: int = 300) -> None:
|
||||
self.saved_commands: collections.deque = collections.deque(
|
||||
[CommandBuffer(master, "")],
|
||||
maxlen=size
|
||||
)
|
||||
self.index: int = 0
|
||||
self.size: int = size
|
||||
self.master: mitmproxy.master.Master = master
|
||||
|
||||
_command_history_path = os.path.join(os.path.expanduser(mitmproxy.options.CONF_DIR), 'command_history')
|
||||
if os.path.exists(_command_history_path):
|
||||
with open(_command_history_path, 'r') as f:
|
||||
for l in f.readlines():
|
||||
cbuf = CommandBuffer(master, l.strip())
|
||||
self.add_command(cbuf)
|
||||
f.close()
|
||||
|
||||
self.command_history_file = open(_command_history_path, 'w')
|
||||
|
||||
@property
|
||||
def last_index(self):
|
||||
return len(self.saved_commands) - 1
|
||||
|
||||
def clear_history(self):
|
||||
"""
|
||||
Needed for test suite.
|
||||
TODO: Maybe create a command to clear the history?
|
||||
"""
|
||||
self.saved_commands: collections.deque = collections.deque(
|
||||
[CommandBuffer(self.master, "")],
|
||||
maxlen=self.size
|
||||
)
|
||||
|
||||
self.index = 0
|
||||
self.command_history_file.truncate(0)
|
||||
self.command_history_file.seek(0)
|
||||
self.command_history_file.flush()
|
||||
|
||||
def get_next(self) -> typing.Optional[CommandBuffer]:
|
||||
if self.index < self.last_index:
|
||||
self.index = self.index + 1
|
||||
return self.saved_commands[self.index]
|
||||
return None
|
||||
return self.saved_commands[self.index]
|
||||
|
||||
def get_prev(self) -> typing.Optional[CommandBuffer]:
|
||||
if self.index > 0:
|
||||
self.index = self.index - 1
|
||||
return self.saved_commands[self.index]
|
||||
return None
|
||||
return self.saved_commands[self.index]
|
||||
|
||||
def add_command(self, command: CommandBuffer, execution: bool = False) -> None:
|
||||
if self.index == self.last_index or execution:
|
||||
@ -184,6 +211,15 @@ class CommandHistory:
|
||||
if execution:
|
||||
self.index = self.last_index
|
||||
|
||||
# This prevents the constructor from trying to overwrite the file
|
||||
# that it is currently reading
|
||||
if hasattr(self, 'command_history_file'):
|
||||
_history_str = "\n".join([c.text for c in self.saved_commands])
|
||||
self.command_history_file.truncate(0)
|
||||
self.command_history_file.seek(0)
|
||||
self.command_history_file.write(_history_str)
|
||||
self.command_history_file.flush()
|
||||
|
||||
|
||||
class CommandEdit(urwid.WidgetWrap):
|
||||
leader = ": "
|
||||
|
@ -97,6 +97,7 @@ class TestCommandEdit:
|
||||
def test_up_and_down(self):
|
||||
with taddons.context() as tctx:
|
||||
history = commander.CommandHistory(tctx.master, size=3)
|
||||
history.clear_history()
|
||||
edit = commander.CommandEdit(tctx.master, '', history)
|
||||
|
||||
buf = commander.CommandBuffer(tctx.master, 'cmd1')
|
||||
@ -112,6 +113,7 @@ class TestCommandEdit:
|
||||
assert edit.get_edit_text() == 'cmd1'
|
||||
|
||||
history = commander.CommandHistory(tctx.master, size=5)
|
||||
history.clear_history()
|
||||
edit = commander.CommandEdit(tctx.master, '', history)
|
||||
edit.keypress(1, 'a')
|
||||
edit.keypress(1, 'b')
|
||||
@ -139,6 +141,7 @@ class TestCommandHistory:
|
||||
def fill_history(self, commands):
|
||||
with taddons.context() as tctx:
|
||||
history = commander.CommandHistory(tctx.master, size=3)
|
||||
history.clear_history()
|
||||
for c in commands:
|
||||
cbuf = commander.CommandBuffer(tctx.master, c)
|
||||
history.add_command(cbuf)
|
||||
@ -183,7 +186,7 @@ class TestCommandHistory:
|
||||
for i in range(3):
|
||||
assert history.get_next().text == expected_items[i]
|
||||
# We are at the last item of the history
|
||||
assert history.get_next() is None
|
||||
assert history.get_next().text == expected_items[-1]
|
||||
|
||||
def test_get_prev(self):
|
||||
commands = ["command1", "command2"]
|
||||
@ -194,7 +197,7 @@ class TestCommandHistory:
|
||||
for i in range(3):
|
||||
assert history.get_prev().text == expected_items[i]
|
||||
# We are at the first item of the history
|
||||
assert history.get_prev() is None
|
||||
assert history.get_prev().text == expected_items[-1]
|
||||
|
||||
|
||||
class TestCommandBuffer:
|
||||
|
Loading…
Reference in New Issue
Block a user