fix cut addon to work with binary content, fix #3965 (#5230)

This commit is contained in:
Maximilian Hils 2022-03-30 14:24:18 +02:00 committed by GitHub
parent 31add1a7c0
commit 0fa7c46368
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 12 deletions

View File

@ -14,6 +14,8 @@
([#5225](https://github.com/mitmproxy/mitmproxy/issues/5225), @mhils) ([#5225](https://github.com/mitmproxy/mitmproxy/issues/5225), @mhils)
* Add example addon for domain fronting. * Add example addon for domain fronting.
([#5217](https://github.com/mitmproxy/mitmproxy/issues/5217), @randomstuff) ([#5217](https://github.com/mitmproxy/mitmproxy/issues/5217), @randomstuff)
* Improve cut addon to better handle binary contents
([#3965](https://github.com/mitmproxy/mitmproxy/issues/3965), @mhils)
## 19 March 2022: mitmproxy 8.0.0 ## 19 March 2022: mitmproxy 8.0.0

View File

@ -8,7 +8,6 @@ from mitmproxy import exceptions
from mitmproxy import flow from mitmproxy import flow
from mitmproxy import ctx from mitmproxy import ctx
from mitmproxy import certs from mitmproxy import certs
from mitmproxy.utils import strutils
import mitmproxy.types import mitmproxy.types
import pyperclip import pyperclip
@ -54,6 +53,14 @@ def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]:
return str(current or "") return str(current or "")
def extract_str(cut: str, f: flow.Flow) -> str:
ret = extract(cut, f)
if isinstance(ret, bytes):
return repr(ret)
else:
return ret
class Cut: class Cut:
@command.command("cut") @command.command("cut")
def cut( def cut(
@ -110,10 +117,8 @@ class Cut:
with open(path, "a" if append else "w", newline='', encoding="utf8") as tfp: with open(path, "a" if append else "w", newline='', encoding="utf8") as tfp:
writer = csv.writer(tfp) writer = csv.writer(tfp)
for f in flows: for f in flows:
vals = [extract(c, f) for c in cuts] vals = [extract_str(c, f) for c in cuts]
writer.writerow( writer.writerow(vals)
[strutils.always_str(x) or "" for x in vals] # type: ignore
)
ctx.log.alert("Saved %s cuts over %d flows as CSV." % (len(cuts), len(flows))) ctx.log.alert("Saved %s cuts over %d flows as CSV." % (len(cuts), len(flows)))
except OSError as e: except OSError as e:
ctx.log.error(str(e)) ctx.log.error(str(e))
@ -132,16 +137,14 @@ class Cut:
v: typing.Union[str, bytes] v: typing.Union[str, bytes]
fp = io.StringIO(newline="") fp = io.StringIO(newline="")
if len(cuts) == 1 and len(flows) == 1: if len(cuts) == 1 and len(flows) == 1:
v = extract(cuts[0], flows[0]) v = extract_str(cuts[0], flows[0])
fp.write(strutils.always_str(v)) # type: ignore fp.write(v)
ctx.log.alert("Clipped single cut.") ctx.log.alert("Clipped single cut.")
else: else:
writer = csv.writer(fp) writer = csv.writer(fp)
for f in flows: for f in flows:
vals = [extract(c, f) for c in cuts] vals = [extract_str(c, f) for c in cuts]
writer.writerow( writer.writerow(vals)
[strutils.always_str(v) for v in vals]
)
ctx.log.alert("Clipped %s cuts as CSV." % len(cuts)) ctx.log.alert("Clipped %s cuts as CSV." % len(cuts))
try: try:
pyperclip.copy(fp.getvalue()) pyperclip.copy(fp.getvalue())

View File

@ -59,6 +59,12 @@ def test_extract(tdata):
assert "CERTIFICATE" in cut.extract("server_conn.certificate_list", tf) assert "CERTIFICATE" in cut.extract("server_conn.certificate_list", tf)
def test_extract_str():
tf = tflow.tflow()
tf.request.raw_content = b"\xFF"
assert cut.extract_str("request.raw_content", tf) == r"b'\xff'"
def test_headername(): def test_headername():
with pytest.raises(exceptions.CommandError): with pytest.raises(exceptions.CommandError):
cut.headername("header[foo.") cut.headername("header[foo.")
@ -115,7 +121,7 @@ def test_cut_save(tmpdir):
tctx.command(c.save, "@all", "request.method", f) tctx.command(c.save, "@all", "request.method", f)
assert qr(f).splitlines() == [b"GET", b"GET"] assert qr(f).splitlines() == [b"GET", b"GET"]
tctx.command(c.save, "@all", "request.method,request.content", f) tctx.command(c.save, "@all", "request.method,request.content", f)
assert qr(f).splitlines() == [b"GET,content", b"GET,content"] assert qr(f).splitlines() == [b"GET,b'content'", b"GET,b'content'"]
@pytest.mark.parametrize("exception, log_message", [ @pytest.mark.parametrize("exception, log_message", [