mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 02:10:59 +00:00
Addons and addon testing
- Fix some loading sequence bugs affecting command-line script invocation - Allow addons to over-ride existing options (with a warning). We need this for reloading. - Convert har_dump to new-style arguments, fix and re-instate its test suite. - Covnert miscelaneous other exmples to new-style args.
This commit is contained in:
parent
e32efcae49
commit
5327756377
@ -4,7 +4,6 @@ This inline script can be used to dump flows as HAR files.
|
||||
|
||||
|
||||
import json
|
||||
import sys
|
||||
import base64
|
||||
import zlib
|
||||
import os
|
||||
@ -15,6 +14,7 @@ from datetime import timezone
|
||||
import mitmproxy
|
||||
|
||||
from mitmproxy import version
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy.utils import strutils
|
||||
from mitmproxy.net.http import cookies
|
||||
|
||||
@ -26,16 +26,12 @@ SERVERS_SEEN = set()
|
||||
|
||||
|
||||
def load(l):
|
||||
"""
|
||||
Called once on script startup before any other events.
|
||||
"""
|
||||
if len(sys.argv) != 2:
|
||||
raise ValueError(
|
||||
'Usage: -s "har_dump.py filename" '
|
||||
'(- will output to stdout, filenames ending with .zhar '
|
||||
'will result in compressed har)'
|
||||
)
|
||||
l.add_option(
|
||||
"hardump", str, "", "HAR dump path.",
|
||||
)
|
||||
|
||||
|
||||
def configure(updated):
|
||||
HAR.update({
|
||||
"log": {
|
||||
"version": "1.2",
|
||||
@ -156,21 +152,20 @@ def done():
|
||||
"""
|
||||
Called once on script shutdown, after any other events.
|
||||
"""
|
||||
dump_file = sys.argv[1]
|
||||
if ctx.options.hardump:
|
||||
json_dump = json.dumps(HAR, indent=2) # type: str
|
||||
|
||||
json_dump = json.dumps(HAR, indent=2) # type: str
|
||||
if ctx.options.hardump == '-':
|
||||
mitmproxy.ctx.log(json_dump)
|
||||
else:
|
||||
raw = json_dump.encode() # type: bytes
|
||||
if ctx.options.hardump.endswith('.zhar'):
|
||||
raw = zlib.compress(raw, 9)
|
||||
|
||||
if dump_file == '-':
|
||||
mitmproxy.ctx.log(json_dump)
|
||||
else:
|
||||
raw = json_dump.encode() # type: bytes
|
||||
if dump_file.endswith('.zhar'):
|
||||
raw = zlib.compress(raw, 9)
|
||||
with open(os.path.expanduser(ctx.options.hardump), "wb") as f:
|
||||
f.write(raw)
|
||||
|
||||
with open(os.path.expanduser(dump_file), "wb") as f:
|
||||
f.write(raw)
|
||||
|
||||
mitmproxy.ctx.log("HAR dump finished (wrote %s bytes to file)" % len(json_dump))
|
||||
mitmproxy.ctx.log("HAR dump finished (wrote %s bytes to file)" % len(json_dump))
|
||||
|
||||
|
||||
def format_cookies(cookie_list):
|
||||
|
@ -23,10 +23,10 @@ Authors: Maximilian Hils, Matthew Tuusberg
|
||||
import collections
|
||||
import random
|
||||
|
||||
import sys
|
||||
from enum import Enum
|
||||
|
||||
import mitmproxy
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy.exceptions import TlsProtocolException
|
||||
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer
|
||||
|
||||
@ -113,9 +113,15 @@ tls_strategy = None
|
||||
|
||||
|
||||
def load(l):
|
||||
l.add_option(
|
||||
"tlsstrat", int, 0, "TLS passthrough strategy (0-100)",
|
||||
)
|
||||
|
||||
|
||||
def configure(updated):
|
||||
global tls_strategy
|
||||
if len(sys.argv) == 2:
|
||||
tls_strategy = ProbabilisticStrategy(float(sys.argv[1]))
|
||||
if ctx.options.tlsstrat > 0:
|
||||
tls_strategy = ProbabilisticStrategy(float(ctx.options.tlsstrat) / 100.0)
|
||||
else:
|
||||
tls_strategy = ConservativeStrategy()
|
||||
|
||||
|
@ -1,15 +1,21 @@
|
||||
"""
|
||||
This scripts demonstrates how to use mitmproxy's filter pattern in scripts.
|
||||
Usage:
|
||||
mitmdump -s "flowfilter.py FILTER"
|
||||
"""
|
||||
import sys
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import ctx
|
||||
|
||||
|
||||
class Filter:
|
||||
def __init__(self, spec):
|
||||
self.filter = flowfilter.parse(spec)
|
||||
def __init__(self):
|
||||
self.filter = None
|
||||
|
||||
def configure(self, updated):
|
||||
self.filter = flowfilter.parse(ctx.options.flowfilter)
|
||||
|
||||
def load(self, l):
|
||||
l.add_option(
|
||||
"flowfilter", str, "", "Check that flow matches filter."
|
||||
)
|
||||
|
||||
def response(self, flow):
|
||||
if flowfilter.match(self.filter, flow):
|
||||
@ -17,4 +23,4 @@ class Filter:
|
||||
print(flow)
|
||||
|
||||
|
||||
addons = [Filter(sys.argv[1])]
|
||||
addons = [Filter()]
|
||||
|
@ -83,6 +83,8 @@ class Loader:
|
||||
help: str,
|
||||
choices: typing.Optional[typing.Sequence[str]] = None
|
||||
) -> None:
|
||||
if name in self.master.options:
|
||||
ctx.log.warn("Over-riding existing option %s" % name)
|
||||
self.master.options.add_option(
|
||||
name,
|
||||
typespec,
|
||||
|
@ -47,7 +47,7 @@ class Script:
|
||||
if time.time() - self.last_load > self.ReloadInterval:
|
||||
mtime = os.stat(self.path).st_mtime
|
||||
if mtime > self.last_mtime:
|
||||
ctx.log.info("Loading script: %s" % self.name)
|
||||
ctx.log.info("Loading script: %s" % self.path)
|
||||
if self.ns:
|
||||
ctx.master.addons.remove(self.ns)
|
||||
self.ns = load_script(ctx, self.path)
|
||||
@ -108,7 +108,6 @@ class ScriptLoader:
|
||||
if s in current:
|
||||
ordered.append(current[s])
|
||||
else:
|
||||
ctx.log.info("Loading script: %s" % s)
|
||||
sc = Script(s)
|
||||
ordered.append(sc)
|
||||
newscripts.append(sc)
|
||||
|
@ -104,8 +104,6 @@ class OptManager:
|
||||
help: str,
|
||||
choices: typing.Optional[typing.Sequence[str]] = None
|
||||
) -> None:
|
||||
if name in self._options:
|
||||
raise ValueError("Option %s already exists" % name)
|
||||
self._options[name] = _Option(name, typespec, default, help, choices)
|
||||
|
||||
@contextlib.contextmanager
|
||||
|
@ -120,3 +120,9 @@ class context:
|
||||
self.configure(sc)
|
||||
self.master.addons.invoke_addon(sc, "tick")
|
||||
return sc.addons[0] if sc.addons else None
|
||||
|
||||
def invoke(self, addon, event, *args, **kwargs):
|
||||
"""
|
||||
Recursively invoke an event on an addon and all its children.
|
||||
"""
|
||||
return self.master.addons.invoke_addon(addon, event, *args, **kwargs)
|
||||
|
@ -77,6 +77,7 @@ def run(MasterKlass, args, extra=None): # pragma: no cover
|
||||
server = process_options(parser, opts, args)
|
||||
master = MasterKlass(opts, server)
|
||||
master.addons.trigger("configure", opts.keys())
|
||||
master.addons.trigger("tick")
|
||||
remaining = opts.update_known(**unknown)
|
||||
if remaining and opts.verbosity > 1:
|
||||
print("Ignored options: %s" % remaining)
|
||||
|
@ -1,114 +0,0 @@
|
||||
import json
|
||||
import shlex
|
||||
import pytest
|
||||
|
||||
from mitmproxy import options
|
||||
from mitmproxy import proxy
|
||||
from mitmproxy import master
|
||||
from mitmproxy.addons import script
|
||||
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.test import tutils
|
||||
from mitmproxy.net.http import cookies
|
||||
|
||||
example_dir = tutils.test_data.push("../examples")
|
||||
|
||||
|
||||
class ScriptError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class RaiseMaster(master.Master):
|
||||
def add_log(self, e, level):
|
||||
if level in ("warn", "error"):
|
||||
raise ScriptError(e)
|
||||
|
||||
|
||||
def tscript(cmd, args=""):
|
||||
o = options.Options()
|
||||
cmd = example_dir.path(cmd) + " " + args
|
||||
m = RaiseMaster(o, proxy.DummyServer())
|
||||
sc = script.Script(cmd)
|
||||
m.addons.add(sc)
|
||||
return m, sc
|
||||
|
||||
|
||||
class TestHARDump:
|
||||
|
||||
def flow(self, resp_content=b'message'):
|
||||
times = dict(
|
||||
timestamp_start=746203272,
|
||||
timestamp_end=746203272,
|
||||
)
|
||||
|
||||
# Create a dummy flow for testing
|
||||
return tflow.tflow(
|
||||
req=tutils.treq(method=b'GET', **times),
|
||||
resp=tutils.tresp(content=resp_content, **times)
|
||||
)
|
||||
|
||||
def test_no_file_arg(self):
|
||||
with pytest.raises(ScriptError):
|
||||
tscript("complex/har_dump.py")
|
||||
|
||||
def test_simple(self, tmpdir):
|
||||
path = str(tmpdir.join("somefile"))
|
||||
|
||||
m, sc = tscript("complex/har_dump.py", shlex.quote(path))
|
||||
m.addons.trigger("response", self.flow())
|
||||
m.addons.remove(sc)
|
||||
|
||||
with open(path, "r") as inp:
|
||||
har = json.load(inp)
|
||||
assert len(har["log"]["entries"]) == 1
|
||||
|
||||
def test_base64(self, tmpdir):
|
||||
path = str(tmpdir.join("somefile"))
|
||||
|
||||
m, sc = tscript("complex/har_dump.py", shlex.quote(path))
|
||||
m.addons.trigger(
|
||||
"response", self.flow(resp_content=b"foo" + b"\xFF" * 10)
|
||||
)
|
||||
m.addons.remove(sc)
|
||||
|
||||
with open(path, "r") as inp:
|
||||
har = json.load(inp)
|
||||
assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64"
|
||||
|
||||
def test_format_cookies(self):
|
||||
m, sc = tscript("complex/har_dump.py", "-")
|
||||
format_cookies = sc.ns.format_cookies
|
||||
|
||||
CA = cookies.CookieAttrs
|
||||
|
||||
f = format_cookies([("n", "v", CA([("k", "v")]))])[0]
|
||||
assert f['name'] == "n"
|
||||
assert f['value'] == "v"
|
||||
assert not f['httpOnly']
|
||||
assert not f['secure']
|
||||
|
||||
f = format_cookies([("n", "v", CA([("httponly", None), ("secure", None)]))])[0]
|
||||
assert f['httpOnly']
|
||||
assert f['secure']
|
||||
|
||||
f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0]
|
||||
assert f['expires']
|
||||
|
||||
def test_binary(self, tmpdir):
|
||||
|
||||
f = self.flow()
|
||||
f.request.method = "POST"
|
||||
f.request.headers["content-type"] = "application/x-www-form-urlencoded"
|
||||
f.request.content = b"foo=bar&baz=s%c3%bc%c3%9f"
|
||||
f.response.headers["random-junk"] = bytes(range(256))
|
||||
f.response.content = bytes(range(256))
|
||||
|
||||
path = str(tmpdir.join("somefile"))
|
||||
|
||||
m, sc = tscript("complex/har_dump.py", shlex.quote(path))
|
||||
m.addons.trigger("response", f)
|
||||
m.addons.remove(sc)
|
||||
|
||||
with open(path, "r") as inp:
|
||||
har = json.load(inp)
|
||||
assert len(har["log"]["entries"]) == 1
|
86
test/examples/test_har_dump.py
Normal file
86
test/examples/test_har_dump.py
Normal file
@ -0,0 +1,86 @@
|
||||
import json
|
||||
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.test import tutils
|
||||
from mitmproxy.test import taddons
|
||||
from mitmproxy.net.http import cookies
|
||||
|
||||
example_dir = tutils.test_data.push("../examples")
|
||||
|
||||
|
||||
class TestHARDump:
|
||||
def flow(self, resp_content=b'message'):
|
||||
times = dict(
|
||||
timestamp_start=746203272,
|
||||
timestamp_end=746203272,
|
||||
)
|
||||
|
||||
# Create a dummy flow for testing
|
||||
return tflow.tflow(
|
||||
req=tutils.treq(method=b'GET', **times),
|
||||
resp=tutils.tresp(content=resp_content, **times)
|
||||
)
|
||||
|
||||
def test_simple(self, tmpdir):
|
||||
with taddons.context() as tctx:
|
||||
a = tctx.script(example_dir.path("complex/har_dump.py"))
|
||||
path = str(tmpdir.join("somefile"))
|
||||
tctx.configure(a, hardump=path)
|
||||
tctx.invoke(a, "response", self.flow())
|
||||
tctx.invoke(a, "done")
|
||||
with open(path, "r") as inp:
|
||||
har = json.load(inp)
|
||||
assert len(har["log"]["entries"]) == 1
|
||||
|
||||
def test_base64(self, tmpdir):
|
||||
with taddons.context() as tctx:
|
||||
a = tctx.script(example_dir.path("complex/har_dump.py"))
|
||||
path = str(tmpdir.join("somefile"))
|
||||
tctx.configure(a, hardump=path)
|
||||
|
||||
tctx.invoke(
|
||||
a, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)
|
||||
)
|
||||
tctx.invoke(a, "done")
|
||||
with open(path, "r") as inp:
|
||||
har = json.load(inp)
|
||||
assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64"
|
||||
|
||||
def test_format_cookies(self):
|
||||
with taddons.context() as tctx:
|
||||
a = tctx.script(example_dir.path("complex/har_dump.py"))
|
||||
|
||||
CA = cookies.CookieAttrs
|
||||
|
||||
f = a.format_cookies([("n", "v", CA([("k", "v")]))])[0]
|
||||
assert f['name'] == "n"
|
||||
assert f['value'] == "v"
|
||||
assert not f['httpOnly']
|
||||
assert not f['secure']
|
||||
|
||||
f = a.format_cookies([("n", "v", CA([("httponly", None), ("secure", None)]))])[0]
|
||||
assert f['httpOnly']
|
||||
assert f['secure']
|
||||
|
||||
f = a.format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0]
|
||||
assert f['expires']
|
||||
|
||||
def test_binary(self, tmpdir):
|
||||
with taddons.context() as tctx:
|
||||
a = tctx.script(example_dir.path("complex/har_dump.py"))
|
||||
path = str(tmpdir.join("somefile"))
|
||||
tctx.configure(a, hardump=path)
|
||||
|
||||
f = self.flow()
|
||||
f.request.method = "POST"
|
||||
f.request.headers["content-type"] = "application/x-www-form-urlencoded"
|
||||
f.request.content = b"foo=bar&baz=s%c3%bc%c3%9f"
|
||||
f.response.headers["random-junk"] = bytes(range(256))
|
||||
f.response.content = bytes(range(256))
|
||||
|
||||
tctx.invoke(a, "response", f)
|
||||
tctx.invoke(a, "done")
|
||||
|
||||
with open(path, "r") as inp:
|
||||
har = json.load(inp)
|
||||
assert len(har["log"]["entries"]) == 1
|
@ -76,6 +76,13 @@ def test_defaults():
|
||||
assert addons.default_addons()
|
||||
|
||||
|
||||
def test_loader():
|
||||
with taddons.context() as tctx:
|
||||
l = addonmanager.Loader(tctx.master)
|
||||
l.add_option("custom_option", bool, False, "help")
|
||||
l.add_option("custom_option", bool, False, "help")
|
||||
|
||||
|
||||
def test_simple():
|
||||
with taddons.context() as tctx:
|
||||
a = tctx.master.addons
|
||||
|
@ -38,12 +38,6 @@ class TM(optmanager.OptManager):
|
||||
self.add_option("one", typing.Optional[str], None, "help")
|
||||
|
||||
|
||||
def test_add_option():
|
||||
o = TO()
|
||||
with pytest.raises(ValueError, match="already exists"):
|
||||
o.add_option("one", typing.Optional[int], None, "help")
|
||||
|
||||
|
||||
def test_defaults():
|
||||
o = TD2()
|
||||
defaults = {
|
||||
|
Loading…
Reference in New Issue
Block a user