From 3a5550a09cd40d76acfe71aa45c7a8309525ad51 Mon Sep 17 00:00:00 2001 From: EndUser509 <42170559+EndUser509@users.noreply.github.com> Date: Thu, 17 Mar 2022 11:41:05 +0100 Subject: [PATCH] Rotate stream files (#5097) * Example addon for saving streamed data including a small bug fix to make it work. * Revert "Example addon for saving streamed data including a small bug fix to make it work." This reverts commit 02ab78def9a52eaca1a89d0757cd9475ce250eaa. * Add support for rotating stream files every hour or day * Added tests * Modified to change the stream file every time the formatting string changes as time moves on. * Update to more compact version * simplify save addon logic * make mypy happy * fix compatibility with Python 3.8 Co-authored-by: Maximilian Hils --- CHANGELOG.md | 1 + mitmproxy/addons/save.py | 165 ++++++++++++++++++----------- test/mitmproxy/addons/test_save.py | 72 ++++++++++--- 3 files changed, 160 insertions(+), 78 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f9fd195a7..9fd45dd33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ * Reintroduce `Flow.live`, which signals if a flow belongs to a currently active connection. 
(#4207, @mhils) * Speculative fix for some rare HTTP/2 connection stalls (#5158, @EndUser509) * Add ability to specify custom ports with LDAP authentication (#5068, @demonoidvk) +* Add support for rotating saved streams every hour or day (@EndUser509) * Console Improvements on Windows (@mhils) * Fix processing of `--set` options (#5067, @marwinxxii) * Lowercase user-added header names and emit a log message to notify the user when using HTTP/2 (#4746, @mhils) diff --git a/mitmproxy/addons/save.py b/mitmproxy/addons/save.py index 3b3766dc6..d3873fd55 100644 --- a/mitmproxy/addons/save.py +++ b/mitmproxy/addons/save.py @@ -1,51 +1,60 @@ import os.path +import sys import typing +from datetime import datetime +from functools import lru_cache +from pathlib import Path -from mitmproxy import command -from mitmproxy import exceptions -from mitmproxy import flowfilter -from mitmproxy import io -from mitmproxy import ctx -from mitmproxy import flow -from mitmproxy import http import mitmproxy.types +from mitmproxy import command, tcp +from mitmproxy import ctx +from mitmproxy import exceptions +from mitmproxy import flow +from mitmproxy import flowfilter +from mitmproxy import http +from mitmproxy import io + + +@lru_cache +def _path(path: str) -> str: + """Extract the path from a path spec (which may have an extra "+" at the front)""" + if path.startswith("+"): + path = path[1:] + return os.path.expanduser(path) + + +@lru_cache +def _mode(path: str) -> typing.Literal["ab", "wb"]: + """Extract the writing mode (overwrite or append) from a path spec""" + if path.startswith("+"): + return "ab" + else: + return "wb" class Save: - def __init__(self): - self.stream = None - self.filt = None + def __init__(self) -> None: + self.stream: typing.Optional[io.FilteredFlowWriter] = None + self.filt: typing.Optional[flowfilter.TFilter] = None self.active_flows: typing.Set[flow.Flow] = set() + self.current_path: typing.Optional[str] = None def load(self, loader): loader.add_option( 
"save_stream_file", typing.Optional[str], None, - "Stream flows to file as they arrive. Prefix path with + to append." + """ + Stream flows to file as they arrive. Prefix path with + to append. + The full path can use python strftime() formating, missing + directories are created as needed. A new file is opened every time + the formatted string changes. + """ ) loader.add_option( "save_stream_filter", typing.Optional[str], None, "Filter which flows are written to file." ) - def open_file(self, path): - if path.startswith("+"): - path = path[1:] - mode = "ab" - else: - mode = "wb" - path = os.path.expanduser(path) - return open(path, mode) - - def start_stream_to_path(self, path, flt): - try: - f = self.open_file(path) - except OSError as v: - raise exceptions.OptionsError(str(v)) - self.stream = io.FilteredFlowWriter(f, flt) - self.active_flows = set() - def configure(self, updated): - # We're already streaming - stop the previous stream and restart if "save_stream_filter" in updated: if ctx.options.save_stream_filter: try: @@ -55,10 +64,58 @@ class Save: else: self.filt = None if "save_stream_file" in updated or "save_stream_filter" in updated: - if self.stream: - self.done() if ctx.options.save_stream_file: - self.start_stream_to_path(ctx.options.save_stream_file, self.filt) + try: + self.maybe_rotate_to_new_file() + except OSError as e: + raise exceptions.OptionsError(str(e)) from e + self.stream.flt = self.filt + else: + self.done() + + def maybe_rotate_to_new_file(self) -> None: + path = datetime.today().strftime(_path(ctx.options.save_stream_file)) + if self.current_path == path: + return + + if self.stream: + self.stream.fo.close() + self.stream = None + + new_log_file = Path(path) + new_log_file.parent.mkdir(parents=True, exist_ok=True) + + f = new_log_file.open(_mode(ctx.options.save_stream_file)) + self.stream = io.FilteredFlowWriter(f, self.filt) + self.current_path = path + + def save_flow(self, flow: flow.Flow) -> None: + """ + Write the flow to the 
stream, but first check if we need to rotate to a new file. + """ + if not self.stream: + return + try: + self.maybe_rotate_to_new_file() + self.stream.add(flow) + except OSError as e: + # If we somehow fail to write flows to a logfile, we really want to crash visibly + # instead of letting traffic through unrecorded. + # No normal logging here, that would not be triggered anymore. + sys.stderr.write(f"Error while writing to {self.current_path}: {e}") + sys.exit(1) + else: + self.active_flows.discard(flow) + + def done(self) -> None: + if self.stream: + for f in self.active_flows: + self.stream.add(f) + self.active_flows.clear() + + self.current_path = None + self.stream.fo.close() + self.stream = None @command.command("save.file") def save(self, flows: typing.Sequence[flow.Flow], path: mitmproxy.types.Path) -> None: @@ -67,50 +124,36 @@ class Save: appended to the file, otherwise it is over-written. """ try: - f = self.open_file(path) - except OSError as v: - raise exceptions.CommandError(v) from v - stream = io.FlowWriter(f) - for i in flows: - stream.add(i) - f.close() - ctx.log.alert("Saved %s flows." 
% len(flows)) + with open(_path(path), _mode(path)) as f: + stream = io.FlowWriter(f) + for i in flows: + stream.add(i) + except OSError as e: + raise exceptions.CommandError(e) from e + ctx.log.alert(f"Saved {len(flows)} flows.") - def tcp_start(self, flow): + def tcp_start(self, flow: tcp.TCPFlow): if self.stream: self.active_flows.add(flow) - def tcp_end(self, flow): + def tcp_end(self, flow: tcp.TCPFlow): if self.stream: - self.stream.add(flow) - self.active_flows.discard(flow) + self.save_flow(flow) - def tcp_error(self, flow): + def tcp_error(self, flow: tcp.TCPFlow): self.tcp_end(flow) def websocket_end(self, flow: http.HTTPFlow): - if self.stream: - self.stream.add(flow) - self.active_flows.discard(flow) + self.save_flow(flow) def request(self, flow: http.HTTPFlow): - if self.stream: - self.active_flows.add(flow) + self.active_flows.add(flow) def response(self, flow: http.HTTPFlow): # websocket flows will receive a websocket_end, # we don't want to persist them here already - if self.stream and flow.websocket is None: - self.stream.add(flow) - self.active_flows.discard(flow) + if flow.websocket is None: + self.save_flow(flow) def error(self, flow: http.HTTPFlow): self.response(flow) - - def done(self): - if self.stream: - for f in self.active_flows: - self.stream.add(f) - self.active_flows = set() - self.stream.fo.close() - self.stream = None diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py index 5067562cd..0e50b70e3 100644 --- a/test/mitmproxy/addons/test_save.py +++ b/test/mitmproxy/addons/test_save.py @@ -1,22 +1,21 @@ import pytest +from mitmproxy import exceptions +from mitmproxy import io +from mitmproxy.addons import save +from mitmproxy.addons import view from mitmproxy.test import taddons from mitmproxy.test import tflow -from mitmproxy import io -from mitmproxy import exceptions -from mitmproxy.addons import save -from mitmproxy.addons import view - -def test_configure(tmpdir): +def test_configure(tmp_path): sa 
= save.Save() with taddons.context(sa) as tctx: with pytest.raises(exceptions.OptionsError): - tctx.configure(sa, save_stream_file=str(tmpdir)) + tctx.configure(sa, save_stream_file=str(tmp_path)) with pytest.raises(Exception, match="Invalid filter"): tctx.configure( - sa, save_stream_file=str(tmpdir.join("foo")), save_stream_filter="~~" + sa, save_stream_file=str(tmp_path / "foo"), save_stream_filter="~~" ) tctx.configure(sa, save_stream_filter="foo") assert sa.filt @@ -30,10 +29,10 @@ def rd(p): return list(x.stream()) -def test_tcp(tmpdir): +def test_tcp(tmp_path): sa = save.Save() with taddons.context(sa) as tctx: - p = str(tmpdir.join("foo")) + p = str(tmp_path / "foo") tctx.configure(sa, save_stream_file=p) tt = tflow.ttcpflow() @@ -48,10 +47,10 @@ def test_tcp(tmpdir): assert len(rd(p)) == 2 -def test_websocket(tmpdir): +def test_websocket(tmp_path): sa = save.Save() with taddons.context(sa) as tctx: - p = str(tmpdir.join("foo")) + p = str(tmp_path / "foo") tctx.configure(sa, save_stream_file=p) f = tflow.twebsocketflow() @@ -66,10 +65,10 @@ def test_websocket(tmpdir): assert len(rd(p)) == 2 -def test_save_command(tmpdir): +def test_save_command(tmp_path): sa = save.Save() with taddons.context() as tctx: - p = str(tmpdir.join("foo")) + p = str(tmp_path / "foo") sa.save([tflow.tflow(resp=True)], p) assert len(rd(p)) == 1 sa.save([tflow.tflow(resp=True)], p) @@ -78,7 +77,7 @@ def test_save_command(tmpdir): assert len(rd(p)) == 2 with pytest.raises(exceptions.CommandError): - sa.save([tflow.tflow(resp=True)], str(tmpdir)) + sa.save([tflow.tflow(resp=True)], str(tmp_path)) v = view.View() tctx.master.addons.add(v) @@ -86,10 +85,10 @@ def test_save_command(tmpdir): tctx.master.commands.execute("save.file @shown %s" % p) -def test_simple(tmpdir): +def test_simple(tmp_path): sa = save.Save() with taddons.context(sa) as tctx: - p = str(tmpdir.join("foo")) + p = str(tmp_path / "foo") tctx.configure(sa, save_stream_file=p) @@ -111,3 +110,42 @@ def test_simple(tmpdir): 
sa.request(f) tctx.configure(sa, save_stream_file=None) assert not rd(p)[2].response + + f = tflow.tflow() + sa.response(f) + assert len(rd(p)) == 3 + + +def test_rotate_stream(tmp_path): + sa = save.Save() + with taddons.context(sa) as tctx: + tctx.configure(sa, save_stream_file=str(tmp_path / "a.txt")) + f1 = tflow.tflow(resp=True) + f2 = tflow.tflow(resp=True) + sa.request(f1) + sa.response(f1) + sa.request(f2) # second request already started. + tctx.configure(sa, save_stream_file=str(tmp_path / "b.txt")) + sa.response(f2) + sa.done() + + assert len(rd(tmp_path / "a.txt")) == 1 + assert len(rd(tmp_path / "b.txt")) == 1 + + +def test_disk_full(tmp_path, monkeypatch, capsys): + sa = save.Save() + with taddons.context(sa) as tctx: + tctx.configure(sa, save_stream_file=str(tmp_path / "foo.txt")) + + def _raise(*_): + raise OSError("wat") + + monkeypatch.setattr(sa, "maybe_rotate_to_new_file", _raise) + + f = tflow.tflow(resp=True) + sa.request(f) + with pytest.raises(SystemExit): + sa.response(f) + + assert "Error while writing" in capsys.readouterr().err