Merge pull request #2264 from cortesi/options

Change the way addons handle options

Commit: 1daf0b3f0a
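The change, in one sentence: addon event hooks no longer receive an options object. The configure hook now gets only the set of updated option names, and option values are read from the global ctx.options that the master populates (see the ctx.py and master.py hunks further down). A minimal sketch of a new-style addon script, assuming the post-merge API exactly as the first example hunk below shows it:

from mitmproxy import ctx


def load(l):
    # Declare the option: name, type, default, help text.
    l.add_option("custom", bool, False, "A custom option")


def configure(updated):
    # `updated` is the set of option names that changed; values come from ctx.options.
    if "custom" in updated:
        ctx.log.info("custom option value: %s" % ctx.options.custom)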
@@ -6,6 +6,6 @@ def load(l):
     l.add_option("custom", bool, False, "A custom option")


-def configure(options, updated):
+def configure(updated):
     if "custom" in updated:
-        ctx.log.info("custom option value: %s" % options.custom)
+        ctx.log.info("custom option value: %s" % ctx.options.custom)
@@ -1,26 +1,21 @@
 # (this script works best with --anticache)
 from bs4 import BeautifulSoup
+from mitmproxy import ctx


 class Injector:
-    def __init__(self):
-        self.iframe_url = None
-
     def load(self, loader):
         loader.add_option(
             "iframe", str, "", "IFrame to inject"
         )

-    def configure(self, options, updated):
-        self.iframe_url = options.iframe
-
     def response(self, flow):
-        if self.iframe_url:
+        if ctx.options.iframe:
             html = BeautifulSoup(flow.response.content, "html.parser")
             if html.body:
                 iframe = html.new_tag(
                     "iframe",
-                    src=self.iframe_url,
+                    src=ctx.options.iframe,
                     frameborder=0,
                     height=0,
                     width=0)
@@ -110,7 +110,7 @@ class AddonManager:
         master.options.changed.connect(self._configure_all)

     def _configure_all(self, options, updated):
-        self.trigger("configure", options, updated)
+        self.trigger("configure", updated)

     def clear(self):
         """
@@ -129,9 +129,13 @@ class AddonManager:

     def register(self, addon):
         """
-            Register an addon and all its sub-addons with the manager without
-            adding it to the chain. This should be used by addons that
-            dynamically manage addons. Must be called within a current context.
+            Register an addon, call its load event, and then register all its
+            sub-addons. This should be used by addons that dynamically manage
+            addons.
+
+            If the calling addon is already running, it should follow with
+            running and configure events. Must be called within a current
+            context.
         """
         for a in traverse([addon]):
             name = _get_name(a)
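The reworded register() docstring above describes the contract that the script addon follows later in this diff: register() now fires the new addon's load event, and a caller that is already running must itself follow up with running and configure. A sketch of that pattern for a hypothetical addon that swaps a child addon in at runtime (names are illustrative; the calls mirror Script.tick further down):

from mitmproxy import ctx


class DynamicLoader:
    """Hypothetical addon that replaces a child addon while mitmproxy is running."""

    def swap(self, old_child, new_child):
        if old_child:
            ctx.master.addons.remove(old_child)
        # register() triggers the child's load event for us ...
        ctx.master.addons.register(new_child)
        # ... but since we are already running, we must send running and
        # configure ourselves, as the docstring requires.
        ctx.master.addons.invoke_addon(new_child, "running")
        ctx.master.addons.invoke_addon(new_child, "configure", ctx.options.keys())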
@@ -1,10 +1,7 @@
+from mitmproxy import ctx
+
+
 class AntiCache:
-    def __init__(self):
-        self.enabled = False
-
-    def configure(self, options, updated):
-        self.enabled = options.anticache
-
     def request(self, flow):
-        if self.enabled:
+        if ctx.options.anticache:
             flow.request.anticache()
@@ -1,10 +1,7 @@
+from mitmproxy import ctx
+
+
 class AntiComp:
-    def __init__(self):
-        self.enabled = False
-
-    def configure(self, options, updated):
-        self.enabled = options.anticomp
-
     def request(self, flow):
-        if self.enabled:
+        if ctx.options.anticomp:
             flow.request.anticomp()
@@ -7,7 +7,7 @@ class CheckALPN:
     def __init__(self):
         self.failed = False

-    def configure(self, options, updated):
+    def configure(self, updated):
         self.failed = mitmproxy.ctx.master.options.http2 and not tcp.HAS_ALPN
         if self.failed:
             ctx.log.warn(
@@ -5,7 +5,7 @@ class CheckCA:
     def __init__(self):
         self.failed = False

-    def configure(self, options, updated):
+    def configure(self, updated):
         has_ca = (
             mitmproxy.ctx.master.server and
             mitmproxy.ctx.master.server.config and
@@ -20,12 +20,12 @@ class ClientPlayback:
     def load(self, flows: typing.Sequence[flow.Flow]):
         self.flows = flows

-    def configure(self, options, updated):
+    def configure(self, updated):
         if "client_replay" in updated:
-            if options.client_replay:
-                ctx.log.info("Client Replay: {}".format(options.client_replay))
+            if ctx.options.client_replay:
+                ctx.log.info("Client Replay: {}".format(ctx.options.client_replay))
                 try:
-                    flows = io.read_flows_from_paths(options.client_replay)
+                    flows = io.read_flows_from_paths(ctx.options.client_replay)
                 except exceptions.FlowReadException as e:
                     raise exceptions.OptionsError(str(e))
                 self.load(flows)
@@ -4,12 +4,14 @@
 """
 from mitmproxy import exceptions
 from mitmproxy import platform
+from mitmproxy import ctx
 from mitmproxy.net import server_spec
 from mitmproxy.utils import human


 class CoreOptionValidation:
-    def configure(self, opts, updated):
+    def configure(self, updated):
+        opts = ctx.options
         if opts.add_upstream_certs_to_client_chain and not opts.upstream_cert:
             raise exceptions.OptionsError(
                 "The no-upstream-cert and add-upstream-certs-to-client-chain "
@@ -14,9 +14,6 @@ class DisableH2C:
     by sending the connection preface. We just kill those flows.
     """

-    def configure(self, options, updated):
-        pass
-
     def process_flow(self, f):
         if f.request.headers.get('upgrade', '') == 'h2c':
             mitmproxy.ctx.log.warn("HTTP/2 cleartext connections (h2c upgrade requests) are currently not supported.")
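Most of the per-addon hunks that follow repeat a single pattern: check whether the relevant option name is in updated, read its value from ctx.options, and raise exceptions.OptionsError when it fails to parse. A condensed sketch of that pattern with a hypothetical option name my_filter (the real hunks use intercept, view_filter, stickyauth, and similar):

from mitmproxy import ctx, exceptions, flowfilter


class FilterBackedAddon:
    def __init__(self):
        self.filt = None

    def configure(self, updated):
        if "my_filter" in updated:  # hypothetical option name
            if not ctx.options.my_filter:
                self.filt = None
                return
            self.filt = flowfilter.parse(ctx.options.my_filter)
            if not self.filt:
                raise exceptions.OptionsError(
                    "Invalid filter expression: %s" % ctx.options.my_filter
                )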
@ -29,24 +29,18 @@ def colorful(line, styles):
|
||||
class Dumper:
|
||||
def __init__(self, outfile=sys.stdout):
|
||||
self.filter = None # type: flowfilter.TFilter
|
||||
self.flow_detail = None # type: int
|
||||
self.outfp = outfile # type: typing.io.TextIO
|
||||
self.showhost = None # type: bool
|
||||
self.default_contentview = "auto" # type: str
|
||||
|
||||
def configure(self, options, updated):
|
||||
def configure(self, updated):
|
||||
if "view_filter" in updated:
|
||||
if options.view_filter:
|
||||
self.filter = flowfilter.parse(options.view_filter)
|
||||
if ctx.options.view_filter:
|
||||
self.filter = flowfilter.parse(ctx.options.view_filter)
|
||||
if not self.filter:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid filter expression: %s" % options.view_filter
|
||||
"Invalid filter expression: %s" % ctx.options.view_filter
|
||||
)
|
||||
else:
|
||||
self.filter = None
|
||||
self.flow_detail = options.flow_detail
|
||||
self.showhost = options.showhost
|
||||
self.default_contentview = options.default_contentview
|
||||
|
||||
def echo(self, text, ident=None, **style):
|
||||
if ident:
|
||||
@ -67,13 +61,13 @@ class Dumper:
|
||||
|
||||
def _echo_message(self, message):
|
||||
_, lines, error = contentviews.get_message_content_view(
|
||||
self.default_contentview,
|
||||
ctx.options.default_contentview,
|
||||
message
|
||||
)
|
||||
if error:
|
||||
ctx.log.debug(error)
|
||||
|
||||
if self.flow_detail == 3:
|
||||
if ctx.options.flow_detail == 3:
|
||||
lines_to_echo = itertools.islice(lines, 70)
|
||||
else:
|
||||
lines_to_echo = lines
|
||||
@ -95,7 +89,7 @@ class Dumper:
|
||||
if next(lines, None):
|
||||
self.echo("(cut off)", ident=4, dim=True)
|
||||
|
||||
if self.flow_detail >= 2:
|
||||
if ctx.options.flow_detail >= 2:
|
||||
self.echo("")
|
||||
|
||||
def _echo_request_line(self, flow):
|
||||
@ -121,12 +115,12 @@ class Dumper:
|
||||
fg=method_color,
|
||||
bold=True
|
||||
)
|
||||
if self.showhost:
|
||||
if ctx.options.showhost:
|
||||
url = flow.request.pretty_url
|
||||
else:
|
||||
url = flow.request.url
|
||||
terminalWidthLimit = max(shutil.get_terminal_size()[0] - 25, 50)
|
||||
if self.flow_detail < 1 and len(url) > terminalWidthLimit:
|
||||
if ctx.options.flow_detail < 1 and len(url) > terminalWidthLimit:
|
||||
url = url[:terminalWidthLimit] + "…"
|
||||
url = click.style(strutils.escape_control_characters(url), bold=True)
|
||||
|
||||
@ -176,7 +170,7 @@ class Dumper:
|
||||
size = click.style(size, bold=True)
|
||||
|
||||
arrows = click.style(" <<", bold=True)
|
||||
if self.flow_detail == 1:
|
||||
if ctx.options.flow_detail == 1:
|
||||
# This aligns the HTTP response code with the HTTP request method:
|
||||
# 127.0.0.1:59519: GET http://example.com/
|
||||
# << 304 Not Modified 0b
|
||||
@ -194,16 +188,16 @@ class Dumper:
|
||||
def echo_flow(self, f):
|
||||
if f.request:
|
||||
self._echo_request_line(f)
|
||||
if self.flow_detail >= 2:
|
||||
if ctx.options.flow_detail >= 2:
|
||||
self._echo_headers(f.request.headers)
|
||||
if self.flow_detail >= 3:
|
||||
if ctx.options.flow_detail >= 3:
|
||||
self._echo_message(f.request)
|
||||
|
||||
if f.response:
|
||||
self._echo_response_line(f)
|
||||
if self.flow_detail >= 2:
|
||||
if ctx.options.flow_detail >= 2:
|
||||
self._echo_headers(f.response.headers)
|
||||
if self.flow_detail >= 3:
|
||||
if ctx.options.flow_detail >= 3:
|
||||
self._echo_message(f.response)
|
||||
|
||||
if f.error:
|
||||
@ -211,7 +205,7 @@ class Dumper:
|
||||
self.echo(" << {}".format(msg), bold=True, fg="red")
|
||||
|
||||
def match(self, f):
|
||||
if self.flow_detail == 0:
|
||||
if ctx.options.flow_detail == 0:
|
||||
return False
|
||||
if not self.filter:
|
||||
return True
|
||||
@ -239,7 +233,7 @@ class Dumper:
|
||||
if self.match(f):
|
||||
message = f.messages[-1]
|
||||
self.echo(f.message_info(message))
|
||||
if self.flow_detail >= 3:
|
||||
if ctx.options.flow_detail >= 3:
|
||||
self._echo_message(message)
|
||||
|
||||
def websocket_end(self, f):
|
||||
@ -267,5 +261,5 @@ class Dumper:
|
||||
server=repr(f.server_conn.address),
|
||||
direction=direction,
|
||||
))
|
||||
if self.flow_detail >= 3:
|
||||
if ctx.options.flow_detail >= 3:
|
||||
self._echo_message(message)
|
||||
|
@@ -1,20 +1,21 @@
 from mitmproxy import flowfilter
 from mitmproxy import exceptions
+from mitmproxy import ctx


 class Intercept:
     def __init__(self):
         self.filt = None

-    def configure(self, opts, updated):
+    def configure(self, updated):
         if "intercept" in updated:
-            if not opts.intercept:
+            if not ctx.options.intercept:
                 self.filt = None
                 return
-            self.filt = flowfilter.parse(opts.intercept)
+            self.filt = flowfilter.parse(ctx.options.intercept)
             if not self.filt:
                 raise exceptions.OptionsError(
-                    "Invalid interception filter: %s" % opts.intercept
+                    "Invalid interception filter: %s" % ctx.options.intercept
                 )

     def process_flow(self, f):
@ -1,5 +1,6 @@
|
||||
from mitmproxy.addons import wsgiapp
|
||||
from mitmproxy.addons.onboardingapp import app
|
||||
from mitmproxy import ctx
|
||||
|
||||
|
||||
class Onboarding(wsgiapp.WSGIApp):
|
||||
@ -7,13 +8,11 @@ class Onboarding(wsgiapp.WSGIApp):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(app.Adapter(app.application), None, None)
|
||||
self.enabled = False
|
||||
|
||||
def configure(self, options, updated):
|
||||
self.host = options.onboarding_host
|
||||
self.port = options.onboarding_port
|
||||
self.enabled = options.onboarding
|
||||
def configure(self, updated):
|
||||
self.host = ctx.options.onboarding_host
|
||||
self.port = ctx.options.onboarding_port
|
||||
|
||||
def request(self, f):
|
||||
if self.enabled:
|
||||
if ctx.options.onboarding:
|
||||
super().request(f)
|
||||
|
@ -10,6 +10,7 @@ import mitmproxy.net.http
|
||||
from mitmproxy import connections # noqa
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import http
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy.net.http import status_codes
|
||||
|
||||
REALM = "mitmproxy"
|
||||
@ -45,7 +46,6 @@ class ProxyAuth:
|
||||
self.nonanonymous = False
|
||||
self.htpasswd = None
|
||||
self.singleuser = None
|
||||
self.mode = None
|
||||
self.authenticated = weakref.WeakKeyDictionary() # type: MutableMapping[connections.ClientConnection, Tuple[str, str]]
|
||||
"""Contains all connections that are permanently authenticated after an HTTP CONNECT"""
|
||||
|
||||
@ -58,7 +58,7 @@ class ProxyAuth:
|
||||
- True, if authentication is done as if mitmproxy is a proxy
|
||||
- False, if authentication is done as if mitmproxy is a HTTP server
|
||||
"""
|
||||
return self.mode in ("regular", "upstream")
|
||||
return ctx.options.mode in ("regular", "upstream")
|
||||
|
||||
def which_auth_header(self) -> str:
|
||||
if self.is_proxy_auth():
|
||||
@ -113,16 +113,16 @@ class ProxyAuth:
|
||||
return False
|
||||
|
||||
# Handlers
|
||||
def configure(self, options, updated):
|
||||
def configure(self, updated):
|
||||
if "proxyauth" in updated:
|
||||
self.nonanonymous = False
|
||||
self.singleuser = None
|
||||
self.htpasswd = None
|
||||
if options.proxyauth:
|
||||
if options.proxyauth == "any":
|
||||
if ctx.options.proxyauth:
|
||||
if ctx.options.proxyauth == "any":
|
||||
self.nonanonymous = True
|
||||
elif options.proxyauth.startswith("@"):
|
||||
p = options.proxyauth[1:]
|
||||
elif ctx.options.proxyauth.startswith("@"):
|
||||
p = ctx.options.proxyauth[1:]
|
||||
try:
|
||||
self.htpasswd = passlib.apache.HtpasswdFile(p)
|
||||
except (ValueError, OSError) as v:
|
||||
@ -130,20 +130,18 @@ class ProxyAuth:
|
||||
"Could not open htpasswd file: %s" % p
|
||||
)
|
||||
else:
|
||||
parts = options.proxyauth.split(':')
|
||||
parts = ctx.options.proxyauth.split(':')
|
||||
if len(parts) != 2:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid single-user auth specification."
|
||||
)
|
||||
self.singleuser = parts
|
||||
if "mode" in updated:
|
||||
self.mode = options.mode
|
||||
if self.enabled():
|
||||
if options.mode == "transparent":
|
||||
if ctx.options.mode == "transparent":
|
||||
raise exceptions.OptionsError(
|
||||
"Proxy Authentication not supported in transparent mode."
|
||||
)
|
||||
if options.mode == "socks5":
|
||||
if ctx.options.mode == "socks5":
|
||||
raise exceptions.OptionsError(
|
||||
"Proxy Authentication not supported in SOCKS mode. "
|
||||
"https://github.com/mitmproxy/mitmproxy/issues/738"
|
||||
|
@ -9,9 +9,6 @@ class ReadFile:
|
||||
"""
|
||||
An addon that handles reading from file on startup.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.path = None
|
||||
|
||||
def load_flows_file(self, path: str) -> int:
|
||||
path = os.path.expanduser(path)
|
||||
cnt = 0
|
||||
@ -31,16 +28,11 @@ class ReadFile:
|
||||
ctx.log.error("Flow file corrupted.")
|
||||
raise exceptions.FlowReadException(v)
|
||||
|
||||
def configure(self, options, updated):
|
||||
if "rfile" in updated and options.rfile:
|
||||
self.path = options.rfile
|
||||
|
||||
def running(self):
|
||||
if self.path:
|
||||
if ctx.options.rfile:
|
||||
try:
|
||||
self.load_flows_file(self.path)
|
||||
self.load_flows_file(ctx.options.rfile)
|
||||
except exceptions.FlowReadException as v:
|
||||
raise exceptions.OptionsError(v)
|
||||
finally:
|
||||
self.path = None
|
||||
ctx.master.addons.trigger("processing_complete")
|
||||
|
@ -47,7 +47,7 @@ class Replace:
|
||||
def __init__(self):
|
||||
self.lst = []
|
||||
|
||||
def configure(self, options, updated):
|
||||
def configure(self, updated):
|
||||
"""
|
||||
.replacements is a list of tuples (fpat, rex, s):
|
||||
|
||||
@ -57,7 +57,7 @@ class Replace:
|
||||
"""
|
||||
if "replacements" in updated:
|
||||
lst = []
|
||||
for rep in options.replacements:
|
||||
for rep in ctx.options.replacements:
|
||||
fpatt, rex, s = parse_hook(rep)
|
||||
|
||||
flt = flowfilter.parse(fpatt)
|
||||
|
@ -1,15 +1,12 @@
|
||||
import os
|
||||
import importlib
|
||||
import threading
|
||||
import time
|
||||
import sys
|
||||
|
||||
from mitmproxy import addonmanager
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import ctx
|
||||
|
||||
import watchdog.events
|
||||
from watchdog.observers import polling
|
||||
|
||||
|
||||
def load_script(actx, path):
|
||||
if not os.path.exists(path):
|
||||
@ -28,79 +25,44 @@ def load_script(actx, path):
|
||||
sys.path[:] = oldpath
|
||||
|
||||
|
||||
class ReloadHandler(watchdog.events.FileSystemEventHandler):
|
||||
def __init__(self, callback):
|
||||
self.callback = callback
|
||||
|
||||
def filter(self, event):
|
||||
"""
|
||||
Returns True only when .py file is changed
|
||||
"""
|
||||
if event.is_directory:
|
||||
return False
|
||||
if os.path.basename(event.src_path).startswith("."):
|
||||
return False
|
||||
if event.src_path.endswith(".py"):
|
||||
return True
|
||||
return False
|
||||
|
||||
def on_modified(self, event):
|
||||
if self.filter(event):
|
||||
self.callback()
|
||||
|
||||
def on_created(self, event):
|
||||
if self.filter(event):
|
||||
self.callback()
|
||||
|
||||
|
||||
class Script:
|
||||
"""
|
||||
An addon that manages a single script.
|
||||
"""
|
||||
ReloadInterval = 2
|
||||
|
||||
def __init__(self, path):
|
||||
self.name = "scriptmanager:" + path
|
||||
self.path = path
|
||||
self.ns = None
|
||||
self.observer = None
|
||||
|
||||
self.last_options = None
|
||||
self.should_reload = threading.Event()
|
||||
|
||||
def load(self, l):
|
||||
self.ns = load_script(ctx, self.path)
|
||||
self.last_load = 0
|
||||
self.last_mtime = 0
|
||||
|
||||
@property
|
||||
def addons(self):
|
||||
if self.ns is not None:
|
||||
return [self.ns]
|
||||
return []
|
||||
|
||||
def reload(self):
|
||||
self.should_reload.set()
|
||||
return [self.ns] if self.ns else []
|
||||
|
||||
def tick(self):
|
||||
if self.should_reload.is_set():
|
||||
self.should_reload.clear()
|
||||
ctx.log.info("Reloading script: %s" % self.name)
|
||||
if self.ns:
|
||||
ctx.master.addons.remove(self.ns)
|
||||
self.ns = load_script(ctx, self.path)
|
||||
if self.ns:
|
||||
# We're already running, so we have to explicitly register and
|
||||
# configure the addon
|
||||
ctx.master.addons.register(self.ns)
|
||||
self.configure(self.last_options, self.last_options.keys())
|
||||
|
||||
def configure(self, options, updated):
|
||||
self.last_options = options
|
||||
if not self.observer:
|
||||
self.observer = polling.PollingObserver()
|
||||
# Bind the handler to the real underlying master object
|
||||
self.observer.schedule(
|
||||
ReloadHandler(self.reload),
|
||||
os.path.dirname(self.path) or "."
|
||||
)
|
||||
self.observer.start()
|
||||
if time.time() - self.last_load > self.ReloadInterval:
|
||||
mtime = os.stat(self.path).st_mtime
|
||||
if mtime > self.last_mtime:
|
||||
ctx.log.info("Loading script: %s" % self.name)
|
||||
if self.ns:
|
||||
ctx.master.addons.remove(self.ns)
|
||||
self.ns = load_script(ctx, self.path)
|
||||
if self.ns:
|
||||
# We're already running, so we have to explicitly register and
|
||||
# configure the addon
|
||||
ctx.master.addons.register(self.ns)
|
||||
ctx.master.addons.invoke_addon(self.ns, "running")
|
||||
ctx.master.addons.invoke_addon(
|
||||
self.ns,
|
||||
"configure",
|
||||
ctx.options.keys()
|
||||
)
|
||||
self.last_load = time.time()
|
||||
self.last_mtime = mtime
|
||||
|
||||
|
||||
class ScriptLoader:
|
||||
@ -118,14 +80,14 @@ class ScriptLoader:
|
||||
# Returning once we have proper commands
|
||||
raise NotImplementedError
|
||||
|
||||
def configure(self, options, updated):
|
||||
def configure(self, updated):
|
||||
if "scripts" in updated:
|
||||
for s in options.scripts:
|
||||
if options.scripts.count(s) > 1:
|
||||
for s in ctx.options.scripts:
|
||||
if ctx.options.scripts.count(s) > 1:
|
||||
raise exceptions.OptionsError("Duplicate script: %s" % s)
|
||||
|
||||
for a in self.addons[:]:
|
||||
if a.path not in options.scripts:
|
||||
if a.path not in ctx.options.scripts:
|
||||
ctx.log.info("Un-loading script: %s" % a.name)
|
||||
ctx.master.addons.remove(a)
|
||||
self.addons.remove(a)
|
||||
@ -142,7 +104,7 @@ class ScriptLoader:
|
||||
|
||||
ordered = []
|
||||
newscripts = []
|
||||
for s in options.scripts:
|
||||
for s in ctx.options.scripts:
|
||||
if s in current:
|
||||
ordered.append(current[s])
|
||||
else:
|
||||
|
@ -10,8 +10,6 @@ from mitmproxy import io
|
||||
|
||||
class ServerPlayback:
|
||||
def __init__(self):
|
||||
self.options = None
|
||||
|
||||
self.flowmap = {}
|
||||
self.stop = False
|
||||
self.final_flow = None
|
||||
@ -38,27 +36,27 @@ class ServerPlayback:
|
||||
queriesArray = urllib.parse.parse_qsl(query, keep_blank_values=True)
|
||||
|
||||
key = [str(r.port), str(r.scheme), str(r.method), str(path)] # type: List[Any]
|
||||
if not self.options.server_replay_ignore_content:
|
||||
if self.options.server_replay_ignore_payload_params and r.multipart_form:
|
||||
if not ctx.options.server_replay_ignore_content:
|
||||
if ctx.options.server_replay_ignore_payload_params and r.multipart_form:
|
||||
key.extend(
|
||||
(k, v)
|
||||
for k, v in r.multipart_form.items(multi=True)
|
||||
if k.decode(errors="replace") not in self.options.server_replay_ignore_payload_params
|
||||
if k.decode(errors="replace") not in ctx.options.server_replay_ignore_payload_params
|
||||
)
|
||||
elif self.options.server_replay_ignore_payload_params and r.urlencoded_form:
|
||||
elif ctx.options.server_replay_ignore_payload_params and r.urlencoded_form:
|
||||
key.extend(
|
||||
(k, v)
|
||||
for k, v in r.urlencoded_form.items(multi=True)
|
||||
if k not in self.options.server_replay_ignore_payload_params
|
||||
if k not in ctx.options.server_replay_ignore_payload_params
|
||||
)
|
||||
else:
|
||||
key.append(str(r.raw_content))
|
||||
|
||||
if not self.options.server_replay_ignore_host:
|
||||
if not ctx.options.server_replay_ignore_host:
|
||||
key.append(r.host)
|
||||
|
||||
filtered = []
|
||||
ignore_params = self.options.server_replay_ignore_params or []
|
||||
ignore_params = ctx.options.server_replay_ignore_params or []
|
||||
for p in queriesArray:
|
||||
if p[0] not in ignore_params:
|
||||
filtered.append(p)
|
||||
@ -66,9 +64,9 @@ class ServerPlayback:
|
||||
key.append(p[0])
|
||||
key.append(p[1])
|
||||
|
||||
if self.options.server_replay_use_headers:
|
||||
if ctx.options.server_replay_use_headers:
|
||||
headers = []
|
||||
for i in self.options.server_replay_use_headers:
|
||||
for i in ctx.options.server_replay_use_headers:
|
||||
v = r.headers.get(i)
|
||||
headers.append((i, v))
|
||||
key.append(headers)
|
||||
@ -83,7 +81,7 @@ class ServerPlayback:
|
||||
"""
|
||||
hsh = self._hash(request)
|
||||
if hsh in self.flowmap:
|
||||
if self.options.server_replay_nopop:
|
||||
if ctx.options.server_replay_nopop:
|
||||
return self.flowmap[hsh][0]
|
||||
else:
|
||||
ret = self.flowmap[hsh].pop(0)
|
||||
@ -91,13 +89,12 @@ class ServerPlayback:
|
||||
del self.flowmap[hsh]
|
||||
return ret
|
||||
|
||||
def configure(self, options, updated):
|
||||
self.options = options
|
||||
def configure(self, updated):
|
||||
if "server_replay" in updated:
|
||||
self.clear()
|
||||
if options.server_replay:
|
||||
if ctx.options.server_replay:
|
||||
try:
|
||||
flows = io.read_flows_from_paths(options.server_replay)
|
||||
flows = io.read_flows_from_paths(ctx.options.server_replay)
|
||||
except exceptions.FlowReadException as e:
|
||||
raise exceptions.OptionsError(str(e))
|
||||
self.load_flows(flows)
|
||||
@ -112,13 +109,13 @@ class ServerPlayback:
|
||||
if rflow:
|
||||
response = rflow.response.copy()
|
||||
response.is_replay = True
|
||||
if self.options.refresh_server_playback:
|
||||
if ctx.options.refresh_server_playback:
|
||||
response.refresh()
|
||||
f.response = response
|
||||
if not self.flowmap:
|
||||
self.final_flow = f
|
||||
self.stop = True
|
||||
elif self.options.replay_kill_extra:
|
||||
elif ctx.options.replay_kill_extra:
|
||||
ctx.log.warn(
|
||||
"server_playback: killed non-replay request {}".format(
|
||||
f.request.url
|
||||
|
@ -1,5 +1,6 @@
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import ctx
|
||||
|
||||
|
||||
def parse_setheader(s):
|
||||
@ -43,17 +44,10 @@ class SetHeaders:
|
||||
def __init__(self):
|
||||
self.lst = []
|
||||
|
||||
def configure(self, options, updated):
|
||||
"""
|
||||
options.setheaders is a tuple of (fpatt, header, value)
|
||||
|
||||
fpatt: String specifying a filter pattern.
|
||||
header: Header name.
|
||||
value: Header value string
|
||||
"""
|
||||
def configure(self, updated):
|
||||
if "setheaders" in updated:
|
||||
self.lst = []
|
||||
for shead in options.setheaders:
|
||||
for shead in ctx.options.setheaders:
|
||||
fpatt, header, value = parse_setheader(shead)
|
||||
|
||||
flt = flowfilter.parse(fpatt)
|
||||
|
@ -1,5 +1,6 @@
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import ctx
|
||||
|
||||
|
||||
class StickyAuth:
|
||||
@ -7,13 +8,13 @@ class StickyAuth:
|
||||
self.flt = None
|
||||
self.hosts = {}
|
||||
|
||||
def configure(self, options, updated):
|
||||
def configure(self, updated):
|
||||
if "stickyauth" in updated:
|
||||
if options.stickyauth:
|
||||
flt = flowfilter.parse(options.stickyauth)
|
||||
if ctx.options.stickyauth:
|
||||
flt = flowfilter.parse(ctx.options.stickyauth)
|
||||
if not flt:
|
||||
raise exceptions.OptionsError(
|
||||
"stickyauth: invalid filter expression: %s" % options.stickyauth
|
||||
"stickyauth: invalid filter expression: %s" % ctx.options.stickyauth
|
||||
)
|
||||
self.flt = flt
|
||||
else:
|
||||
|
@ -5,6 +5,7 @@ from mitmproxy.net.http import cookies
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import ctx
|
||||
|
||||
|
||||
def ckey(attrs, f):
|
||||
@ -33,13 +34,13 @@ class StickyCookie:
|
||||
self.jar = collections.defaultdict(dict)
|
||||
self.flt = None
|
||||
|
||||
def configure(self, options, updated):
|
||||
def configure(self, updated):
|
||||
if "stickycookie" in updated:
|
||||
if options.stickycookie:
|
||||
flt = flowfilter.parse(options.stickycookie)
|
||||
if ctx.options.stickycookie:
|
||||
flt = flowfilter.parse(ctx.options.stickycookie)
|
||||
if not flt:
|
||||
raise exceptions.OptionsError(
|
||||
"stickycookie: invalid filter expression: %s" % options.stickycookie
|
||||
"stickycookie: invalid filter expression: %s" % ctx.options.stickycookie
|
||||
)
|
||||
self.flt = flt
|
||||
else:
|
||||
|
@ -8,10 +8,10 @@ class StreamBodies:
|
||||
def __init__(self):
|
||||
self.max_size = None
|
||||
|
||||
def configure(self, options, updated):
|
||||
if "stream_large_bodies" in updated and options.stream_large_bodies:
|
||||
def configure(self, updated):
|
||||
if "stream_large_bodies" in updated and ctx.options.stream_large_bodies:
|
||||
try:
|
||||
self.max_size = human.parse_size(options.stream_large_bodies)
|
||||
self.max_size = human.parse_size(ctx.options.stream_large_bodies)
|
||||
except ValueError as e:
|
||||
raise exceptions.OptionsError(e)
|
||||
|
||||
|
@ -3,6 +3,7 @@ import os.path
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import io
|
||||
from mitmproxy import ctx
|
||||
|
||||
|
||||
class StreamFile:
|
||||
@ -20,26 +21,26 @@ class StreamFile:
|
||||
self.stream = io.FilteredFlowWriter(f, flt)
|
||||
self.active_flows = set()
|
||||
|
||||
def configure(self, options, updated):
|
||||
def configure(self, updated):
|
||||
# We're already streaming - stop the previous stream and restart
|
||||
if "streamfile_filter" in updated:
|
||||
if options.streamfile_filter:
|
||||
self.filt = flowfilter.parse(options.streamfile_filter)
|
||||
if ctx.options.streamfile_filter:
|
||||
self.filt = flowfilter.parse(ctx.options.streamfile_filter)
|
||||
if not self.filt:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid filter specification: %s" % options.streamfile_filter
|
||||
"Invalid filter specification: %s" % ctx.options.streamfile_filter
|
||||
)
|
||||
else:
|
||||
self.filt = None
|
||||
if "streamfile" in updated:
|
||||
if self.stream:
|
||||
self.done()
|
||||
if options.streamfile:
|
||||
if options.streamfile.startswith("+"):
|
||||
path = options.streamfile[1:]
|
||||
if ctx.options.streamfile:
|
||||
if ctx.options.streamfile.startswith("+"):
|
||||
path = ctx.options.streamfile[1:]
|
||||
mode = "ab"
|
||||
else:
|
||||
path = options.streamfile
|
||||
path = ctx.options.streamfile
|
||||
mode = "wb"
|
||||
self.start_stream_to_path(path, mode, self.filt)
|
||||
|
||||
|
@ -2,6 +2,7 @@ import sys
|
||||
import click
|
||||
|
||||
from mitmproxy import log
|
||||
from mitmproxy import ctx
|
||||
|
||||
# These get over-ridden by the save execution context. Keep them around so we
|
||||
# can log directly.
|
||||
@ -11,19 +12,15 @@ realstderr = sys.stderr
|
||||
|
||||
class TermLog:
|
||||
def __init__(self, outfile=None):
|
||||
self.options = None
|
||||
self.outfile = outfile
|
||||
|
||||
def configure(self, options, updated):
|
||||
self.options = options
|
||||
|
||||
def log(self, e):
|
||||
if log.log_tier(e.level) == log.log_tier("error"):
|
||||
outfile = self.outfile or realstderr
|
||||
else:
|
||||
outfile = self.outfile or realstdout
|
||||
|
||||
if self.options.verbosity >= log.log_tier(e.level):
|
||||
if ctx.options.verbosity >= log.log_tier(e.level):
|
||||
click.secho(
|
||||
e.msg,
|
||||
file=outfile,
|
||||
|
@ -8,15 +8,8 @@ from mitmproxy.utils import human
|
||||
|
||||
|
||||
class TermStatus:
|
||||
def __init__(self):
|
||||
self.server = False
|
||||
|
||||
def configure(self, options, updated):
|
||||
if "server" in updated:
|
||||
self.server = options.server
|
||||
|
||||
def running(self):
|
||||
if self.server:
|
||||
if ctx.options.server:
|
||||
ctx.log.info(
|
||||
"Proxy server listening at http://{}".format(
|
||||
human.format_address(ctx.master.server.address)
|
||||
|
@ -2,6 +2,7 @@ import re
|
||||
import base64
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy.utils import strutils
|
||||
|
||||
|
||||
@ -26,20 +27,17 @@ class UpstreamAuth():
|
||||
"""
|
||||
def __init__(self):
|
||||
self.auth = None
|
||||
self.root_mode = None
|
||||
|
||||
def configure(self, options, updated):
|
||||
def configure(self, updated):
|
||||
# FIXME: We're doing this because our proxy core is terminally confused
|
||||
# at the moment. Ideally, we should be able to check if we're in
|
||||
# reverse proxy mode at the HTTP layer, so that scripts can put the
|
||||
# proxy in reverse proxy mode for specific reuests.
|
||||
if "mode" in updated:
|
||||
self.root_mode = options.mode
|
||||
if "upstream_auth" in updated:
|
||||
if options.upstream_auth is None:
|
||||
if ctx.options.upstream_auth is None:
|
||||
self.auth = None
|
||||
else:
|
||||
self.auth = parse_upstream_auth(options.upstream_auth)
|
||||
self.auth = parse_upstream_auth(ctx.options.upstream_auth)
|
||||
|
||||
def http_connect(self, f):
|
||||
if self.auth and f.mode == "upstream":
|
||||
@ -49,5 +47,5 @@ class UpstreamAuth():
|
||||
if self.auth:
|
||||
if f.mode == "upstream" and not f.server_conn.via:
|
||||
f.request.headers["Proxy-Authorization"] = self.auth
|
||||
elif self.root_mode == "reverse":
|
||||
elif ctx.options.mode == "reverse":
|
||||
f.request.headers["Proxy-Authorization"] = self.auth
|
||||
|
@ -18,6 +18,7 @@ import sortedcontainers
|
||||
import mitmproxy.flow
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import http # noqa
|
||||
|
||||
# The underlying sorted list implementation expects the sort key to be stable
|
||||
@ -302,26 +303,26 @@ class View(collections.Sequence):
|
||||
return self._store.get(flow_id)
|
||||
|
||||
# Event handlers
|
||||
def configure(self, opts, updated):
|
||||
def configure(self, updated):
|
||||
if "view_filter" in updated:
|
||||
filt = None
|
||||
if opts.view_filter:
|
||||
filt = flowfilter.parse(opts.view_filter)
|
||||
if ctx.options.view_filter:
|
||||
filt = flowfilter.parse(ctx.options.view_filter)
|
||||
if not filt:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid interception filter: %s" % opts.view_filter
|
||||
"Invalid interception filter: %s" % ctx.options.view_filter
|
||||
)
|
||||
self.set_filter(filt)
|
||||
if "console_order" in updated:
|
||||
if opts.console_order not in self.orders:
|
||||
if ctx.options.console_order not in self.orders:
|
||||
raise exceptions.OptionsError(
|
||||
"Unknown flow order: %s" % opts.console_order
|
||||
"Unknown flow order: %s" % ctx.options.console_order
|
||||
)
|
||||
self.set_order(self.orders[opts.console_order])
|
||||
self.set_order(self.orders[ctx.options.console_order])
|
||||
if "console_order_reversed" in updated:
|
||||
self.set_reversed(opts.console_order_reversed)
|
||||
self.set_reversed(ctx.options.console_order_reversed)
|
||||
if "console_focus_follow" in updated:
|
||||
self.focus_follow = opts.console_focus_follow
|
||||
self.focus_follow = ctx.options.console_focus_follow
|
||||
|
||||
def request(self, f):
|
||||
self.add(f)
|
||||
|
@@ -1,4 +1,7 @@
 import mitmproxy.master  # noqa
 import mitmproxy.log  # noqa
+import mitmproxy.options  # noqa

 master = None  # type: "mitmproxy.master.Master"
 log = None  # type: "mitmproxy.log.Log"
+options = None  # type: "mitmproxy.options.Options"
@@ -50,11 +50,13 @@ class Master:
            return
        mitmproxy_ctx.master = self
        mitmproxy_ctx.log = log.Log(self)
+       mitmproxy_ctx.options = self.options
        try:
            yield
        finally:
            mitmproxy_ctx.master = None
            mitmproxy_ctx.log = None
+           mitmproxy_ctx.options = None

    def tell(self, mtype, m):
        m.reply = controller.DummyReply()
@ -107,14 +107,16 @@ class context:
|
||||
self.master.addons.invoke_addon(
|
||||
addon,
|
||||
"configure",
|
||||
self.options,
|
||||
kwargs.keys()
|
||||
)
|
||||
|
||||
def script(self, path):
|
||||
"""
|
||||
Loads a script from path, and returns the enclosed addon.
|
||||
"""
|
||||
sc = script.Script(path)
|
||||
loader = addonmanager.Loader(self.master)
|
||||
sc.load(loader)
|
||||
for a in addonmanager.traverse(sc.addons):
|
||||
getattr(a, "load", lambda x: None)(loader)
|
||||
return sc
|
||||
self.master.addons.invoke_addon(sc, "load", loader)
|
||||
self.configure(sc)
|
||||
self.master.addons.invoke_addon(sc, "tick")
|
||||
return sc.addons[0] if sc.addons else None
|
||||
|
@ -76,7 +76,7 @@ def run(MasterKlass, args, extra=None): # pragma: no cover
|
||||
unknown = optmanager.load_paths(opts, args.conf)
|
||||
server = process_options(parser, opts, args)
|
||||
master = MasterKlass(opts, server)
|
||||
master.addons.trigger("configure", opts, opts.keys())
|
||||
master.addons.trigger("configure", opts.keys())
|
||||
remaining = opts.update_known(**unknown)
|
||||
if remaining and opts.verbosity > 1:
|
||||
print("Ignored options: %s" % remaining)
|
||||
|
setup.py
@@ -80,7 +80,6 @@ setup(
         "ruamel.yaml>=0.13.2, <0.15",
         "tornado>=4.3, <4.6",
         "urwid>=1.3.1, <1.4",
-        "watchdog>=0.8.3, <0.9",
         "brotlipy>=0.5.1, <0.7",
         "sortedcontainers>=1.5.4, <1.6",
         # transitive from cryptography, we just blacklist here.
@ -1,9 +1,4 @@
|
||||
from mitmproxy import options
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy import proxy
|
||||
from mitmproxy import master
|
||||
from mitmproxy.addons import script
|
||||
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.test import tutils
|
||||
from mitmproxy.test import taddons
|
||||
@ -14,37 +9,20 @@ from ..mitmproxy import tservers
|
||||
example_dir = tutils.test_data.push("../examples")
|
||||
|
||||
|
||||
class ScriptError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class RaiseMaster(master.Master):
|
||||
def add_log(self, e, level):
|
||||
if level in ("warn", "error"):
|
||||
raise ScriptError(e)
|
||||
|
||||
|
||||
def tscript(cmd, args=""):
|
||||
o = options.Options()
|
||||
cmd = example_dir.path(cmd)
|
||||
m = RaiseMaster(o, proxy.DummyServer())
|
||||
sc = script.Script(cmd)
|
||||
m.addons.add(sc)
|
||||
return m, sc
|
||||
|
||||
|
||||
class TestScripts(tservers.MasterTest):
|
||||
def test_add_header(self):
|
||||
m, _ = tscript("simple/add_header.py")
|
||||
f = tflow.tflow(resp=tutils.tresp())
|
||||
m.addons.handle_lifecycle("response", f)
|
||||
assert f.response.headers["newheader"] == "foo"
|
||||
with taddons.context() as tctx:
|
||||
a = tctx.script(example_dir.path("simple/add_header.py"))
|
||||
f = tflow.tflow(resp=tutils.tresp())
|
||||
a.response(f)
|
||||
assert f.response.headers["newheader"] == "foo"
|
||||
|
||||
def test_custom_contentviews(self):
|
||||
m, sc = tscript("simple/custom_contentview.py")
|
||||
swapcase = contentviews.get("swapcase")
|
||||
_, fmt = swapcase(b"<html>Test!</html>")
|
||||
assert any(b'tEST!' in val[0][1] for val in fmt)
|
||||
with taddons.context() as tctx:
|
||||
tctx.script(example_dir.path("simple/custom_contentview.py"))
|
||||
swapcase = contentviews.get("swapcase")
|
||||
_, fmt = swapcase(b"<html>Test!</html>")
|
||||
assert any(b'tEST!' in val[0][1] for val in fmt)
|
||||
|
||||
def test_iframe_injector(self):
|
||||
with taddons.context() as tctx:
|
||||
@ -61,57 +39,63 @@ class TestScripts(tservers.MasterTest):
|
||||
assert b'iframe' in content and b'evil_iframe' in content
|
||||
|
||||
def test_modify_form(self):
|
||||
m, sc = tscript("simple/modify_form.py")
|
||||
with taddons.context() as tctx:
|
||||
sc = tctx.script(example_dir.path("simple/modify_form.py"))
|
||||
|
||||
form_header = Headers(content_type="application/x-www-form-urlencoded")
|
||||
f = tflow.tflow(req=tutils.treq(headers=form_header))
|
||||
m.addons.handle_lifecycle("request", f)
|
||||
form_header = Headers(content_type="application/x-www-form-urlencoded")
|
||||
f = tflow.tflow(req=tutils.treq(headers=form_header))
|
||||
sc.request(f)
|
||||
|
||||
assert f.request.urlencoded_form["mitmproxy"] == "rocks"
|
||||
assert f.request.urlencoded_form["mitmproxy"] == "rocks"
|
||||
|
||||
f.request.headers["content-type"] = ""
|
||||
m.addons.handle_lifecycle("request", f)
|
||||
assert list(f.request.urlencoded_form.items()) == [("foo", "bar")]
|
||||
f.request.headers["content-type"] = ""
|
||||
sc.request(f)
|
||||
assert list(f.request.urlencoded_form.items()) == [("foo", "bar")]
|
||||
|
||||
def test_modify_querystring(self):
|
||||
m, sc = tscript("simple/modify_querystring.py")
|
||||
f = tflow.tflow(req=tutils.treq(path="/search?q=term"))
|
||||
with taddons.context() as tctx:
|
||||
sc = tctx.script(example_dir.path("simple/modify_querystring.py"))
|
||||
f = tflow.tflow(req=tutils.treq(path="/search?q=term"))
|
||||
|
||||
m.addons.handle_lifecycle("request", f)
|
||||
assert f.request.query["mitmproxy"] == "rocks"
|
||||
sc.request(f)
|
||||
assert f.request.query["mitmproxy"] == "rocks"
|
||||
|
||||
f.request.path = "/"
|
||||
m.addons.handle_lifecycle("request", f)
|
||||
assert f.request.query["mitmproxy"] == "rocks"
|
||||
f.request.path = "/"
|
||||
sc.request(f)
|
||||
assert f.request.query["mitmproxy"] == "rocks"
|
||||
|
||||
def test_redirect_requests(self):
|
||||
m, sc = tscript("simple/redirect_requests.py")
|
||||
f = tflow.tflow(req=tutils.treq(host="example.org"))
|
||||
m.addons.handle_lifecycle("request", f)
|
||||
assert f.request.host == "mitmproxy.org"
|
||||
with taddons.context() as tctx:
|
||||
sc = tctx.script(example_dir.path("simple/redirect_requests.py"))
|
||||
f = tflow.tflow(req=tutils.treq(host="example.org"))
|
||||
sc.request(f)
|
||||
assert f.request.host == "mitmproxy.org"
|
||||
|
||||
def test_send_reply_from_proxy(self):
|
||||
m, sc = tscript("simple/send_reply_from_proxy.py")
|
||||
f = tflow.tflow(req=tutils.treq(host="example.com", port=80))
|
||||
m.addons.handle_lifecycle("request", f)
|
||||
assert f.response.content == b"Hello World"
|
||||
with taddons.context() as tctx:
|
||||
sc = tctx.script(example_dir.path("simple/send_reply_from_proxy.py"))
|
||||
f = tflow.tflow(req=tutils.treq(host="example.com", port=80))
|
||||
sc.request(f)
|
||||
assert f.response.content == b"Hello World"
|
||||
|
||||
def test_dns_spoofing(self):
|
||||
m, sc = tscript("complex/dns_spoofing.py")
|
||||
original_host = "example.com"
|
||||
with taddons.context() as tctx:
|
||||
sc = tctx.script(example_dir.path("complex/dns_spoofing.py"))
|
||||
|
||||
host_header = Headers(host=original_host)
|
||||
f = tflow.tflow(req=tutils.treq(headers=host_header, port=80))
|
||||
original_host = "example.com"
|
||||
|
||||
m.addons.handle_lifecycle("requestheaders", f)
|
||||
host_header = Headers(host=original_host)
|
||||
f = tflow.tflow(req=tutils.treq(headers=host_header, port=80))
|
||||
|
||||
# Rewrite by reverse proxy mode
|
||||
f.request.scheme = "https"
|
||||
f.request.port = 443
|
||||
tctx.master.addons.invoke_addon(sc, "requestheaders", f)
|
||||
|
||||
m.addons.handle_lifecycle("request", f)
|
||||
# Rewrite by reverse proxy mode
|
||||
f.request.scheme = "https"
|
||||
f.request.port = 443
|
||||
|
||||
assert f.request.scheme == "http"
|
||||
assert f.request.port == 80
|
||||
tctx.master.addons.invoke_addon(sc, "request", f)
|
||||
|
||||
assert f.request.headers["Host"] == original_host
|
||||
assert f.request.scheme == "http"
|
||||
assert f.request.port == 80
|
||||
|
||||
assert f.request.headers["Host"] == original_host
|
||||
|
@ -1,4 +1,5 @@
|
||||
from mitmproxy.addons import onboarding
|
||||
from mitmproxy.test import taddons
|
||||
from .. import tservers
|
||||
|
||||
|
||||
@ -7,10 +8,14 @@ class TestApp(tservers.HTTPProxyTest):
|
||||
return [onboarding.Onboarding()]
|
||||
|
||||
def test_basic(self):
|
||||
assert self.app("/").status_code == 200
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(self.addons()[0])
|
||||
assert self.app("/").status_code == 200
|
||||
|
||||
def test_cert(self):
|
||||
for ext in ["pem", "p12"]:
|
||||
resp = self.app("/cert/%s" % ext)
|
||||
assert resp.status_code == 200
|
||||
assert resp.content
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(self.addons()[0])
|
||||
for ext in ["pem", "p12"]:
|
||||
resp = self.app("/cert/%s" % ext)
|
||||
assert resp.status_code == 200
|
||||
assert resp.content
|
||||
|
@ -66,9 +66,6 @@ def test_configure():
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
ctx.configure(up, proxyauth="any", mode="socks5")
|
||||
|
||||
ctx.configure(up, mode="regular")
|
||||
assert up.mode == "regular"
|
||||
|
||||
|
||||
def test_check():
|
||||
up = proxyauth.ProxyAuth()
|
||||
|
@ -1,7 +1,6 @@
|
||||
import traceback
|
||||
import sys
|
||||
import time
|
||||
import watchdog.events
|
||||
import pytest
|
||||
|
||||
from unittest import mock
|
||||
@ -16,34 +15,6 @@ from mitmproxy import master
|
||||
from mitmproxy.addons import script
|
||||
|
||||
|
||||
class Called:
|
||||
def __init__(self):
|
||||
self.called = False
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
self.called = True
|
||||
|
||||
|
||||
def test_reloadhandler():
|
||||
rh = script.ReloadHandler(Called())
|
||||
assert not rh.filter(watchdog.events.DirCreatedEvent("path"))
|
||||
assert not rh.filter(watchdog.events.FileModifiedEvent("/foo/.bar"))
|
||||
assert not rh.filter(watchdog.events.FileModifiedEvent("/foo/bar"))
|
||||
assert rh.filter(watchdog.events.FileModifiedEvent("/foo/bar.py"))
|
||||
|
||||
assert not rh.callback.called
|
||||
rh.on_modified(watchdog.events.FileModifiedEvent("/foo/bar"))
|
||||
assert not rh.callback.called
|
||||
rh.on_modified(watchdog.events.FileModifiedEvent("/foo/bar.py"))
|
||||
assert rh.callback.called
|
||||
rh.callback.called = False
|
||||
|
||||
rh.on_created(watchdog.events.FileCreatedEvent("foo"))
|
||||
assert not rh.callback.called
|
||||
rh.on_created(watchdog.events.FileCreatedEvent("foo.py"))
|
||||
assert rh.callback.called
|
||||
|
||||
|
||||
def test_load_script():
|
||||
with taddons.context() as tctx:
|
||||
ns = script.load_script(
|
||||
@ -89,6 +60,8 @@ class TestScript:
|
||||
)
|
||||
)
|
||||
tctx.master.addons.add(sc)
|
||||
tctx.configure(sc)
|
||||
sc.tick()
|
||||
|
||||
rec = tctx.master.addons.get("recorder")
|
||||
|
||||
@ -107,10 +80,12 @@ class TestScript:
|
||||
f.write("\n")
|
||||
sc = script.Script(str(f))
|
||||
tctx.configure(sc)
|
||||
for _ in range(5):
|
||||
sc.reload()
|
||||
sc.tick()
|
||||
for _ in range(3):
|
||||
sc.last_load, sc.last_mtime = 0, 0
|
||||
sc.tick()
|
||||
time.sleep(0.1)
|
||||
tctx.master.has_log("Loading")
|
||||
|
||||
def test_exception(self):
|
||||
with taddons.context() as tctx:
|
||||
@ -118,10 +93,12 @@ class TestScript:
|
||||
tutils.test_data.path("mitmproxy/data/addonscripts/error.py")
|
||||
)
|
||||
tctx.master.addons.add(sc)
|
||||
tctx.configure(sc)
|
||||
sc.tick()
|
||||
|
||||
f = tflow.tflow(resp=True)
|
||||
tctx.master.addons.trigger("request", f)
|
||||
|
||||
assert tctx.master.logs[0].level == "error"
|
||||
tctx.master.has_log("ValueError: Error!")
|
||||
tctx.master.has_log("error.py")
|
||||
|
||||
@ -133,8 +110,10 @@ class TestScript:
|
||||
)
|
||||
)
|
||||
tctx.master.addons.add(sc)
|
||||
tctx.configure(sc)
|
||||
sc.tick()
|
||||
assert sc.ns.event_log == [
|
||||
'scriptload', 'addonload'
|
||||
'scriptload', 'addonload', 'scriptconfigure', 'addonconfigure'
|
||||
]
|
||||
|
||||
|
||||
@ -207,21 +186,23 @@ class TestScriptLoader:
|
||||
"%s/c.py" % rec,
|
||||
]
|
||||
)
|
||||
|
||||
tctx.master.addons.invoke_addon(sc, "tick")
|
||||
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
|
||||
assert debug == [
|
||||
'a load',
|
||||
'a running',
|
||||
'a configure',
|
||||
'a tick',
|
||||
|
||||
'b load',
|
||||
'b running',
|
||||
'b configure',
|
||||
'b tick',
|
||||
|
||||
'c load',
|
||||
'c running',
|
||||
|
||||
'a configure',
|
||||
'b configure',
|
||||
'c configure',
|
||||
'c tick',
|
||||
]
|
||||
|
||||
tctx.master.logs = []
|
||||
@ -233,6 +214,7 @@ class TestScriptLoader:
|
||||
"%s/b.py" % rec,
|
||||
]
|
||||
)
|
||||
|
||||
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
|
||||
assert debug == [
|
||||
'c configure',
|
||||
@ -248,13 +230,16 @@ class TestScriptLoader:
|
||||
"%s/a.py" % rec,
|
||||
]
|
||||
)
|
||||
tctx.master.addons.invoke_addon(sc, "tick")
|
||||
|
||||
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
|
||||
assert debug == [
|
||||
'c done',
|
||||
'b done',
|
||||
'a configure',
|
||||
'e load',
|
||||
'e running',
|
||||
'e configure',
|
||||
'a configure',
|
||||
'e tick',
|
||||
'a tick',
|
||||
]
|
||||
|
@ -6,7 +6,6 @@ from mitmproxy.test import tflow
|
||||
|
||||
import mitmproxy.test.tutils
|
||||
from mitmproxy.addons import serverplayback
|
||||
from mitmproxy import options
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import io
|
||||
|
||||
@ -39,86 +38,88 @@ def test_tick():
|
||||
|
||||
def test_server_playback():
|
||||
sp = serverplayback.ServerPlayback()
|
||||
sp.configure(options.Options(), [])
|
||||
f = tflow.tflow(resp=True)
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(sp)
|
||||
f = tflow.tflow(resp=True)
|
||||
|
||||
assert not sp.flowmap
|
||||
assert not sp.flowmap
|
||||
|
||||
sp.load_flows([f])
|
||||
assert sp.flowmap
|
||||
assert sp.next_flow(f)
|
||||
assert not sp.flowmap
|
||||
sp.load_flows([f])
|
||||
assert sp.flowmap
|
||||
assert sp.next_flow(f)
|
||||
assert not sp.flowmap
|
||||
|
||||
sp.load_flows([f])
|
||||
assert sp.flowmap
|
||||
sp.clear()
|
||||
assert not sp.flowmap
|
||||
sp.load_flows([f])
|
||||
assert sp.flowmap
|
||||
sp.clear()
|
||||
assert not sp.flowmap
|
||||
|
||||
|
||||
def test_ignore_host():
|
||||
sp = serverplayback.ServerPlayback()
|
||||
sp.configure(options.Options(server_replay_ignore_host=True), [])
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(sp, server_replay_ignore_host=True)
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
r2 = tflow.tflow(resp=True)
|
||||
r = tflow.tflow(resp=True)
|
||||
r2 = tflow.tflow(resp=True)
|
||||
|
||||
r.request.host = "address"
|
||||
r2.request.host = "address"
|
||||
assert sp._hash(r) == sp._hash(r2)
|
||||
r2.request.host = "wrong_address"
|
||||
assert sp._hash(r) == sp._hash(r2)
|
||||
r.request.host = "address"
|
||||
r2.request.host = "address"
|
||||
assert sp._hash(r) == sp._hash(r2)
|
||||
r2.request.host = "wrong_address"
|
||||
assert sp._hash(r) == sp._hash(r2)
|
||||
|
||||
|
||||
def test_ignore_content():
|
||||
s = serverplayback.ServerPlayback()
|
||||
s.configure(options.Options(server_replay_ignore_content=False), [])
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(s, server_replay_ignore_content=False)
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
r2 = tflow.tflow(resp=True)
|
||||
r = tflow.tflow(resp=True)
|
||||
r2 = tflow.tflow(resp=True)
|
||||
|
||||
r.request.content = b"foo"
|
||||
r2.request.content = b"foo"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.content = b"bar"
|
||||
assert not s._hash(r) == s._hash(r2)
|
||||
r.request.content = b"foo"
|
||||
r2.request.content = b"foo"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.content = b"bar"
|
||||
assert not s._hash(r) == s._hash(r2)
|
||||
|
||||
s.configure(options.Options(server_replay_ignore_content=True), [])
|
||||
r = tflow.tflow(resp=True)
|
||||
r2 = tflow.tflow(resp=True)
|
||||
r.request.content = b"foo"
|
||||
r2.request.content = b"foo"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.content = b"bar"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.content = b""
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.content = None
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
tctx.configure(s, server_replay_ignore_content=True)
|
||||
r = tflow.tflow(resp=True)
|
||||
r2 = tflow.tflow(resp=True)
|
||||
r.request.content = b"foo"
|
||||
r2.request.content = b"foo"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.content = b"bar"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.content = b""
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.content = None
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
|
||||
|
||||
def test_ignore_content_wins_over_params():
|
||||
s = serverplayback.ServerPlayback()
|
||||
s.configure(
|
||||
options.Options(
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(
|
||||
s,
|
||||
server_replay_ignore_content=True,
|
||||
server_replay_ignore_payload_params=[
|
||||
"param1", "param2"
|
||||
]
|
||||
),
|
||||
[]
|
||||
)
|
||||
# NOTE: parameters are mutually exclusive in options
|
||||
)
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
|
||||
r.request.content = b"paramx=y"
|
||||
# NOTE: parameters are mutually exclusive in options
|
||||
r = tflow.tflow(resp=True)
|
||||
r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
|
||||
r.request.content = b"paramx=y"
|
||||
|
||||
r2 = tflow.tflow(resp=True)
|
||||
r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
|
||||
r2.request.content = b"paramx=x"
|
||||
r2 = tflow.tflow(resp=True)
|
||||
r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
|
||||
r2.request.content = b"paramx=x"
|
||||
|
||||
# same parameters
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
# same parameters
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
|
||||
|
||||
def test_ignore_payload_params_other_content_type():
|
||||
@ -147,136 +148,139 @@ def test_ignore_payload_params_other_content_type():
|
||||
|
||||
def test_hash():
|
||||
s = serverplayback.ServerPlayback()
|
||||
s.configure(options.Options(), [])
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(s)
|
||||
|
||||
r = tflow.tflow()
|
||||
r2 = tflow.tflow()
|
||||
r = tflow.tflow()
|
||||
r2 = tflow.tflow()
|
||||
|
||||
assert s._hash(r)
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r.request.headers["foo"] = "bar"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r.request.path = "voing"
|
||||
assert s._hash(r) != s._hash(r2)
|
||||
assert s._hash(r)
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r.request.headers["foo"] = "bar"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r.request.path = "voing"
|
||||
assert s._hash(r) != s._hash(r2)
|
||||
|
||||
r.request.path = "path?blank_value"
|
||||
r2.request.path = "path?"
|
||||
assert s._hash(r) != s._hash(r2)
|
||||
r.request.path = "path?blank_value"
|
||||
r2.request.path = "path?"
|
||||
assert s._hash(r) != s._hash(r2)
|
||||
|
||||
|
||||
def test_headers():
|
||||
s = serverplayback.ServerPlayback()
|
||||
s.configure(options.Options(server_replay_use_headers=["foo"]), [])
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(s, server_replay_use_headers=["foo"])
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
r.request.headers["foo"] = "bar"
|
||||
r2 = tflow.tflow(resp=True)
|
||||
assert not s._hash(r) == s._hash(r2)
|
||||
r2.request.headers["foo"] = "bar"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.headers["oink"] = "bar"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r = tflow.tflow(resp=True)
|
||||
r.request.headers["foo"] = "bar"
|
||||
r2 = tflow.tflow(resp=True)
|
||||
assert not s._hash(r) == s._hash(r2)
|
||||
r2.request.headers["foo"] = "bar"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.headers["oink"] = "bar"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
r2 = tflow.tflow(resp=True)
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r = tflow.tflow(resp=True)
|
||||
r2 = tflow.tflow(resp=True)
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
|
||||
|
||||
def test_load():
|
||||
s = serverplayback.ServerPlayback()
|
||||
s.configure(options.Options(), [])
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(s)
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
r.request.headers["key"] = "one"
|
||||
r = tflow.tflow(resp=True)
|
||||
r.request.headers["key"] = "one"
|
||||
|
||||
r2 = tflow.tflow(resp=True)
|
||||
r2.request.headers["key"] = "two"
|
||||
r2 = tflow.tflow(resp=True)
|
||||
r2.request.headers["key"] = "two"
|
||||
|
||||
s.load_flows([r, r2])
|
||||
s.load_flows([r, r2])
|
||||
|
||||
assert s.count() == 2
|
||||
assert s.count() == 2
|
||||
|
||||
n = s.next_flow(r)
|
||||
assert n.request.headers["key"] == "one"
|
||||
assert s.count() == 1
|
||||
n = s.next_flow(r)
|
||||
assert n.request.headers["key"] == "one"
|
||||
assert s.count() == 1
|
||||
|
||||
n = s.next_flow(r)
|
||||
assert n.request.headers["key"] == "two"
|
||||
assert not s.flowmap
|
||||
assert s.count() == 0
|
||||
n = s.next_flow(r)
|
||||
assert n.request.headers["key"] == "two"
|
||||
assert not s.flowmap
|
||||
assert s.count() == 0
|
||||
|
||||
assert not s.next_flow(r)
|
||||
assert not s.next_flow(r)
|
||||
|
||||
|
||||
def test_load_with_server_replay_nopop():
|
||||
s = serverplayback.ServerPlayback()
|
||||
s.configure(options.Options(server_replay_nopop=True), [])
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(s, server_replay_nopop=True)
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
r.request.headers["key"] = "one"
|
||||
r = tflow.tflow(resp=True)
|
||||
r.request.headers["key"] = "one"
|
||||
|
||||
r2 = tflow.tflow(resp=True)
|
||||
r2.request.headers["key"] = "two"
|
||||
r2 = tflow.tflow(resp=True)
|
||||
r2.request.headers["key"] = "two"
|
||||
|
||||
s.load_flows([r, r2])
|
||||
s.load_flows([r, r2])
|
||||
|
||||
assert s.count() == 2
|
||||
s.next_flow(r)
|
||||
assert s.count() == 2
|
||||
assert s.count() == 2
|
||||
s.next_flow(r)
|
||||
assert s.count() == 2
|
||||
|
||||
|
||||
def test_ignore_params():
|
||||
s = serverplayback.ServerPlayback()
|
||||
s.configure(
|
||||
options.Options(
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(
|
||||
s,
|
||||
server_replay_ignore_params=["param1", "param2"]
|
||||
),
|
||||
[]
|
||||
)
|
||||
)
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
r.request.path = "/test?param1=1"
|
||||
r2 = tflow.tflow(resp=True)
|
||||
r2.request.path = "/test"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.path = "/test?param1=2"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.path = "/test?param2=1"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.path = "/test?param3=2"
|
||||
assert not s._hash(r) == s._hash(r2)
|
||||
r = tflow.tflow(resp=True)
|
||||
r.request.path = "/test?param1=1"
|
||||
r2 = tflow.tflow(resp=True)
|
||||
r2.request.path = "/test"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.path = "/test?param1=2"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.path = "/test?param2=1"
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
r2.request.path = "/test?param3=2"
|
||||
assert not s._hash(r) == s._hash(r2)
|
||||
|
||||
|
||||
def thash(r, r2, setter):
|
||||
s = serverplayback.ServerPlayback()
|
||||
s.configure(
|
||||
options.Options(
|
||||
with taddons.context() as tctx:
|
||||
s = serverplayback.ServerPlayback()
|
||||
tctx.configure(
|
||||
s,
|
||||
server_replay_ignore_payload_params=["param1", "param2"]
|
||||
),
|
||||
[]
|
||||
)
|
||||
)
|
||||
|
||||
setter(r, paramx="x", param1="1")
|
||||
setter(r, paramx="x", param1="1")
|
||||
|
||||
setter(r2, paramx="x", param1="1")
|
||||
# same parameters
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
# ignored parameters !=
|
||||
setter(r2, paramx="x", param1="2")
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
# missing parameter
|
||||
setter(r2, paramx="x")
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
# ignorable parameter added
|
||||
setter(r2, paramx="x", param1="2")
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
# not ignorable parameter changed
|
||||
setter(r2, paramx="y", param1="1")
|
||||
assert not s._hash(r) == s._hash(r2)
|
||||
# not ignorable parameter missing
|
||||
setter(r2, param1="1")
|
||||
r2.request.content = b"param1=1"
|
||||
assert not s._hash(r) == s._hash(r2)
|
||||
setter(r2, paramx="x", param1="1")
|
||||
# same parameters
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
# ignored parameters !=
|
||||
setter(r2, paramx="x", param1="2")
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
# missing parameter
|
||||
setter(r2, paramx="x")
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
# ignorable parameter added
|
||||
setter(r2, paramx="x", param1="2")
|
||||
assert s._hash(r) == s._hash(r2)
|
||||
# not ignorable parameter changed
|
||||
setter(r2, paramx="y", param1="1")
|
||||
assert not s._hash(r) == s._hash(r2)
|
||||
# not ignorable parameter missing
|
||||
setter(r2, param1="1")
|
||||
r2.request.content = b"param1=1"
|
||||
assert not s._hash(r) == s._hash(r2)
|
||||
|
||||
|
||||
def test_ignore_payload_params():
|
||||
|
@ -5,6 +5,7 @@ from mitmproxy.test import taddons
|
||||
def test_configure():
|
||||
ts = termstatus.TermStatus()
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(ts, server=False)
|
||||
ts.running()
|
||||
assert not ctx.master.logs
|
||||
ctx.configure(ts, server=True)
|
||||
|
@ -9,12 +9,12 @@ class Addon:
|
||||
def load(self, opts):
|
||||
event_log.append("addonload")
|
||||
|
||||
def configure(self, options, updated):
|
||||
def configure(self, updated):
|
||||
event_log.append("addonconfigure")
|
||||
|
||||
|
||||
def configure(options, updated):
|
||||
event_log.append("addonconfigure")
|
||||
def configure(updated):
|
||||
event_log.append("scriptconfigure")
|
||||
|
||||
|
||||
def load(l):
|
||||
|
@ -2,5 +2,5 @@ from mitmproxy.script import concurrent
|
||||
|
||||
|
||||
@concurrent
|
||||
def start(opts):
|
||||
def load(v):
|
||||
pass
|
||||
|
@ -297,7 +297,7 @@ class TestHTTPAuth(tservers.HTTPProxyTest):
|
||||
def test_auth(self):
|
||||
self.master.addons.add(proxyauth.ProxyAuth())
|
||||
self.master.addons.trigger(
|
||||
"configure", self.master.options, self.master.options.keys()
|
||||
"configure", self.master.options.keys()
|
||||
)
|
||||
self.master.options.proxyauth = "test:test"
|
||||
assert self.pathod("202").status_code == 407
|
||||
|
@ -2,10 +2,7 @@ from mitmproxy.test import tflow
|
||||
from mitmproxy.test import tutils
|
||||
from mitmproxy.test import taddons
|
||||
|
||||
from mitmproxy import addonmanager
|
||||
from mitmproxy import controller
|
||||
from mitmproxy.addons import script
|
||||
|
||||
import time
|
||||
|
||||
from .. import tservers
|
||||
@ -36,25 +33,20 @@ class TestConcurrent(tservers.MasterTest):
|
||||
|
||||
def test_concurrent_err(self):
|
||||
with taddons.context() as tctx:
|
||||
sc = script.Script(
|
||||
tctx.script(
|
||||
tutils.test_data.path(
|
||||
"mitmproxy/data/addonscripts/concurrent_decorator_err.py"
|
||||
)
|
||||
)
|
||||
l = addonmanager.Loader(tctx.master)
|
||||
sc.load(l)
|
||||
assert tctx.master.has_log("decorator not supported")
|
||||
|
||||
def test_concurrent_class(self):
|
||||
with taddons.context() as tctx:
|
||||
sc = script.Script(
|
||||
sc = tctx.script(
|
||||
tutils.test_data.path(
|
||||
"mitmproxy/data/addonscripts/concurrent_decorator_class.py"
|
||||
)
|
||||
)
|
||||
l = addonmanager.Loader(tctx.master)
|
||||
sc.load(l)
|
||||
|
||||
f1, f2 = tflow.tflow(), tflow.tflow()
|
||||
tctx.cycle(sc, f1)
|
||||
tctx.cycle(sc, f2)
|
||||
|
@ -30,7 +30,7 @@ class TestMaster(tservers.MasterTest):
|
||||
opts["verbosity"] = 1
|
||||
o = options.Options(**opts)
|
||||
m = console.master.ConsoleMaster(o, proxy.DummyServer())
|
||||
m.addons.trigger("configure", o, o.keys())
|
||||
m.addons.trigger("configure", o.keys())
|
||||
return m
|
||||
|
||||
def test_basic(self):
|
||||
|
@ -74,7 +74,7 @@ class TestMaster(taddons.RecordingMaster):
|
||||
self.state = TestState()
|
||||
self.addons.add(self.state)
|
||||
self.addons.add(*addons)
|
||||
self.addons.trigger("configure", self.options, self.options.keys())
|
||||
self.addons.trigger("configure", self.options.keys())
|
||||
self.addons.trigger("running")
|
||||
|
||||
def reset(self, addons):
|
||||
|