session: basic flow capture implemented

This commit is contained in:
madt1m 2018-08-01 01:55:20 +02:00
parent 53b85d2360
commit ccb5fd7c99

View File

@ -9,7 +9,8 @@ import os
from mitmproxy import types from mitmproxy import types
from mitmproxy import http from mitmproxy import http
from mitmproxy import ctx from mitmproxy import ctx
from mitmproxy.exceptions import SessionLoadException from mitmproxy.io import protobuf
from mitmproxy.exceptions import SessionLoadException, CommandError
from mitmproxy.utils.data import pkg_data from mitmproxy.utils.data import pkg_data
@ -32,6 +33,13 @@ class SessionDB:
for Sessions and handles creation, for Sessions and handles creation,
retrieving and insertion in tables. retrieving and insertion in tables.
""" """
# Request/response bodies whose length exceeds this value are split out of
# the serialized flow and written to the body table (see store_flows).
content_threshold = 1000
# Integer codes stored next to each split-out body to record whether it came
# from the request or from the response of the flow.
type_mappings = {
"body": {
"request" : 1,
"response" : 2
}
}
def __init__(self, db_path=None): def __init__(self, db_path=None):
""" """
@ -58,6 +66,13 @@ class SessionDB:
if self.tempdir: if self.tempdir:
shutil.rmtree(self.tempdir) shutil.rmtree(self.tempdir)
def __contains__(self, fid):
    """
    Return True if a flow with id `fid` is present in the flow table.

    A single-row lookup is issued instead of loading every stored id, so
    membership tests stay cheap as the session grows.
    """
    with self.con as con:
        row = con.execute(
            "SELECT 1 FROM flow WHERE id = ? LIMIT 1;", (fid,)
        ).fetchone()
    return row is not None
def _get_ids(self):
    """
    Return a list with the id of every row in the flow table.
    """
    with self.con as con:
        cursor = con.execute("SELECT id FROM flow;")
        return [row[0] for row in cursor]
def _load_session(self, path): def _load_session(self, path):
if not self.is_session_db(path): if not self.is_session_db(path):
raise SessionLoadException('Given path does not point to a valid Session') raise SessionLoadException('Given path does not point to a valid Session')
@ -90,6 +105,23 @@ class SessionDB:
c.close() c.close()
return False return False
def store_flows(self, flows):
    """
    Write `flows` to the session database.

    A request or response body longer than `content_threshold` is moved into
    the body table, tagged with its `type_mappings["body"]` code, and the
    flow is serialized without it. Flows are upserted by id.

    Note: the body of such a message is replaced with b"" on the flow object
    that was passed in, so the caller's flow is modified.
    """
    body_codes = self.type_mappings["body"]
    body_buf = []
    flow_buf = []
    for flow in flows:
        for kind, message in (("request", flow.request), ("response", flow.response)):
            # A flow may not have a response yet.
            if not message:
                continue
            content = message.content
            # mitmproxy reports a missing body as None; len(None) would raise,
            # and there is nothing to split out in that case.
            if content is not None and len(content) > self.content_threshold:
                body_buf.append((flow.id, body_codes[kind], content))
                message.content = b""
        flow_buf.append((flow.id, protobuf.dumps(flow)))
    with self.con as con:
        con.executemany("INSERT OR REPLACE INTO flow VALUES(?, ?)", flow_buf)
        con.executemany("INSERT INTO body VALUES(?, ?, ?)", body_buf)
orders = [ orders = [
("t", "time"), ("t", "time"),
@ -101,12 +133,15 @@ orders = [
class Session: class Session:
def __init__(self):
    """
    Open the session store and initialise the in-memory state.
    """
    # Persistent store, opened at the configured session path.
    self.dbstore = SessionDB(ctx.options.session_path)

    # Flows that were added but not yet flushed to the store.
    self._hot_store = []
    # Sorted (order key, flow id) tuples.
    self._view = []
    # Per flow id: the parts of a flow that are kept aside when it is stored.
    self._live_components = {}
    self.order = orders[0]

    # Tuning values read by the background writer and tweaker coroutines.
    self._flush_rate = 150
    self._flush_period = 3.0
    self._tweak_period = 0.5

    # Set once running() has scheduled the background coroutines.
    self.started = False
def load(self, loader): def load(self, loader):
loader.add_option( loader.add_option(
@ -118,6 +153,23 @@ class Session:
"Flow sort order.", "Flow sort order.",
choices=list(map(lambda c: c[1], orders)) choices=list(map(lambda c: c[1], orders))
) )
loader.add_option(
"view_filter", typing.Optional[str], None,
"Limit the view to matching flows."
)
def running(self):
    """
    Schedule the background writer and tweaker coroutines, once.

    `started` records that scheduling already happened, so later calls are
    no-ops.
    """
    if self.started:
        return
    self.started = True
    loop = asyncio.get_event_loop()
    # create_task() accepts coroutines only, not the future returned by
    # asyncio.gather(), so each background coroutine gets its own task.
    for background in (self._writer, self._tweaker):
        loop.create_task(background())
def configure(self, updated):
    """
    Apply the view options that appear in `updated`.

    `view_order` is forwarded to set_order and `view_filter` to set_filter,
    in that order, each with the current option value.
    """
    handlers = (
        ("view_order", "set_order"),
        ("view_filter", "set_filter"),
    )
    for option, handler in handlers:
        if option in updated:
            getattr(self, handler)(getattr(ctx.options, option))
def _generate_order(self, f: http.HTTPFlow) -> typing.Union[str, int, float]: def _generate_order(self, f: http.HTTPFlow) -> typing.Union[str, int, float]:
o = self.order o = self.order
@ -135,6 +187,12 @@ class Session:
s += len(f.response.raw_content) s += len(f.response.raw_content)
return s return s
def set_order(self, order: str) -> None:
    """
    Placeholder invoked when the view_order option changes; does nothing yet.
    """
    return None
def set_filter(self, filt: str) -> None:
    """
    Placeholder invoked when the view_filter option changes; does nothing yet.
    """
    return None
async def _writer(self): async def _writer(self):
while True: while True:
await asyncio.sleep(self._flush_period) await asyncio.sleep(self._flush_period)
@ -144,22 +202,52 @@ class Session:
tof.append(self._hot_store.pop()) tof.append(self._hot_store.pop())
self.store(tof) self.store(tof)
async def _tweaker(self):
    """
    Rescale the flush period and flush rate in an endless loop.

    After every `_tweak_period` sleep: if the hot store holds at least
    `_flush_rate` flows, both values are multiplied by 0.9; otherwise both
    are multiplied by 1.1.
    """
    # NOTE(review): neither value is clamped, and `_flush_rate` turns into a
    # float after the first pass -- confirm that its consumers cope with both.
    while True:
        await asyncio.sleep(self._tweak_period)
        backlog = len(self._hot_store)
        factor = 0.9 if backlog >= self._flush_rate else 1.1
        self._flush_period *= factor
        self._flush_rate *= factor
def store(self, flows: typing.Sequence[http.HTTPFlow]) -> None:
    """
    Remember each flow's live components, then hand the flows to the store.

    Some live components of a flow cannot be serialized but are still needed
    for correct functionality, so they are saved per flow id and can be
    added back when needed. Falsy components are recorded as None.
    """
    for flow in flows:
        live_parts = (
            flow.client_conn.wfile,
            flow.client_conn.rfile,
            flow.server_conn.wfile,
            flow.server_conn.rfile,
            flow.reply,
        )
        self._live_components[flow.id] = tuple(part or None for part in live_parts)
    self.dbstore.store_flows(flows)
def _base_add(self, f):
    """
    Insert `f` into the sorted view, replacing any entry it already has.

    The view holds `(order key, flow id)` tuples kept sorted by order key.
    """
    order = self._generate_order(f)
    # View entries are tuples, so an existing entry has to be matched on the
    # stored id (index 1); testing the bare id against the tuples never
    # matches and would leave a duplicate behind on every re-add.
    if any(entry[1] == f.id for entry in self._view):
        self._view = [entry for entry in self._view if entry[1] != f.id]
    keys = [entry[0] for entry in self._view]
    self._view.insert(bisect.bisect_left(keys, order), (order, f.id))
def add(self, flows: typing.Sequence[http.HTTPFlow]) -> None:
    """
    Add the flows that are neither in the hot store nor in the database.

    Each new flow is appended to the hot store and inserted into the view;
    flows whose id is already known are skipped.
    """
    # Build the id lookup once instead of rebuilding a list for every flow.
    # Assumes flow ids are hashable (they are used as database keys).
    hot_ids = {hot.id for hot in self._hot_store}
    for f in flows:
        if f.id in hot_ids or f.id in self.dbstore:
            continue
        # TODO: the flow has to be filtered here before it is added to the view.
        self._hot_store.append(f)
        self._base_add(f)
        hot_ids.add(f.id)
# update(): walks the given flows. When the hot store already holds an entry
# with the same id, the hot store is rebuilt without that entry. The code then
# appends the flow to the hot store and passes it to _base_add.
# NOTE(review): indentation is lost in this view, so it cannot be told whether
# the append/_base_add lines sit inside the `if` or run for every flow --
# confirm against the actual file before relying on either reading.
def update(self, flow): def update(self, flows: typing.Sequence[http.HTTPFlow]) -> None:
pass for f in flows:
if f.id in [f.id for f in self._hot_store]:
self._hot_store = [flow for flow in self._hot_store if flow.id != f.id]
self._hot_store.append(f)
self._base_add(f)
def request(self, f): def request(self, f):
self.add([f]) self.add([f])