mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
cut: use csv module to encode multi-values for saving
Also add q.text, q.raw_content, s.text, s.raw_content selectors
This commit is contained in:
parent
4b568f99d6
commit
7ffb2c7981
@ -1,4 +1,4 @@
|
||||
import os
|
||||
import csv
|
||||
import typing
|
||||
from mitmproxy import command
|
||||
from mitmproxy import exceptions
|
||||
@ -19,10 +19,10 @@ def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]:
|
||||
if not req:
|
||||
return ""
|
||||
rem = cut[len("q."):]
|
||||
if rem in ["method", "scheme", "host", "port", "path", "url"]:
|
||||
if rem in ["method", "scheme", "host", "port", "path", "url", "text"]:
|
||||
return str(getattr(req, rem))
|
||||
elif rem == "content":
|
||||
return req.content
|
||||
elif rem in ["content", "raw_content"]:
|
||||
return getattr(req, rem)
|
||||
elif rem.startswith("header["):
|
||||
return req.headers.get(headername(rem), "")
|
||||
elif cut.startswith("s."):
|
||||
@ -30,10 +30,10 @@ def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]:
|
||||
if not resp:
|
||||
return ""
|
||||
rem = cut[len("s."):]
|
||||
if rem in ["status_code", "reason"]:
|
||||
if rem in ["status_code", "reason", "text"]:
|
||||
return str(getattr(resp, rem))
|
||||
elif rem == "content":
|
||||
return resp.content
|
||||
elif rem in ["content", "raw_content"]:
|
||||
return getattr(resp, rem)
|
||||
elif rem.startswith("header["):
|
||||
return resp.headers.get(headername(rem), "")
|
||||
raise exceptions.CommandError("Invalid cut specification: %s" % cut)
|
||||
@ -65,9 +65,10 @@ class Cut:
|
||||
be @all. The cuts are a comma-separated list of cut snippets.
|
||||
|
||||
HTTP requests: q.method, q.scheme, q.host, q.port, q.path, q.url,
|
||||
q.header[key], q.content
|
||||
q.header[key], q.content, q.text, q.raw_content
|
||||
|
||||
HTTP responses: s.status_code, s.reason, s.header[key], s.content
|
||||
HTTP responses: s.status_code, s.reason, s.header[key], s.content,
|
||||
s.text, s.raw_content
|
||||
|
||||
Client connections: cc.address, cc.sni, cc.cipher_name,
|
||||
cc.alpn_proto, cc.tls_version
|
||||
@ -85,28 +86,33 @@ class Cut:
|
||||
@command.command("cut.save")
|
||||
def save(self, cuts: command.Cuts, path: str) -> None:
|
||||
"""
|
||||
Save cuts to file.
|
||||
Save cuts to file. If there are multiple rows or columns, the format
|
||||
is UTF-8 encoded CSV. If there is exactly one row and one column,
|
||||
the data is written to file as-is, with raw bytes preserved.
|
||||
|
||||
cut.save resp.content|@focus /tmp/foo
|
||||
|
||||
cut.save req.host,resp.header[content-type]|@focus /tmp/foo
|
||||
"""
|
||||
mode = "wb"
|
||||
append = False
|
||||
if path.startswith("+"):
|
||||
mode = "ab"
|
||||
append = True
|
||||
path = path[1:]
|
||||
path = os.path.expanduser(path)
|
||||
with open(path, mode) as fp:
|
||||
if fp.tell() > 0:
|
||||
# We're appending to a file that already exists and has content
|
||||
fp.write(b"\n")
|
||||
for ci, c in enumerate(cuts):
|
||||
if ci > 0:
|
||||
if len(cuts) == 1 and len(cuts[0]) == 1:
|
||||
with open(path, "ab" if append else "wb") as fp:
|
||||
if fp.tell() > 0:
|
||||
# We're appending to a file that already exists and has content
|
||||
fp.write(b"\n")
|
||||
for vi, v in enumerate(c):
|
||||
if vi > 0:
|
||||
fp.write(b", ")
|
||||
if isinstance(v, str):
|
||||
v = strutils.always_bytes(v)
|
||||
v = cuts[0][0]
|
||||
if isinstance(v, bytes):
|
||||
fp.write(v)
|
||||
else:
|
||||
fp.write(v.encode("utf8"))
|
||||
else:
|
||||
with open(path, "a" if append else "w", newline='', encoding="utf8") as fp:
|
||||
writer = csv.writer(fp)
|
||||
for r in cuts:
|
||||
writer.writerow(
|
||||
[strutils.always_str(c) or "" for c in r] # type: ignore
|
||||
)
|
||||
ctx.log.alert("Saved %s cuts." % len(cuts))
|
||||
|
@ -7,6 +7,33 @@ from mitmproxy.test import tflow
|
||||
import pytest
|
||||
|
||||
|
||||
def test_extract():
|
||||
tf = tflow.tflow(resp=True)
|
||||
tests = [
|
||||
["q.method", "GET"],
|
||||
["q.scheme", "http"],
|
||||
["q.host", "address"],
|
||||
["q.port", "22"],
|
||||
["q.path", "/path"],
|
||||
["q.url", "http://address:22/path"],
|
||||
["q.text", "content"],
|
||||
["q.content", b"content"],
|
||||
["q.raw_content", b"content"],
|
||||
["q.header[header]", "qvalue"],
|
||||
|
||||
["s.status_code", "200"],
|
||||
["s.reason", "OK"],
|
||||
["s.text", "message"],
|
||||
["s.content", b"message"],
|
||||
["s.raw_content", b"message"],
|
||||
["s.header[header-response]", "svalue"],
|
||||
]
|
||||
for t in tests:
|
||||
ret = cut.extract(t[0], tf)
|
||||
if ret != t[1]:
|
||||
raise AssertionError("Expected %s, got %s", t[1], ret)
|
||||
|
||||
|
||||
def test_parse_cutspec():
|
||||
tests = [
|
||||
("", None, True),
|
||||
@ -71,9 +98,9 @@ def test_cut_file(tmpdir):
|
||||
|
||||
v.add([tflow.tflow(resp=True)])
|
||||
tctx.command(c.save, "q.method|@all", f)
|
||||
assert qr(f) == b"GET\nGET"
|
||||
tctx.command(c.save, "q.method,q.path|@all", f)
|
||||
assert qr(f) == b"GET, /path\nGET, /path"
|
||||
assert qr(f).splitlines() == [b"GET", b"GET"]
|
||||
tctx.command(c.save, "q.method,q.content|@all", f)
|
||||
assert qr(f).splitlines() == [b"GET,content", b"GET,content"]
|
||||
|
||||
|
||||
def test_cut():
|
||||
|
Loading…
Reference in New Issue
Block a user