mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-02-02 00:05:27 +00:00
commands: Reassess the cuts API
Make the cuts API more transparent. Cut specifications are no longer a centrally resolved core type, and flows are now passed explicitly.
This commit is contained in:
parent
099aa9cebf
commit
50a94db2cc
@ -56,49 +56,37 @@ def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]:
|
||||
return str(current or "")
|
||||
|
||||
|
||||
def parse_cutspec(s: str) -> typing.Tuple[str, typing.Sequence[str]]:
|
||||
"""
|
||||
Returns (flowspec, [cuts]).
|
||||
|
||||
Raises exceptions.CommandError if input is invalid.
|
||||
"""
|
||||
parts = s.split("|", maxsplit=1)
|
||||
flowspec = "@all"
|
||||
if len(parts) == 2:
|
||||
flowspec = parts[1].strip()
|
||||
cuts = parts[0]
|
||||
cutparts = [i.strip() for i in cuts.split(",") if i.strip()]
|
||||
if len(cutparts) == 0:
|
||||
raise exceptions.CommandError("Invalid cut specification.")
|
||||
return flowspec, cutparts
|
||||
|
||||
|
||||
class Cut:
|
||||
@command.command("cut")
|
||||
def cut(self, cutspec: str) -> command.Cuts:
|
||||
def cut(
|
||||
self,
|
||||
flows: typing.Sequence[flow.Flow],
|
||||
cuts: typing.Sequence[command.Cut]
|
||||
) -> command.Cuts:
|
||||
"""
|
||||
Resolve a cut specification of the form "cuts|flowspec". The cuts
|
||||
are a comma-separated list of cut snippets. Cut snippets are
|
||||
attribute paths from the base of the flow object, with a few
|
||||
conveniences - "q", "s", "cc" and "sc" are shortcuts for request,
|
||||
response, client_conn and server_conn, "port" and "host" retrieve
|
||||
parts of an address tuple, ".header[key]" retrieves a header value.
|
||||
Return values converted sensibly: SSL certicates are converted to PEM
|
||||
Cut data from a set of flows. Cut specifications are attribute paths
|
||||
from the base of the flow object, with a few conveniences - "q",
|
||||
"s", "cc" and "sc" are shortcuts for request, response, client_conn
|
||||
and server_conn, "port" and "host" retrieve parts of an address
|
||||
tuple, ".header[key]" retrieves a header value. Return values
|
||||
converted to strings or bytes: SSL certicates are converted to PEM
|
||||
format, bools are "true" or "false", "bytes" are preserved, and all
|
||||
other values are converted to strings. The flowspec is optional, and
|
||||
if it is not specified, it is assumed to be @all.
|
||||
other values are converted to strings.
|
||||
"""
|
||||
flowspec, cuts = parse_cutspec(cutspec)
|
||||
flows = ctx.master.commands.call_args("view.resolve", [flowspec])
|
||||
ret = []
|
||||
for f in flows:
|
||||
ret.append([extract(c, f) for c in cuts])
|
||||
return ret
|
||||
|
||||
@command.command("cut.save")
|
||||
def save(self, cuts: command.Cuts, path: command.Path) -> None:
|
||||
def save(
|
||||
self,
|
||||
flows: typing.Sequence[flow.Flow],
|
||||
cuts: typing.Sequence[command.Cut],
|
||||
path: command.Path
|
||||
) -> None:
|
||||
"""
|
||||
Save cuts to file. If there are multiple rows or columns, the format
|
||||
Save cuts to file. If there are multiple flows or cuts, the format
|
||||
is UTF-8 encoded CSV. If there is exactly one row and one column,
|
||||
the data is written to file as-is, with raw bytes preserved. If the
|
||||
path is prefixed with a "+", values are appended if there is an
|
||||
@ -108,12 +96,12 @@ class Cut:
|
||||
if path.startswith("+"):
|
||||
append = True
|
||||
path = command.Path(path[1:])
|
||||
if len(cuts) == 1 and len(cuts[0]) == 1:
|
||||
if len(cuts) == 1 and len(flows) == 1:
|
||||
with open(path, "ab" if append else "wb") as fp:
|
||||
if fp.tell() > 0:
|
||||
# We're appending to a file that already exists and has content
|
||||
fp.write(b"\n")
|
||||
v = cuts[0][0]
|
||||
for v in [extract(cuts[0], f) for f in flows]:
|
||||
if isinstance(v, bytes):
|
||||
fp.write(v)
|
||||
else:
|
||||
@ -122,16 +110,23 @@ class Cut:
|
||||
else:
|
||||
with open(path, "a" if append else "w", newline='', encoding="utf8") as fp:
|
||||
writer = csv.writer(fp)
|
||||
for r in cuts:
|
||||
for f in flows:
|
||||
vals = [extract(c, f) for c in cuts]
|
||||
writer.writerow(
|
||||
[strutils.always_str(c) or "" for c in r] # type: ignore
|
||||
[strutils.always_str(x) or "" for x in vals] # type: ignore
|
||||
)
|
||||
ctx.log.alert("Saved %s cuts as CSV." % len(cuts))
|
||||
ctx.log.alert("Saved %s cuts over %d flows as CSV." % (len(cuts), len(flows)))
|
||||
|
||||
@command.command("cut.clip")
|
||||
def clip(self, cuts: command.Cuts) -> None:
|
||||
def clip(
|
||||
self,
|
||||
flows: typing.Sequence[flow.Flow],
|
||||
cuts: typing.Sequence[command.Cut],
|
||||
) -> None:
|
||||
"""
|
||||
Send cuts to the system clipboard.
|
||||
Send cuts to the clipboard. If there are multiple flows or cuts, the
|
||||
format is UTF-8 encoded CSV. If there is exactly one row and one
|
||||
column, the data is written to file as-is, with raw bytes preserved.
|
||||
"""
|
||||
fp = io.StringIO(newline="")
|
||||
if len(cuts) == 1 and len(cuts[0]) == 1:
|
||||
|
@ -29,6 +29,10 @@ Cuts = typing.Sequence[
|
||||
]
|
||||
|
||||
|
||||
class Cut(str):
|
||||
pass
|
||||
|
||||
|
||||
class Path(str):
|
||||
pass
|
||||
|
||||
@ -52,8 +56,10 @@ def typename(t: type, ret: bool) -> str:
|
||||
return "[flow]" if ret else "flowspec"
|
||||
elif t == typing.Sequence[str]:
|
||||
return "[str]"
|
||||
elif t == typing.Sequence[Cut]:
|
||||
return "[cut]"
|
||||
elif t == Cuts:
|
||||
return "[cuts]" if ret else "cutspec"
|
||||
return "[cuts]"
|
||||
elif t == flow.Flow:
|
||||
return "flow"
|
||||
elif issubclass(t, (str, int, bool)):
|
||||
@ -264,7 +270,7 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
|
||||
"Command requires one flow, specification matched %s." % len(flows)
|
||||
)
|
||||
return flows[0]
|
||||
elif argtype == typing.Sequence[str]:
|
||||
elif argtype in (typing.Sequence[str], typing.Sequence[Cut]):
|
||||
return [i.strip() for i in spec.split(",")]
|
||||
else:
|
||||
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
|
||||
|
@ -31,7 +31,7 @@ def map(km):
|
||||
km.add("A", "flow.resume @all", ["flowlist", "flowview"], "Resume all intercepted flows")
|
||||
km.add("a", "flow.resume @focus", ["flowlist", "flowview"], "Resume this intercepted flow")
|
||||
km.add(
|
||||
"b", "console.command cut.save s.content|@focus ''",
|
||||
"b", "console.command cut.save @focus s.content ",
|
||||
["flowlist", "flowview"],
|
||||
"Save response body to file"
|
||||
)
|
||||
|
@ -56,42 +56,6 @@ def test_extract():
|
||||
assert "CERTIFICATE" in cut.extract("sc.cert", tf)
|
||||
|
||||
|
||||
def test_parse_cutspec():
|
||||
tests = [
|
||||
("", None, True),
|
||||
("req.method", ("@all", ["req.method"]), False),
|
||||
(
|
||||
"req.method,req.host",
|
||||
("@all", ["req.method", "req.host"]),
|
||||
False
|
||||
),
|
||||
(
|
||||
"req.method,req.host|~b foo",
|
||||
("~b foo", ["req.method", "req.host"]),
|
||||
False
|
||||
),
|
||||
(
|
||||
"req.method,req.host|~b foo | ~b bar",
|
||||
("~b foo | ~b bar", ["req.method", "req.host"]),
|
||||
False
|
||||
),
|
||||
(
|
||||
"req.method, req.host | ~b foo | ~b bar",
|
||||
("~b foo | ~b bar", ["req.method", "req.host"]),
|
||||
False
|
||||
),
|
||||
]
|
||||
for cutspec, output, err in tests:
|
||||
try:
|
||||
assert cut.parse_cutspec(cutspec) == output
|
||||
except exceptions.CommandError:
|
||||
if not err:
|
||||
raise
|
||||
else:
|
||||
if err:
|
||||
raise AssertionError("Expected error.")
|
||||
|
||||
|
||||
def test_headername():
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
cut.headername("header[foo.")
|
||||
@ -110,69 +74,64 @@ def test_cut_clip():
|
||||
v.add([tflow.tflow(resp=True)])
|
||||
|
||||
with mock.patch('pyperclip.copy') as pc:
|
||||
tctx.command(c.clip, "q.method|@all")
|
||||
tctx.command(c.clip, "@all", "q.method")
|
||||
assert pc.called
|
||||
|
||||
with mock.patch('pyperclip.copy') as pc:
|
||||
tctx.command(c.clip, "q.content|@all")
|
||||
tctx.command(c.clip, "@all", "q.content")
|
||||
assert pc.called
|
||||
|
||||
with mock.patch('pyperclip.copy') as pc:
|
||||
tctx.command(c.clip, "q.method,q.content|@all")
|
||||
tctx.command(c.clip, "@all", "q.method,q.content")
|
||||
assert pc.called
|
||||
|
||||
|
||||
def test_cut_file(tmpdir):
|
||||
def test_cut_save(tmpdir):
|
||||
f = str(tmpdir.join("path"))
|
||||
v = view.View()
|
||||
c = cut.Cut()
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(v, c)
|
||||
|
||||
v.add([tflow.tflow(resp=True)])
|
||||
|
||||
tctx.command(c.save, "q.method|@all", f)
|
||||
tctx.command(c.save, "@all", "q.method", f)
|
||||
assert qr(f) == b"GET"
|
||||
tctx.command(c.save, "q.content|@all", f)
|
||||
tctx.command(c.save, "@all", "q.content", f)
|
||||
assert qr(f) == b"content"
|
||||
tctx.command(c.save, "q.content|@all", "+" + f)
|
||||
tctx.command(c.save, "@all", "q.content", "+" + f)
|
||||
assert qr(f) == b"content\ncontent"
|
||||
|
||||
v.add([tflow.tflow(resp=True)])
|
||||
tctx.command(c.save, "q.method|@all", f)
|
||||
tctx.command(c.save, "@all", "q.method", f)
|
||||
assert qr(f).splitlines() == [b"GET", b"GET"]
|
||||
tctx.command(c.save, "q.method,q.content|@all", f)
|
||||
tctx.command(c.save, "@all", "q.method,q.content", f)
|
||||
assert qr(f).splitlines() == [b"GET,content", b"GET,content"]
|
||||
|
||||
|
||||
def test_cut():
|
||||
v = view.View()
|
||||
c = cut.Cut()
|
||||
with taddons.context() as tctx:
|
||||
v.add([tflow.tflow(resp=True)])
|
||||
tctx.master.addons.add(v, c)
|
||||
assert c.cut("q.method|@all") == [["GET"]]
|
||||
assert c.cut("q.scheme|@all") == [["http"]]
|
||||
assert c.cut("q.host|@all") == [["address"]]
|
||||
assert c.cut("q.port|@all") == [["22"]]
|
||||
assert c.cut("q.path|@all") == [["/path"]]
|
||||
assert c.cut("q.url|@all") == [["http://address:22/path"]]
|
||||
assert c.cut("q.content|@all") == [[b"content"]]
|
||||
assert c.cut("q.header[header]|@all") == [["qvalue"]]
|
||||
assert c.cut("q.header[unknown]|@all") == [[""]]
|
||||
with taddons.context():
|
||||
tflows = [tflow.tflow(resp=True)]
|
||||
assert c.cut(tflows, ["q.method"]) == [["GET"]]
|
||||
assert c.cut(tflows, ["q.scheme"]) == [["http"]]
|
||||
assert c.cut(tflows, ["q.host"]) == [["address"]]
|
||||
assert c.cut(tflows, ["q.port"]) == [["22"]]
|
||||
assert c.cut(tflows, ["q.path"]) == [["/path"]]
|
||||
assert c.cut(tflows, ["q.url"]) == [["http://address:22/path"]]
|
||||
assert c.cut(tflows, ["q.content"]) == [[b"content"]]
|
||||
assert c.cut(tflows, ["q.header[header]"]) == [["qvalue"]]
|
||||
assert c.cut(tflows, ["q.header[unknown]"]) == [[""]]
|
||||
|
||||
assert c.cut("s.status_code|@all") == [["200"]]
|
||||
assert c.cut("s.reason|@all") == [["OK"]]
|
||||
assert c.cut("s.content|@all") == [[b"message"]]
|
||||
assert c.cut("s.header[header-response]|@all") == [["svalue"]]
|
||||
assert c.cut("moo") == [[""]]
|
||||
assert c.cut(tflows, ["s.status_code"]) == [["200"]]
|
||||
assert c.cut(tflows, ["s.reason"]) == [["OK"]]
|
||||
assert c.cut(tflows, ["s.content"]) == [[b"message"]]
|
||||
assert c.cut(tflows, ["s.header[header-response]"]) == [["svalue"]]
|
||||
assert c.cut(tflows, ["moo"]) == [[""]]
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
assert c.cut("__dict__") == [[""]]
|
||||
assert c.cut(tflows, ["__dict__"]) == [[""]]
|
||||
|
||||
v = view.View()
|
||||
c = cut.Cut()
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(v, c)
|
||||
v.add([tflow.ttcpflow()])
|
||||
assert c.cut("q.method|@all") == [[""]]
|
||||
assert c.cut("s.status|@all") == [[""]]
|
||||
with taddons.context():
|
||||
tflows = [tflow.ttcpflow()]
|
||||
assert c.cut(tflows, ["q.method"]) == [[""]]
|
||||
assert c.cut(tflows, ["s.status"]) == [[""]]
|
||||
|
@ -155,8 +155,8 @@ def test_typename():
|
||||
assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]"
|
||||
assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec"
|
||||
|
||||
assert command.typename(command.Cuts, False) == "cutspec"
|
||||
assert command.typename(command.Cuts, True) == "[cuts]"
|
||||
assert command.typename(typing.Sequence[command.Cut], False) == "[cut]"
|
||||
|
||||
assert command.typename(flow.Flow, False) == "flow"
|
||||
assert command.typename(typing.Sequence[str], False) == "[str]"
|
||||
|
Loading…
Reference in New Issue
Block a user