mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
session: implemented changes requested after PR review.
This commit is contained in:
parent
e9c2b12dab
commit
a52451900c
@ -50,12 +50,13 @@ class SessionDB:
|
|||||||
or create a new one with optional path.
|
or create a new one with optional path.
|
||||||
:param db_path:
|
:param db_path:
|
||||||
"""
|
"""
|
||||||
self.live_components = {}
|
self.live_components: typing.Dict[str, tuple] = {}
|
||||||
self.tempdir = None
|
self.tempdir: tempfile.TemporaryDirectory = None
|
||||||
self.con = None
|
self.con: sqlite3.Connection = None
|
||||||
# This is used for fast look-ups over bodies already dumped to database.
|
# This is used for fast look-ups over bodies already dumped to database.
|
||||||
# This permits to enforce one-to-one relationship between flow and body table.
|
# This permits to enforce one-to-one relationship between flow and body table.
|
||||||
self.body_ledger = set()
|
self.body_ledger: typing.Set[str] = set()
|
||||||
|
self.id_ledger: typing.Set[str] = set()
|
||||||
if db_path is not None and os.path.isfile(db_path):
|
if db_path is not None and os.path.isfile(db_path):
|
||||||
self._load_session(db_path)
|
self._load_session(db_path)
|
||||||
else:
|
else:
|
||||||
@ -74,14 +75,10 @@ class SessionDB:
|
|||||||
shutil.rmtree(self.tempdir)
|
shutil.rmtree(self.tempdir)
|
||||||
|
|
||||||
def __contains__(self, fid):
|
def __contains__(self, fid):
|
||||||
return any([fid == i for i in self._get_ids()])
|
return fid in self.id_ledger
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
ln = self.con.execute("SELECT COUNT(*) FROM flow;").fetchall()[0]
|
return len(self.id_ledger)
|
||||||
return ln[0] if ln else 0
|
|
||||||
|
|
||||||
def _get_ids(self):
|
|
||||||
return [t[0] for t in self.con.execute("SELECT id FROM flow;").fetchall()]
|
|
||||||
|
|
||||||
def _load_session(self, path):
|
def _load_session(self, path):
|
||||||
if not self.is_session_db(path):
|
if not self.is_session_db(path):
|
||||||
@ -150,6 +147,7 @@ class SessionDB:
|
|||||||
body_buf = []
|
body_buf = []
|
||||||
flow_buf = []
|
flow_buf = []
|
||||||
for flow in flows:
|
for flow in flows:
|
||||||
|
self.id_ledger.add(flow.id)
|
||||||
self._disassemble(flow)
|
self._disassemble(flow)
|
||||||
f = copy.copy(flow)
|
f = copy.copy(flow)
|
||||||
f.request = copy.deepcopy(flow.request)
|
f.request = copy.deepcopy(flow.request)
|
||||||
@ -209,17 +207,21 @@ orders = [
|
|||||||
|
|
||||||
|
|
||||||
class Session:
|
class Session:
|
||||||
|
|
||||||
|
_FP_RATE = 150
|
||||||
|
_FP_DECREMENT = 0.9
|
||||||
|
_FP_DEFAULT = 3.0
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.db_store = None
|
self.db_store: SessionDB = None
|
||||||
self._hot_store = collections.OrderedDict()
|
self._hot_store: collections.OrderedDict = collections.OrderedDict()
|
||||||
self._live_components = {}
|
self._order_store: typing.Dict[str, typing.Dict[str, typing.Union[int, float, str]]] = {}
|
||||||
self._view = []
|
self._view: typing.List[typing.Tuple[typing.Union[int, float, str], str]] = []
|
||||||
self.order = orders[0]
|
self.order: str = orders[0]
|
||||||
self.filter = matchall
|
self.filter = matchall
|
||||||
self._flush_period = 3.0
|
self._flush_period: float = self._FP_DEFAULT
|
||||||
self._tweak_period = 0.5
|
self._flush_rate: int = self._FP_RATE
|
||||||
self._flush_rate = 150
|
self.started: bool = False
|
||||||
self.started = False
|
|
||||||
|
|
||||||
def load(self, loader):
|
def load(self, loader):
|
||||||
loader.add_option(
|
loader.add_option(
|
||||||
@ -242,7 +244,6 @@ class Session:
|
|||||||
self.db_store = SessionDB(ctx.options.session_path)
|
self.db_store = SessionDB(ctx.options.session_path)
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
loop.create_task(self._writer())
|
loop.create_task(self._writer())
|
||||||
loop.create_task(self._tweaker())
|
|
||||||
|
|
||||||
def configure(self, updated):
|
def configure(self, updated):
|
||||||
if "view_order" in updated:
|
if "view_order" in updated:
|
||||||
@ -253,28 +254,23 @@ class Session:
|
|||||||
async def _writer(self):
|
async def _writer(self):
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(self._flush_period)
|
await asyncio.sleep(self._flush_period)
|
||||||
tof = []
|
batches = -(-len(self._hot_store) // self._flush_rate)
|
||||||
to_dump = min(self._flush_rate, len(self._hot_store))
|
self._flush_period = self._flush_period * self._FP_DECREMENT if batches > 1 else self._FP_DEFAULT
|
||||||
for _ in range(to_dump):
|
while batches:
|
||||||
tof.append(self._hot_store.popitem(last=False)[1])
|
tof = []
|
||||||
self.db_store.store_flows(tof)
|
to_dump = min(len(self._hot_store), self._flush_rate)
|
||||||
|
for _ in range(to_dump):
|
||||||
|
tof.append(self._hot_store.popitem(last=False)[1])
|
||||||
|
self.db_store.store_flows(tof)
|
||||||
|
batches -= 1
|
||||||
|
await asyncio.sleep(0.01)
|
||||||
|
|
||||||
async def _tweaker(self):
|
def load_view(self) -> typing.Sequence[http.HTTPFlow]:
|
||||||
while True:
|
|
||||||
await asyncio.sleep(self._tweak_period)
|
|
||||||
if len(self._hot_store) >= 3 * self._flush_rate:
|
|
||||||
self._flush_period *= 0.9
|
|
||||||
self._flush_rate *= 1.1
|
|
||||||
elif len(self._hot_store) < self._flush_rate:
|
|
||||||
self._flush_period *= 1.1
|
|
||||||
self._flush_rate *= 0.9
|
|
||||||
|
|
||||||
def load_view(self):
|
|
||||||
ids = [fid for _, fid in self._view]
|
ids = [fid for _, fid in self._view]
|
||||||
flows = self.load_storage(ids)
|
flows = self.load_storage(ids)
|
||||||
return sorted(flows, key=lambda f: self._generate_order(f))
|
return sorted(flows, key=lambda f: self._generate_order(self.order, f))
|
||||||
|
|
||||||
def load_storage(self, ids=None):
|
def load_storage(self, ids=None) -> typing.Sequence[http.HTTPFlow]:
|
||||||
flows = []
|
flows = []
|
||||||
ids_from_store = []
|
ids_from_store = []
|
||||||
if ids is not None:
|
if ids is not None:
|
||||||
@ -284,8 +280,6 @@ class Session:
|
|||||||
flows.append(self._hot_store[fid])
|
flows.append(self._hot_store[fid])
|
||||||
elif fid in self.db_store:
|
elif fid in self.db_store:
|
||||||
ids_from_store.append(fid)
|
ids_from_store.append(fid)
|
||||||
else:
|
|
||||||
flows.append(None)
|
|
||||||
flows += self.db_store.retrieve_flows(ids_from_store)
|
flows += self.db_store.retrieve_flows(ids_from_store)
|
||||||
else:
|
else:
|
||||||
for flow in self._hot_store.values():
|
for flow in self._hot_store.values():
|
||||||
@ -300,15 +294,15 @@ class Session:
|
|||||||
self._hot_store.clear()
|
self._hot_store.clear()
|
||||||
self._view = []
|
self._view = []
|
||||||
|
|
||||||
def store_count(self):
|
def store_count(self) -> int:
|
||||||
ln = 0
|
ln = 0
|
||||||
for fid in self._hot_store.keys():
|
for fid in self._hot_store.keys():
|
||||||
if fid not in self.db_store:
|
if fid not in self.db_store:
|
||||||
ln += 1
|
ln += 1
|
||||||
return ln + len(self.db_store)
|
return ln + len(self.db_store)
|
||||||
|
|
||||||
def _generate_order(self, f: http.HTTPFlow) -> typing.Optional[typing.Union[str, int, float]]:
|
@staticmethod
|
||||||
o = self.order
|
def _generate_order(o: str, f: http.HTTPFlow) -> typing.Optional[typing.Union[str, int, float]]:
|
||||||
if o == "time":
|
if o == "time":
|
||||||
return f.request.timestamp_start or 0
|
return f.request.timestamp_start or 0
|
||||||
if o == "method":
|
if o == "method":
|
||||||
@ -324,6 +318,11 @@ class Session:
|
|||||||
return s
|
return s
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def _store_order(self, f: http.HTTPFlow):
|
||||||
|
self._order_store[f.id] = {}
|
||||||
|
for order in orders:
|
||||||
|
self._order_store[f.id][order] = self._generate_order(order, f)
|
||||||
|
|
||||||
def set_order(self, order: str) -> None:
|
def set_order(self, order: str) -> None:
|
||||||
if order not in orders:
|
if order not in orders:
|
||||||
raise CommandError(
|
raise CommandError(
|
||||||
@ -332,7 +331,7 @@ class Session:
|
|||||||
if order != self.order:
|
if order != self.order:
|
||||||
self.order = order
|
self.order = order
|
||||||
newview = [
|
newview = [
|
||||||
(self._generate_order(f), f.id) for f in self.load_view()
|
(self._order_store[t[1]][order], t[1]) for t in self._view
|
||||||
]
|
]
|
||||||
self._view = sorted(newview)
|
self._view = sorted(newview)
|
||||||
|
|
||||||
@ -341,7 +340,7 @@ class Session:
|
|||||||
flows = self.load_storage()
|
flows = self.load_storage()
|
||||||
for f in flows:
|
for f in flows:
|
||||||
if self.filter(f):
|
if self.filter(f):
|
||||||
self._base_add(f)
|
self.update_view(f)
|
||||||
|
|
||||||
def set_filter(self, input_filter: typing.Optional[str]) -> None:
|
def set_filter(self, input_filter: typing.Optional[str]) -> None:
|
||||||
filt = matchall if not input_filter else flowfilter.parse(input_filter)
|
filt = matchall if not input_filter else flowfilter.parse(input_filter)
|
||||||
@ -352,22 +351,20 @@ class Session:
|
|||||||
self.filter = filt
|
self.filter = filt
|
||||||
self._refilter()
|
self._refilter()
|
||||||
|
|
||||||
def _base_add(self, f):
|
def update_view(self, f):
|
||||||
if not any([f.id == t[1] for t in self._view]):
|
if any([f.id == t[1] for t in self._view]):
|
||||||
o = self._generate_order(f)
|
|
||||||
self._view.insert(bisect.bisect_left(KeyifyList(self._view, lambda x: x[0]), o), (o, f.id))
|
|
||||||
else:
|
|
||||||
o = self._generate_order(f)
|
|
||||||
self._view = [(order, fid) for order, fid in self._view if fid != f.id]
|
self._view = [(order, fid) for order, fid in self._view if fid != f.id]
|
||||||
self._view.insert(bisect.bisect_left(KeyifyList(self._view, lambda x: x[0]), o), (o, f.id))
|
o = self._order_store[f.id][self.order]
|
||||||
|
self._view.insert(bisect.bisect_left(KeyifyList(self._view, lambda x: x[0]), o), (o, f.id))
|
||||||
|
|
||||||
def update(self, flows: typing.Sequence[http.HTTPFlow]) -> None:
|
def update(self, flows: typing.Sequence[http.HTTPFlow]) -> None:
|
||||||
for f in flows:
|
for f in flows:
|
||||||
|
self._store_order(f)
|
||||||
if f.id in self._hot_store:
|
if f.id in self._hot_store:
|
||||||
self._hot_store.pop(f.id)
|
self._hot_store.pop(f.id)
|
||||||
self._hot_store[f.id] = f
|
self._hot_store[f.id] = f
|
||||||
if self.filter(f):
|
if self.filter(f):
|
||||||
self._base_add(f)
|
self.update_view(f)
|
||||||
|
|
||||||
def request(self, f):
|
def request(self, f):
|
||||||
self.update([f])
|
self.update([f])
|
||||||
|
@ -28,8 +28,10 @@ class TestSession:
|
|||||||
tctx.master.addons.add(s)
|
tctx.master.addons.add(s)
|
||||||
tctx.options.session_path = None
|
tctx.options.session_path = None
|
||||||
tctx.options.view_filter = None
|
tctx.options.view_filter = None
|
||||||
|
# To make tests quicker
|
||||||
if fp:
|
if fp:
|
||||||
s._flush_period = fp
|
s._flush_period = fp
|
||||||
|
s._FP_DEFAULT = fp
|
||||||
s.running()
|
s.running()
|
||||||
return s
|
return s
|
||||||
|
|
||||||
@ -85,21 +87,11 @@ class TestSession:
|
|||||||
def test_session_order_generators(self):
|
def test_session_order_generators(self):
|
||||||
s = session.Session()
|
s = session.Session()
|
||||||
tf = tflow.tflow(resp=True)
|
tf = tflow.tflow(resp=True)
|
||||||
|
assert s._generate_order('time', tf) == 946681200
|
||||||
s.order = "time"
|
assert s._generate_order('method', tf) == tf.request.method
|
||||||
assert s._generate_order(tf) == 946681200
|
assert s._generate_order('url', tf) == tf.request.url
|
||||||
|
assert s._generate_order('size', tf) == len(tf.request.raw_content) + len(tf.response.raw_content)
|
||||||
s.order = "method"
|
assert not s._generate_order('invalid', tf)
|
||||||
assert s._generate_order(tf) == tf.request.method
|
|
||||||
|
|
||||||
s.order = "url"
|
|
||||||
assert s._generate_order(tf) == tf.request.url
|
|
||||||
|
|
||||||
s.order = "size"
|
|
||||||
assert s._generate_order(tf) == len(tf.request.raw_content) + len(tf.response.raw_content)
|
|
||||||
|
|
||||||
s.order = "invalid"
|
|
||||||
assert not s._generate_order(tf)
|
|
||||||
|
|
||||||
def test_storage_simple(self):
|
def test_storage_simple(self):
|
||||||
s = session.Session()
|
s = session.Session()
|
||||||
@ -110,8 +102,12 @@ class TestSession:
|
|||||||
assert s.store_count() == 0
|
assert s.store_count() == 0
|
||||||
s.request(f)
|
s.request(f)
|
||||||
assert s._view == [(1, f.id)]
|
assert s._view == [(1, f.id)]
|
||||||
|
assert s._order_store[f.id]['time'] == 1
|
||||||
|
assert s._order_store[f.id]['method'] == f.request.method
|
||||||
|
assert s._order_store[f.id]['url'] == f.request.url
|
||||||
|
assert s._order_store[f.id]['size'] == len(f.request.raw_content)
|
||||||
assert s.load_view() == [f]
|
assert s.load_view() == [f]
|
||||||
assert s.load_storage(['nonexistent']) == [None]
|
assert s.load_storage(['nonexistent']) == []
|
||||||
|
|
||||||
s.error(f)
|
s.error(f)
|
||||||
s.response(f)
|
s.response(f)
|
||||||
@ -121,6 +117,10 @@ class TestSession:
|
|||||||
|
|
||||||
# Verify that flow has been updated, not duplicated
|
# Verify that flow has been updated, not duplicated
|
||||||
assert s._view == [(1, f.id)]
|
assert s._view == [(1, f.id)]
|
||||||
|
assert s._order_store[f.id]['time'] == 1
|
||||||
|
assert s._order_store[f.id]['method'] == f.request.method
|
||||||
|
assert s._order_store[f.id]['url'] == f.request.url
|
||||||
|
assert s._order_store[f.id]['size'] == len(f.request.raw_content)
|
||||||
assert s.store_count() == 1
|
assert s.store_count() == 1
|
||||||
|
|
||||||
f2 = self.tft(start=3)
|
f2 = self.tft(start=3)
|
||||||
@ -174,16 +174,16 @@ class TestSession:
|
|||||||
|
|
||||||
f.server_conn.via = tflow.tserver_conn()
|
f.server_conn.via = tflow.tserver_conn()
|
||||||
s.request(f)
|
s.request(f)
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(0.6)
|
||||||
assert len(s._hot_store) == 0
|
assert len(s._hot_store) == 0
|
||||||
assert all([lflow.__dict__ == flow.__dict__ for lflow, flow in list(zip(s.load_storage(), [f]))])
|
assert all([lflow.__dict__ == flow.__dict__ for lflow, flow in list(zip(s.load_storage(), [f]))])
|
||||||
|
|
||||||
flows = [self.tft() for _ in range(500)]
|
flows = [self.tft() for _ in range(500)]
|
||||||
s.update(flows)
|
s.update(flows)
|
||||||
fp = s._flush_period
|
|
||||||
fr = s._flush_rate
|
|
||||||
await asyncio.sleep(0.6)
|
await asyncio.sleep(0.6)
|
||||||
assert s._flush_period < fp and s._flush_rate > fr
|
assert s._flush_period == s._FP_DEFAULT * s._FP_DECREMENT
|
||||||
|
await asyncio.sleep(3)
|
||||||
|
assert s._flush_period == s._FP_DEFAULT
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_storage_bodies(self):
|
async def test_storage_bodies(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user