mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
simplify command history addon
This commit is contained in:
parent
68b016e180
commit
06ef7350f5
@ -1,6 +1,5 @@
|
|||||||
import atexit
|
|
||||||
import collections
|
|
||||||
import os
|
import os
|
||||||
|
import pathlib
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
from mitmproxy import command
|
from mitmproxy import command
|
||||||
@ -8,137 +7,77 @@ from mitmproxy import ctx
|
|||||||
|
|
||||||
|
|
||||||
class CommandHistory:
|
class CommandHistory:
|
||||||
def __init__(self, size: int = 300) -> None:
|
VACUUM_SIZE = 1024
|
||||||
self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size)
|
|
||||||
self.is_configured = False
|
|
||||||
|
|
||||||
self.filtered_commands: typing.Deque[str] = collections.deque()
|
def __init__(self) -> None:
|
||||||
self.current_index: int = -1
|
self.history: typing.List[str] = []
|
||||||
self.filter_str: str = ''
|
self.filtered_history: typing.List[str] = [""]
|
||||||
self.command_history_path: str = ''
|
self.current_index: int = 0
|
||||||
|
|
||||||
atexit.register(self.cleanup)
|
def load(self, loader):
|
||||||
|
loader.add_option(
|
||||||
def cleanup(self):
|
"command_history", bool, True,
|
||||||
self._sync_saved_commands()
|
"""Persist command history between mitmproxy invocations."""
|
||||||
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def last_filtered_index(self):
|
def history_file(self) -> pathlib.Path:
|
||||||
return len(self.filtered_commands) - 1
|
return pathlib.Path(os.path.expanduser(ctx.options.confdir)) / "command_history"
|
||||||
|
|
||||||
@command.command("command_history.clear")
|
def running(self):
|
||||||
def clear_history(self):
|
# FIXME: We have a weird bug where the contract for configure is not followed and it is never called with
|
||||||
self.saved_commands.clear()
|
# confdir or command_history as updated.
|
||||||
self.filtered_commands.clear()
|
self.configure("command_history")
|
||||||
|
|
||||||
with open(self.command_history_path, 'w') as f:
|
def configure(self, updated):
|
||||||
f.truncate(0)
|
if "command_history" in updated or "confdir" in updated:
|
||||||
f.seek(0)
|
if ctx.options.command_history and self.history_file.is_file():
|
||||||
f.flush()
|
self.history = self.history_file.read_text().splitlines()
|
||||||
f.close()
|
|
||||||
|
|
||||||
self.restart()
|
def done(self):
|
||||||
|
if ctx.options.command_history and len(self.history) > self.VACUUM_SIZE:
|
||||||
|
# vacuum history so that it doesn't grow indefinitely.
|
||||||
|
history_str = "\n".join(self.history[-self.VACUUM_SIZE/2:]) + "\n"
|
||||||
|
self.history_file.write_text(history_str)
|
||||||
|
|
||||||
@command.command("command_history.cancel")
|
@command.command("commands.history.add")
|
||||||
def restart(self) -> None:
|
|
||||||
self.filtered_commands = self.saved_commands.copy()
|
|
||||||
self.current_index = -1
|
|
||||||
|
|
||||||
@command.command("command_history.next")
|
|
||||||
def get_next(self) -> str:
|
|
||||||
|
|
||||||
if self.current_index == -1 or self.current_index == self.last_filtered_index:
|
|
||||||
self.current_index = -1
|
|
||||||
return ''
|
|
||||||
elif self.current_index < self.last_filtered_index:
|
|
||||||
self.current_index += 1
|
|
||||||
|
|
||||||
ret = self.filtered_commands[self.current_index]
|
|
||||||
|
|
||||||
return ret
|
|
||||||
|
|
||||||
@command.command("command_history.prev")
|
|
||||||
def get_prev(self) -> str:
|
|
||||||
|
|
||||||
if self.current_index == -1:
|
|
||||||
if self.last_filtered_index >= 0:
|
|
||||||
self.current_index = self.last_filtered_index
|
|
||||||
else:
|
|
||||||
return ''
|
|
||||||
|
|
||||||
elif self.current_index > 0:
|
|
||||||
self.current_index -= 1
|
|
||||||
|
|
||||||
ret = self.filtered_commands[self.current_index]
|
|
||||||
|
|
||||||
return ret
|
|
||||||
|
|
||||||
@command.command("command_history.filter")
|
|
||||||
def set_filter(self, command: str) -> None:
|
|
||||||
self.filter_str = command
|
|
||||||
|
|
||||||
_filtered_commands = [c for c in self.saved_commands if c.startswith(command)]
|
|
||||||
self.filtered_commands = collections.deque(_filtered_commands)
|
|
||||||
|
|
||||||
if command and command not in self.filtered_commands:
|
|
||||||
self.filtered_commands.append(command)
|
|
||||||
|
|
||||||
self.current_index = -1
|
|
||||||
|
|
||||||
@command.command("command_history.add")
|
|
||||||
def add_command(self, command: str) -> None:
|
def add_command(self, command: str) -> None:
|
||||||
if command.strip() == '':
|
if not command.strip():
|
||||||
return
|
return
|
||||||
|
|
||||||
self._sync_saved_commands()
|
self.history.append(command)
|
||||||
|
if ctx.options.command_history:
|
||||||
|
with self.history_file.open("a") as f:
|
||||||
|
f.write(f"{command}\n")
|
||||||
|
|
||||||
if command in self.saved_commands:
|
@command.command("commands.history.get")
|
||||||
self.saved_commands.remove(command)
|
def get_history(self) -> typing.Sequence[str]:
|
||||||
self.saved_commands.append(command)
|
"""Get the entire command history."""
|
||||||
|
return self.history.copy()
|
||||||
|
|
||||||
_history_str = "\n".join(self.saved_commands)
|
@command.command("commands.history.clear")
|
||||||
with open(self.command_history_path, 'w') as f:
|
def clear_history(self):
|
||||||
f.truncate(0)
|
self.history_file.unlink()
|
||||||
f.seek(0)
|
self.history = []
|
||||||
f.write(_history_str)
|
|
||||||
f.flush()
|
|
||||||
f.close()
|
|
||||||
|
|
||||||
self.restart()
|
# Functionality to provide a filtered list that can be iterated through.
|
||||||
|
|
||||||
def _sync_saved_commands(self):
|
@command.command("commands.history.filter")
|
||||||
# First read all commands from the file to merge anything that may
|
def set_filter(self, prefix: str) -> None:
|
||||||
# have come from a different instance of the mitmproxy or sister tools
|
self.filtered_history = [
|
||||||
if not os.path.exists(self.command_history_path):
|
cmd
|
||||||
return
|
for cmd in self.history
|
||||||
|
if cmd.startswith(prefix)
|
||||||
|
]
|
||||||
|
self.filtered_history.append(prefix)
|
||||||
|
self.current_index = len(self.filtered_history) - 1
|
||||||
|
|
||||||
with open(self.command_history_path, 'r') as f:
|
@command.command("commands.history.next")
|
||||||
_history_lines = f.readlines()
|
def get_next(self) -> str:
|
||||||
f.close()
|
self.current_index = min(self.current_index + 1, len(self.filtered_history) - 1)
|
||||||
|
return self.filtered_history[self.current_index]
|
||||||
|
|
||||||
self.saved_commands.clear()
|
@command.command("commands.history.prev")
|
||||||
for l in _history_lines:
|
def get_prev(self) -> str:
|
||||||
l = l.strip()
|
self.current_index = max(0, self.current_index - 1)
|
||||||
if l in self.saved_commands:
|
return self.filtered_history[self.current_index]
|
||||||
self.saved_commands.remove(l)
|
|
||||||
self.saved_commands.append(l.strip())
|
|
||||||
|
|
||||||
def configure(self, updated: typing.Set[str]):
|
|
||||||
if self.is_configured:
|
|
||||||
return
|
|
||||||
|
|
||||||
_command_history_dir = os.path.expanduser(ctx.options.confdir)
|
|
||||||
if not os.path.exists(_command_history_dir):
|
|
||||||
os.makedirs(_command_history_dir)
|
|
||||||
|
|
||||||
self.command_history_path = os.path.join(_command_history_dir, 'command_history')
|
|
||||||
_history_lines: typing.List[str] = []
|
|
||||||
if os.path.exists(self.command_history_path):
|
|
||||||
with open(self.command_history_path, 'r') as f:
|
|
||||||
_history_lines = f.readlines()
|
|
||||||
f.close()
|
|
||||||
|
|
||||||
for l in _history_lines:
|
|
||||||
self.add_command(l.strip())
|
|
||||||
|
|
||||||
self.is_configured = True
|
|
||||||
|
@ -9,8 +9,6 @@ import mitmproxy.flow
|
|||||||
import mitmproxy.master
|
import mitmproxy.master
|
||||||
import mitmproxy.types
|
import mitmproxy.types
|
||||||
|
|
||||||
from mitmproxy import command_lexer
|
|
||||||
|
|
||||||
|
|
||||||
class Completer:
|
class Completer:
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
@ -163,7 +161,7 @@ class CommandEdit(urwid.WidgetWrap):
|
|||||||
self.cbuf.backspace()
|
self.cbuf.backspace()
|
||||||
if self.cbuf.text == '':
|
if self.cbuf.text == '':
|
||||||
self.active_filter = False
|
self.active_filter = False
|
||||||
self.master.commands.execute("command_history.filter ''")
|
self.master.commands.call("commands.history.filter", "")
|
||||||
self.filter_str = ''
|
self.filter_str = ''
|
||||||
elif key == "left":
|
elif key == "left":
|
||||||
self.cbuf.left()
|
self.cbuf.left()
|
||||||
@ -173,21 +171,20 @@ class CommandEdit(urwid.WidgetWrap):
|
|||||||
if self.active_filter is False:
|
if self.active_filter is False:
|
||||||
self.active_filter = True
|
self.active_filter = True
|
||||||
self.filter_str = self.cbuf.text
|
self.filter_str = self.cbuf.text
|
||||||
_cmd = command_lexer.quote(self.cbuf.text)
|
self.master.commands.call("commands.history.filter", self.cbuf.text)
|
||||||
self.master.commands.execute("command_history.filter %s" % _cmd)
|
|
||||||
|
|
||||||
cmd = self.master.commands.execute("command_history.prev")
|
cmd = self.master.commands.execute("commands.history.prev")
|
||||||
self.cbuf = CommandBuffer(self.master, cmd)
|
self.cbuf = CommandBuffer(self.master, cmd)
|
||||||
elif key == "down":
|
elif key == "down":
|
||||||
prev_cmd = self.cbuf.text
|
prev_cmd = self.cbuf.text
|
||||||
cmd = self.master.commands.execute("command_history.next")
|
cmd = self.master.commands.execute("commands.history.next")
|
||||||
|
|
||||||
if cmd == '':
|
if cmd == '':
|
||||||
if prev_cmd == self.filter_str:
|
if prev_cmd == self.filter_str:
|
||||||
self.cbuf = CommandBuffer(self.master, prev_cmd)
|
self.cbuf = CommandBuffer(self.master, prev_cmd)
|
||||||
else:
|
else:
|
||||||
self.active_filter = False
|
self.active_filter = False
|
||||||
self.master.commands.execute("command_history.filter ''")
|
self.master.commands.call("commands.history.filter", "")
|
||||||
self.filter_str = ''
|
self.filter_str = ''
|
||||||
self.cbuf = CommandBuffer(self.master, '')
|
self.cbuf = CommandBuffer(self.master, '')
|
||||||
else:
|
else:
|
||||||
|
@ -132,7 +132,6 @@ class ActionBar(urwid.WidgetWrap):
|
|||||||
def keypress(self, size, k):
|
def keypress(self, size, k):
|
||||||
if self.prompting:
|
if self.prompting:
|
||||||
if k == "esc":
|
if k == "esc":
|
||||||
self.master.commands.execute('command_history.cancel')
|
|
||||||
self.prompt_done()
|
self.prompt_done()
|
||||||
elif self.onekey:
|
elif self.onekey:
|
||||||
if k == "enter":
|
if k == "enter":
|
||||||
@ -140,9 +139,9 @@ class ActionBar(urwid.WidgetWrap):
|
|||||||
elif k in self.onekey:
|
elif k in self.onekey:
|
||||||
self.prompt_execute(k)
|
self.prompt_execute(k)
|
||||||
elif k == "enter":
|
elif k == "enter":
|
||||||
cmd = command_lexer.quote(self._w.cbuf.text)
|
text = self._w.get_edit_text()
|
||||||
self.master.commands.execute(f"command_history.add {cmd}")
|
self.prompt_execute(text)
|
||||||
self.prompt_execute(self._w.get_edit_text())
|
self.master.commands.call("commands.history.add", text)
|
||||||
else:
|
else:
|
||||||
if common.is_keypress(k):
|
if common.is_keypress(k):
|
||||||
self._w.keypress(size, k)
|
self._w.keypress(size, k)
|
||||||
|
Loading…
Reference in New Issue
Block a user