mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
addons.view: Add a focus tracker
This commit is contained in:
parent
9dcc3a3e20
commit
12a70d03ad
@ -3,9 +3,8 @@ The View:
|
||||
|
||||
- Keeps track of a store of flows
|
||||
- Maintains a filtered, ordered view onto that list of flows
|
||||
- Exposes various operations on flows in the store - notably intercept and
|
||||
resume
|
||||
- Exposes a number of signals so the view can be monitored
|
||||
- Has an associated class that tracks focus within the view
|
||||
"""
|
||||
import collections
|
||||
import typing
|
||||
@ -94,6 +93,16 @@ class View(collections.Sequence):
|
||||
self._view.add(f)
|
||||
self.sig_add.send(self, flow=f)
|
||||
|
||||
def remove(self, f: flow.Flow):
|
||||
"""
|
||||
Removes the flow from the underlying store and the view.
|
||||
"""
|
||||
if f.id in self._store:
|
||||
del self._store[f.id]
|
||||
if f in self._view:
|
||||
self._view.remove(f)
|
||||
self.sig_remove.send(self, flow=f)
|
||||
|
||||
def update(self, f: flow.Flow):
|
||||
"""
|
||||
Updates a flow. If the flow is not in the state, it's ignored.
|
||||
@ -113,6 +122,13 @@ class View(collections.Sequence):
|
||||
# The value was not in the view
|
||||
pass
|
||||
|
||||
# Reflect some methods to the efficient underlying implementation
|
||||
def bisect(self, f: flow.Flow) -> int:
|
||||
return self._view.bisect(f)
|
||||
|
||||
def index(self, f: flow.Flow) -> int:
|
||||
return self._view.index(f)
|
||||
|
||||
# Event handlers
|
||||
def request(self, f):
|
||||
self.add(f)
|
||||
@ -128,3 +144,53 @@ class View(collections.Sequence):
|
||||
|
||||
def response(self, f):
|
||||
self.update(f)
|
||||
|
||||
|
||||
class Focus:
|
||||
"""
|
||||
Tracks a focus element within a View.
|
||||
"""
|
||||
def __init__(self, v: View) -> None:
|
||||
self.view = v
|
||||
self._focusflow = None
|
||||
|
||||
self.focusflow = None
|
||||
if len(self.view):
|
||||
self.focusflow = self.view[0]
|
||||
v.sig_add.connect(self._sig_add)
|
||||
v.sig_remove.connect(self._sig_remove)
|
||||
v.sig_refresh.connect(self._sig_refresh)
|
||||
|
||||
@property
|
||||
def focusflow(self) -> typing.Optional[flow.Flow]:
|
||||
return self._focusflow
|
||||
|
||||
@focusflow.setter
|
||||
def focusflow(self, f: flow.Flow):
|
||||
if f is not None and f not in self.view:
|
||||
raise ValueError("Attempt to set focus to flow not in view")
|
||||
self._focusflow = f
|
||||
|
||||
@property
|
||||
def index(self) -> typing.Optional[int]:
|
||||
if self.focusflow:
|
||||
return self.view.index(self.focusflow)
|
||||
|
||||
def _sig_remove(self, view, flow):
|
||||
if len(view) == 0:
|
||||
self.focusflow = None
|
||||
elif flow is self.focusflow:
|
||||
idx = min(view.bisect(self.focusflow), len(view)-1)
|
||||
self.focusflow = view[idx]
|
||||
|
||||
def _sig_refresh(self, view):
|
||||
if len(view) == 0:
|
||||
self.focusflow = None
|
||||
else:
|
||||
if self.focusflow not in view:
|
||||
self.focusflow = view[0]
|
||||
|
||||
def _sig_add(self, view, flow):
|
||||
# We only have to act if we don't have a focus element
|
||||
if not self.focusflow:
|
||||
self.focusflow = flow
|
||||
|
@ -161,3 +161,58 @@ def test_signals():
|
||||
clearrec()
|
||||
v.update(f)
|
||||
assert not any([rec_add, rec_update, rec_remove, rec_refresh])
|
||||
|
||||
|
||||
def test_focus():
|
||||
# Special case - initialising with a view that already contains data
|
||||
v = view.View()
|
||||
v.add(tft())
|
||||
f = view.Focus(v)
|
||||
assert f.index is 0
|
||||
assert f.focusflow is v[0]
|
||||
|
||||
# Start empty
|
||||
v = view.View()
|
||||
f = view.Focus(v)
|
||||
assert f.index is None
|
||||
assert f.focusflow is None
|
||||
|
||||
v.add(tft(start=1))
|
||||
assert f.index == 0
|
||||
assert f.focusflow is v[0]
|
||||
|
||||
v.add(tft(start=0))
|
||||
assert f.index == 1
|
||||
assert f.focusflow is v[1]
|
||||
|
||||
v.add(tft(start=2))
|
||||
assert f.index == 1
|
||||
assert f.focusflow is v[1]
|
||||
|
||||
v.remove(v[1])
|
||||
assert f.index == 1
|
||||
assert f.focusflow is v[1]
|
||||
|
||||
v.remove(v[1])
|
||||
assert f.index == 0
|
||||
assert f.focusflow is v[0]
|
||||
|
||||
v.remove(v[0])
|
||||
assert f.index is None
|
||||
assert f.focusflow is None
|
||||
|
||||
v.add(tft(method="get", start=0))
|
||||
v.add(tft(method="get", start=1))
|
||||
v.add(tft(method="put", start=2))
|
||||
v.add(tft(method="get", start=3))
|
||||
|
||||
f.focusflow = v[2]
|
||||
assert f.focusflow.request.method == "PUT"
|
||||
|
||||
filt = flowfilter.parse("~m get")
|
||||
v.set_filter(filt)
|
||||
assert f.index == 0
|
||||
|
||||
filt = flowfilter.parse("~m oink")
|
||||
v.set_filter(filt)
|
||||
assert f.index is None
|
||||
|
Loading…
Reference in New Issue
Block a user