Rotate stream files (#5097)

* Example addon for saving streamed data including a small bug fix to make it work.

* Revert "Example addon for saving streamed data including a small bug fix to make it work."

This reverts commit 02ab78def9a52eaca1a89d0757cd9475ce250eaa.

* Add support for rotating stream files every hour or day

* Added tests

* Modified to change the stream file every time the formating string changes as time moves on.

* Update to more compact version

* simplify save addon logic

* make mypy happy

* fix compatibility with Python 3.8

Co-authored-by: Maximilian Hils <git@maximilianhils.com>
This commit is contained in:
EndUser509 2022-03-17 11:41:05 +01:00 committed by GitHub
parent ecd4790cbb
commit 3a5550a09c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 160 additions and 78 deletions

View File

@ -32,6 +32,7 @@
* Reintroduce `Flow.live`, which signals if a flow belongs to a currently active connection. (#4207, @mhils)
* Speculative fix for some rare HTTP/2 connection stalls (#5158, @EndUser509)
* Add ability to specify custom ports with LDAP authentication (#5068, @demonoidvk)
* Add support for rotating saved streams every hour or day (@EndUser509)
* Console Improvements on Windows (@mhils)
* Fix processing of `--set` options (#5067, @marwinxxii)
* Lowercase user-added header names and emit a log message to notify the user when using HTTP/2 (#4746, @mhils)

View File

@ -1,51 +1,60 @@
import os.path
import sys
import typing
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from mitmproxy import command
from mitmproxy import exceptions
from mitmproxy import flowfilter
from mitmproxy import io
from mitmproxy import ctx
from mitmproxy import flow
from mitmproxy import http
import mitmproxy.types
from mitmproxy import command, tcp
from mitmproxy import ctx
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import flowfilter
from mitmproxy import http
from mitmproxy import io
@lru_cache
def _path(path: str) -> str:
"""Extract the path from a path spec (which may have an extra "+" at the front)"""
if path.startswith("+"):
path = path[1:]
return os.path.expanduser(path)
@lru_cache
def _mode(path: str) -> typing.Literal["ab", "wb"]:
"""Extract the writing mode (overwrite or append) from a path spec"""
if path.startswith("+"):
return "ab"
else:
return "wb"
class Save:
def __init__(self):
self.stream = None
self.filt = None
def __init__(self) -> None:
self.stream: typing.Optional[io.FilteredFlowWriter] = None
self.filt: typing.Optional[flowfilter.TFilter] = None
self.active_flows: typing.Set[flow.Flow] = set()
self.current_path: typing.Optional[str] = None
def load(self, loader):
loader.add_option(
"save_stream_file", typing.Optional[str], None,
"Stream flows to file as they arrive. Prefix path with + to append."
"""
Stream flows to file as they arrive. Prefix path with + to append.
The full path can use python strftime() formating, missing
directories are created as needed. A new file is opened every time
the formatted string changes.
"""
)
loader.add_option(
"save_stream_filter", typing.Optional[str], None,
"Filter which flows are written to file."
)
def open_file(self, path):
if path.startswith("+"):
path = path[1:]
mode = "ab"
else:
mode = "wb"
path = os.path.expanduser(path)
return open(path, mode)
def start_stream_to_path(self, path, flt):
try:
f = self.open_file(path)
except OSError as v:
raise exceptions.OptionsError(str(v))
self.stream = io.FilteredFlowWriter(f, flt)
self.active_flows = set()
def configure(self, updated):
# We're already streaming - stop the previous stream and restart
if "save_stream_filter" in updated:
if ctx.options.save_stream_filter:
try:
@ -55,10 +64,58 @@ class Save:
else:
self.filt = None
if "save_stream_file" in updated or "save_stream_filter" in updated:
if self.stream:
self.done()
if ctx.options.save_stream_file:
self.start_stream_to_path(ctx.options.save_stream_file, self.filt)
try:
self.maybe_rotate_to_new_file()
except OSError as e:
raise exceptions.OptionsError(str(e)) from e
self.stream.flt = self.filt
else:
self.done()
def maybe_rotate_to_new_file(self) -> None:
path = datetime.today().strftime(_path(ctx.options.save_stream_file))
if self.current_path == path:
return
if self.stream:
self.stream.fo.close()
self.stream = None
new_log_file = Path(path)
new_log_file.parent.mkdir(parents=True, exist_ok=True)
f = new_log_file.open(_mode(ctx.options.save_stream_file))
self.stream = io.FilteredFlowWriter(f, self.filt)
self.current_path = path
def save_flow(self, flow: flow.Flow) -> None:
"""
Write the flow to the stream, but first check if we need to rotate to a new file.
"""
if not self.stream:
return
try:
self.maybe_rotate_to_new_file()
self.stream.add(flow)
except OSError as e:
# If we somehow fail to write flows to a logfile, we really want to crash visibly
# instead of letting traffic through unrecorded.
# No normal logging here, that would not be triggered anymore.
sys.stderr.write(f"Error while writing to {self.current_path}: {e}")
sys.exit(1)
else:
self.active_flows.discard(flow)
def done(self) -> None:
if self.stream:
for f in self.active_flows:
self.stream.add(f)
self.active_flows.clear()
self.current_path = None
self.stream.fo.close()
self.stream = None
@command.command("save.file")
def save(self, flows: typing.Sequence[flow.Flow], path: mitmproxy.types.Path) -> None:
@ -67,50 +124,36 @@ class Save:
appended to the file, otherwise it is over-written.
"""
try:
f = self.open_file(path)
except OSError as v:
raise exceptions.CommandError(v) from v
with open(_path(path), _mode(path)) as f:
stream = io.FlowWriter(f)
for i in flows:
stream.add(i)
f.close()
ctx.log.alert("Saved %s flows." % len(flows))
except OSError as e:
raise exceptions.CommandError(e) from e
ctx.log.alert(f"Saved {len(flows)} flows.")
def tcp_start(self, flow):
def tcp_start(self, flow: tcp.TCPFlow):
if self.stream:
self.active_flows.add(flow)
def tcp_end(self, flow):
def tcp_end(self, flow: tcp.TCPFlow):
if self.stream:
self.stream.add(flow)
self.active_flows.discard(flow)
self.save_flow(flow)
def tcp_error(self, flow):
def tcp_error(self, flow: tcp.TCPFlow):
self.tcp_end(flow)
def websocket_end(self, flow: http.HTTPFlow):
if self.stream:
self.stream.add(flow)
self.active_flows.discard(flow)
self.save_flow(flow)
def request(self, flow: http.HTTPFlow):
if self.stream:
self.active_flows.add(flow)
def response(self, flow: http.HTTPFlow):
# websocket flows will receive a websocket_end,
# we don't want to persist them here already
if self.stream and flow.websocket is None:
self.stream.add(flow)
self.active_flows.discard(flow)
if flow.websocket is None:
self.save_flow(flow)
def error(self, flow: http.HTTPFlow):
self.response(flow)
def done(self):
if self.stream:
for f in self.active_flows:
self.stream.add(f)
self.active_flows = set()
self.stream.fo.close()
self.stream = None

View File

@ -1,22 +1,21 @@
import pytest
from mitmproxy import exceptions
from mitmproxy import io
from mitmproxy.addons import save
from mitmproxy.addons import view
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy import io
from mitmproxy import exceptions
from mitmproxy.addons import save
from mitmproxy.addons import view
def test_configure(tmpdir):
def test_configure(tmp_path):
sa = save.Save()
with taddons.context(sa) as tctx:
with pytest.raises(exceptions.OptionsError):
tctx.configure(sa, save_stream_file=str(tmpdir))
tctx.configure(sa, save_stream_file=str(tmp_path))
with pytest.raises(Exception, match="Invalid filter"):
tctx.configure(
sa, save_stream_file=str(tmpdir.join("foo")), save_stream_filter="~~"
sa, save_stream_file=str(tmp_path / "foo"), save_stream_filter="~~"
)
tctx.configure(sa, save_stream_filter="foo")
assert sa.filt
@ -30,10 +29,10 @@ def rd(p):
return list(x.stream())
def test_tcp(tmpdir):
def test_tcp(tmp_path):
sa = save.Save()
with taddons.context(sa) as tctx:
p = str(tmpdir.join("foo"))
p = str(tmp_path / "foo")
tctx.configure(sa, save_stream_file=p)
tt = tflow.ttcpflow()
@ -48,10 +47,10 @@ def test_tcp(tmpdir):
assert len(rd(p)) == 2
def test_websocket(tmpdir):
def test_websocket(tmp_path):
sa = save.Save()
with taddons.context(sa) as tctx:
p = str(tmpdir.join("foo"))
p = str(tmp_path / "foo")
tctx.configure(sa, save_stream_file=p)
f = tflow.twebsocketflow()
@ -66,10 +65,10 @@ def test_websocket(tmpdir):
assert len(rd(p)) == 2
def test_save_command(tmpdir):
def test_save_command(tmp_path):
sa = save.Save()
with taddons.context() as tctx:
p = str(tmpdir.join("foo"))
p = str(tmp_path / "foo")
sa.save([tflow.tflow(resp=True)], p)
assert len(rd(p)) == 1
sa.save([tflow.tflow(resp=True)], p)
@ -78,7 +77,7 @@ def test_save_command(tmpdir):
assert len(rd(p)) == 2
with pytest.raises(exceptions.CommandError):
sa.save([tflow.tflow(resp=True)], str(tmpdir))
sa.save([tflow.tflow(resp=True)], str(tmp_path))
v = view.View()
tctx.master.addons.add(v)
@ -86,10 +85,10 @@ def test_save_command(tmpdir):
tctx.master.commands.execute("save.file @shown %s" % p)
def test_simple(tmpdir):
def test_simple(tmp_path):
sa = save.Save()
with taddons.context(sa) as tctx:
p = str(tmpdir.join("foo"))
p = str(tmp_path / "foo")
tctx.configure(sa, save_stream_file=p)
@ -111,3 +110,42 @@ def test_simple(tmpdir):
sa.request(f)
tctx.configure(sa, save_stream_file=None)
assert not rd(p)[2].response
f = tflow.tflow()
sa.response(f)
assert len(rd(p)) == 3
def test_rotate_stream(tmp_path):
sa = save.Save()
with taddons.context(sa) as tctx:
tctx.configure(sa, save_stream_file=str(tmp_path / "a.txt"))
f1 = tflow.tflow(resp=True)
f2 = tflow.tflow(resp=True)
sa.request(f1)
sa.response(f1)
sa.request(f2) # second request already started.
tctx.configure(sa, save_stream_file=str(tmp_path / "b.txt"))
sa.response(f2)
sa.done()
assert len(rd(tmp_path / "a.txt")) == 1
assert len(rd(tmp_path / "b.txt")) == 1
def test_disk_full(tmp_path, monkeypatch, capsys):
sa = save.Save()
with taddons.context(sa) as tctx:
tctx.configure(sa, save_stream_file=str(tmp_path / "foo.txt"))
def _raise(*_):
raise OSError("wat")
monkeypatch.setattr(sa, "maybe_rotate_to_new_file", _raise)
f = tflow.tflow(resp=True)
sa.request(f)
with pytest.raises(SystemExit):
sa.response(f)
assert "Error while writing" in capsys.readouterr().err