mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 08:11:00 +00:00
Command history implementation
This commit is contained in:
parent
7f464b8929
commit
ffbd7c20e5
@ -1,5 +1,7 @@
|
|||||||
import abc
|
import abc
|
||||||
|
import copy
|
||||||
import typing
|
import typing
|
||||||
|
import collections
|
||||||
|
|
||||||
import urwid
|
import urwid
|
||||||
from urwid.text_layout import calc_coords
|
from urwid.text_layout import calc_coords
|
||||||
@ -156,13 +158,52 @@ class CommandBuffer:
|
|||||||
self.completion = None
|
self.completion = None
|
||||||
|
|
||||||
|
|
||||||
|
class CommandHistory:
|
||||||
|
def __init__(self, master: mitmproxy.master.Master, size: int=30) -> None:
|
||||||
|
self.history: collections.deque = collections.deque(
|
||||||
|
[CommandBuffer(master, "")],
|
||||||
|
maxlen=size
|
||||||
|
)
|
||||||
|
self.index: int = 0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def last_index(self):
|
||||||
|
return len(self.history) - 1
|
||||||
|
|
||||||
|
def get_next(self) -> typing.Optional[CommandBuffer]:
|
||||||
|
if self.index < self.last_index:
|
||||||
|
self.index = self.index + 1
|
||||||
|
return self.history[self.index]
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_prev(self) -> typing.Optional[CommandBuffer]:
|
||||||
|
if self.index > 0:
|
||||||
|
self.index = self.index - 1
|
||||||
|
return self.history[self.index]
|
||||||
|
return None
|
||||||
|
|
||||||
|
def add_command(self, command: CommandBuffer, execution: bool=False) -> None:
|
||||||
|
if self.index == self.last_index or execution:
|
||||||
|
last_item_empty = not self.history[-1].text
|
||||||
|
if self.history[-1].text == command.text or (last_item_empty and execution):
|
||||||
|
self.history[-1] = copy.copy(command)
|
||||||
|
else:
|
||||||
|
self.history.append(command)
|
||||||
|
if not execution and self.index < self.last_index:
|
||||||
|
self.index += 1
|
||||||
|
if execution:
|
||||||
|
self.index = self.last_index
|
||||||
|
|
||||||
|
|
||||||
class CommandEdit(urwid.WidgetWrap):
|
class CommandEdit(urwid.WidgetWrap):
|
||||||
leader = ": "
|
leader = ": "
|
||||||
|
|
||||||
def __init__(self, master: mitmproxy.master.Master, text: str) -> None:
|
def __init__(self, master: mitmproxy.master.Master,
|
||||||
|
text: str, history: CommandHistory) -> None:
|
||||||
super().__init__(urwid.Text(self.leader))
|
super().__init__(urwid.Text(self.leader))
|
||||||
self.master = master
|
self.master = master
|
||||||
self.cbuf = CommandBuffer(master, text)
|
self.cbuf = CommandBuffer(master, text)
|
||||||
|
self.history = history
|
||||||
self.update()
|
self.update()
|
||||||
|
|
||||||
def keypress(self, size, key):
|
def keypress(self, size, key):
|
||||||
@ -172,6 +213,11 @@ class CommandEdit(urwid.WidgetWrap):
|
|||||||
self.cbuf.left()
|
self.cbuf.left()
|
||||||
elif key == "right":
|
elif key == "right":
|
||||||
self.cbuf.right()
|
self.cbuf.right()
|
||||||
|
elif key == "up":
|
||||||
|
self.history.add_command(self.cbuf)
|
||||||
|
self.cbuf = self.history.get_prev() or self.cbuf
|
||||||
|
elif key == "down":
|
||||||
|
self.cbuf = self.history.get_next() or self.cbuf
|
||||||
elif key == "tab":
|
elif key == "tab":
|
||||||
self.cbuf.cycle_completion()
|
self.cbuf.cycle_completion()
|
||||||
elif len(key) == 1:
|
elif len(key) == 1:
|
||||||
|
@ -42,6 +42,8 @@ class ActionBar(urwid.WidgetWrap):
|
|||||||
signals.status_prompt_onekey.connect(self.sig_prompt_onekey)
|
signals.status_prompt_onekey.connect(self.sig_prompt_onekey)
|
||||||
signals.status_prompt_command.connect(self.sig_prompt_command)
|
signals.status_prompt_command.connect(self.sig_prompt_command)
|
||||||
|
|
||||||
|
self.command_history = commander.CommandHistory(master)
|
||||||
|
|
||||||
self.prompting = None
|
self.prompting = None
|
||||||
|
|
||||||
self.onekey = False
|
self.onekey = False
|
||||||
@ -98,7 +100,7 @@ class ActionBar(urwid.WidgetWrap):
|
|||||||
|
|
||||||
def sig_prompt_command(self, sender, partial=""):
|
def sig_prompt_command(self, sender, partial=""):
|
||||||
signals.focus.send(self, section="footer")
|
signals.focus.send(self, section="footer")
|
||||||
self._w = commander.CommandEdit(self.master, partial)
|
self._w = commander.CommandEdit(self.master, partial, self.command_history)
|
||||||
self.prompting = commandexecutor.CommandExecutor(self.master)
|
self.prompting = commandexecutor.CommandExecutor(self.master)
|
||||||
|
|
||||||
def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
|
def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
|
||||||
@ -125,6 +127,7 @@ class ActionBar(urwid.WidgetWrap):
|
|||||||
def keypress(self, size, k):
|
def keypress(self, size, k):
|
||||||
if self.prompting:
|
if self.prompting:
|
||||||
if k == "esc":
|
if k == "esc":
|
||||||
|
self.command_history.index = self.command_history.last_index
|
||||||
self.prompt_done()
|
self.prompt_done()
|
||||||
elif self.onekey:
|
elif self.onekey:
|
||||||
if k == "enter":
|
if k == "enter":
|
||||||
@ -132,6 +135,7 @@ class ActionBar(urwid.WidgetWrap):
|
|||||||
elif k in self.onekey:
|
elif k in self.onekey:
|
||||||
self.prompt_execute(k)
|
self.prompt_execute(k)
|
||||||
elif k == "enter":
|
elif k == "enter":
|
||||||
|
self.command_history.add_command(self._w.cbuf, True)
|
||||||
self.prompt_execute(self._w.get_edit_text())
|
self.prompt_execute(self._w.get_edit_text())
|
||||||
else:
|
else:
|
||||||
if common.is_keypress(k):
|
if common.is_keypress(k):
|
||||||
|
@ -28,6 +28,57 @@ class TestListCompleter:
|
|||||||
assert c.cycle() == expected
|
assert c.cycle() == expected
|
||||||
|
|
||||||
|
|
||||||
|
class TestCommandHistory:
|
||||||
|
def fill_history(self, commands):
|
||||||
|
with taddons.context() as tctx:
|
||||||
|
history = commander.CommandHistory(tctx.master, size=3)
|
||||||
|
for c in commands:
|
||||||
|
cbuf = commander.CommandBuffer(tctx.master, c)
|
||||||
|
history.add_command(cbuf)
|
||||||
|
return history, tctx.master
|
||||||
|
|
||||||
|
def test_add_command(self):
|
||||||
|
commands = ["command1", "command2"]
|
||||||
|
history, tctx_master = self.fill_history(commands)
|
||||||
|
|
||||||
|
history_commands = [buf.text for buf in history.history]
|
||||||
|
assert history_commands == [""] + commands
|
||||||
|
|
||||||
|
# The history size is only 3. So, we forget the first one command,
|
||||||
|
# when adding fourth command
|
||||||
|
cbuf = commander.CommandBuffer(tctx_master, "command3")
|
||||||
|
history.add_command(cbuf)
|
||||||
|
history_commands = [buf.text for buf in history.history]
|
||||||
|
assert history_commands == commands + ["command3"]
|
||||||
|
|
||||||
|
# Commands with the same text are not repeated in the history one by one
|
||||||
|
history.add_command(cbuf)
|
||||||
|
history_commands = [buf.text for buf in history.history]
|
||||||
|
assert history_commands == commands + ["command3"]
|
||||||
|
|
||||||
|
def test_get_next(self):
|
||||||
|
commands = ["command1", "command2"]
|
||||||
|
history, tctx_master = self.fill_history(commands)
|
||||||
|
|
||||||
|
history.index = -1
|
||||||
|
expected_items = ["", "command1", "command2"]
|
||||||
|
for i in range(3):
|
||||||
|
assert history.get_next().text == expected_items[i]
|
||||||
|
# We are at the last item of the history
|
||||||
|
assert history.get_next() is None
|
||||||
|
|
||||||
|
def test_get_prev(self):
|
||||||
|
commands = ["command1", "command2"]
|
||||||
|
history, tctx_master = self.fill_history(commands)
|
||||||
|
|
||||||
|
expected_items = ["command2", "command1", ""]
|
||||||
|
history.index = history.last_index + 1
|
||||||
|
for i in range(3):
|
||||||
|
assert history.get_prev().text == expected_items[i]
|
||||||
|
# We are at the first item of the history
|
||||||
|
assert history.get_prev() is None
|
||||||
|
|
||||||
|
|
||||||
class TestCommandBuffer:
|
class TestCommandBuffer:
|
||||||
|
|
||||||
def test_backspace(self):
|
def test_backspace(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user