mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
Merge pull request #2905 from cortesi/opts
Start migrating addon options into /addons
This commit is contained in:
commit
eee109117f
1
.gitignore
vendored
1
.gitignore
vendored
@ -13,6 +13,7 @@ MANIFEST
|
||||
build/
|
||||
dist/
|
||||
mitmproxy/contrib/kaitaistruct/*.ksy
|
||||
.pytest_cache
|
||||
|
||||
# UI
|
||||
|
||||
|
@ -2,6 +2,15 @@ from mitmproxy import ctx
|
||||
|
||||
|
||||
class AntiCache:
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"anticache", bool, False,
|
||||
"""
|
||||
Strip out request headers that might cause the server to return
|
||||
304-not-modified.
|
||||
"""
|
||||
)
|
||||
|
||||
def request(self, flow):
|
||||
if ctx.options.anticache:
|
||||
flow.request.anticache()
|
||||
|
@ -2,6 +2,12 @@ from mitmproxy import ctx
|
||||
|
||||
|
||||
class AntiComp:
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"anticomp", bool, False,
|
||||
"Try to convince servers to send us un-compressed data."
|
||||
)
|
||||
|
||||
def request(self, flow):
|
||||
if ctx.options.anticomp:
|
||||
flow.request.anticomp()
|
||||
|
@ -14,6 +14,12 @@ class ClientPlayback:
|
||||
self.current_thread = None
|
||||
self.configured = False
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"client_replay", typing.Sequence[str], [],
|
||||
"Replay client requests from a saved file."
|
||||
)
|
||||
|
||||
def count(self) -> int:
|
||||
if self.current_thread:
|
||||
current = 1
|
||||
|
@ -2,6 +2,16 @@ from mitmproxy import ctx
|
||||
|
||||
|
||||
class KeepServing:
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"keepserving", bool, False,
|
||||
"""
|
||||
Continue serving after client playback, server playback or file
|
||||
read. This option is ignored by interactive tools, which always keep
|
||||
serving.
|
||||
"""
|
||||
)
|
||||
|
||||
def event_processing_complete(self):
|
||||
if not ctx.master.options.keepserving:
|
||||
ctx.master.shutdown()
|
||||
|
@ -2,6 +2,9 @@ from mitmproxy.addons import wsgiapp
|
||||
from mitmproxy.addons.onboardingapp import app
|
||||
from mitmproxy import ctx
|
||||
|
||||
APP_HOST = "mitm.it"
|
||||
APP_PORT = 80
|
||||
|
||||
|
||||
class Onboarding(wsgiapp.WSGIApp):
|
||||
name = "onboarding"
|
||||
@ -9,6 +12,23 @@ class Onboarding(wsgiapp.WSGIApp):
|
||||
def __init__(self):
|
||||
super().__init__(app.Adapter(app.application), None, None)
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"onboarding", bool, True,
|
||||
"Toggle the mitmproxy onboarding app."
|
||||
)
|
||||
loader.add_option(
|
||||
"onboarding_host", str, APP_HOST,
|
||||
"""
|
||||
Onboarding app domain. For transparent mode, use an IP when a DNS
|
||||
entry for the app domain is not present.
|
||||
"""
|
||||
)
|
||||
loader.add_option(
|
||||
"onboarding_port", int, APP_PORT,
|
||||
"Port to serve the onboarding app from."
|
||||
)
|
||||
|
||||
def configure(self, updated):
|
||||
self.host = ctx.options.onboarding_host
|
||||
self.port = ctx.options.onboarding_port
|
||||
|
@ -11,6 +11,11 @@ class ReadFile:
|
||||
"""
|
||||
An addon that handles reading from file on startup.
|
||||
"""
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"rfile", typing.Optional[str], None,
|
||||
"Read flows from file."
|
||||
)
|
||||
|
||||
def load_flows(self, fo: typing.IO[bytes]) -> int:
|
||||
cnt = 0
|
||||
|
@ -1,5 +1,6 @@
|
||||
import os
|
||||
import re
|
||||
import typing
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flowfilter
|
||||
@ -47,6 +48,15 @@ class Replace:
|
||||
def __init__(self):
|
||||
self.lst = []
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"replacements", typing.Sequence[str], [],
|
||||
"""
|
||||
Replacement patterns of the form "/pattern/regex/replacement", where
|
||||
the separator can be any character.
|
||||
"""
|
||||
)
|
||||
|
||||
def configure(self, updated):
|
||||
"""
|
||||
.replacements is a list of tuples (fpat, rex, s):
|
||||
|
@ -16,6 +16,16 @@ class Save:
|
||||
self.filt = None
|
||||
self.active_flows = set() # type: Set[flow.Flow]
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"save_stream_file", typing.Optional[str], None,
|
||||
"Stream flows to file as they arrive. Prefix path with + to append."
|
||||
)
|
||||
loader.add_option(
|
||||
"save_stream_filter", typing.Optional[str], None,
|
||||
"Filter which flows are written to file."
|
||||
)
|
||||
|
||||
def open_file(self, path):
|
||||
if path.startswith("+"):
|
||||
path = path[1:]
|
||||
|
@ -98,6 +98,14 @@ class ScriptLoader:
|
||||
self.is_running = False
|
||||
self.addons = []
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"scripts", typing.Sequence[str], [],
|
||||
"""
|
||||
Execute a script.
|
||||
"""
|
||||
)
|
||||
|
||||
def running(self):
|
||||
self.is_running = True
|
||||
|
||||
|
@ -1,8 +1,6 @@
|
||||
import hashlib
|
||||
import urllib
|
||||
import typing
|
||||
from typing import Any # noqa
|
||||
from typing import List # noqa
|
||||
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import flow
|
||||
@ -19,6 +17,60 @@ class ServerPlayback:
|
||||
self.final_flow = None
|
||||
self.configured = False
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"server_replay_kill_extra", bool, False,
|
||||
"Kill extra requests during replay."
|
||||
)
|
||||
loader.add_option(
|
||||
"server_replay_nopop", bool, False,
|
||||
"""
|
||||
Don't remove flows from server replay state after use. This makes it
|
||||
possible to replay same response multiple times.
|
||||
"""
|
||||
)
|
||||
loader.add_option(
|
||||
"server_replay_refresh", bool, True,
|
||||
"""
|
||||
Refresh server replay responses by adjusting date, expires and
|
||||
last-modified headers, as well as adjusting cookie expiration.
|
||||
"""
|
||||
)
|
||||
loader.add_option(
|
||||
"server_replay_use_headers", typing.Sequence[str], [],
|
||||
"Request headers to be considered during replay."
|
||||
)
|
||||
loader.add_option(
|
||||
"server_replay", typing.Sequence[str], [],
|
||||
"Replay server responses from a saved file."
|
||||
)
|
||||
loader.add_option(
|
||||
"server_replay_ignore_content", bool, False,
|
||||
"Ignore request's content while searching for a saved flow to replay."
|
||||
)
|
||||
loader.add_option(
|
||||
"server_replay_ignore_params", typing.Sequence[str], [],
|
||||
"""
|
||||
Request's parameters to be ignored while searching for a saved flow
|
||||
to replay.
|
||||
"""
|
||||
)
|
||||
loader.add_option(
|
||||
"server_replay_ignore_payload_params", typing.Sequence[str], [],
|
||||
"""
|
||||
Request's payload parameters (application/x-www-form-urlencoded or
|
||||
multipart/form-data) to be ignored while searching for a saved flow
|
||||
to replay.
|
||||
"""
|
||||
)
|
||||
loader.add_option(
|
||||
"server_replay_ignore_host", bool, False,
|
||||
"""
|
||||
Ignore request's destination host while searching for a saved flow
|
||||
to replay.
|
||||
"""
|
||||
)
|
||||
|
||||
@command.command("replay.server")
|
||||
def load_flows(self, flows: typing.Sequence[flow.Flow]) -> None:
|
||||
"""
|
||||
|
@ -1,3 +1,5 @@
|
||||
import typing
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import ctx
|
||||
@ -44,6 +46,15 @@ class SetHeaders:
|
||||
def __init__(self):
|
||||
self.lst = []
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"setheaders", typing.Sequence[str], [],
|
||||
"""
|
||||
Header set pattern of the form "/pattern/header/value", where the
|
||||
separator can be any character.
|
||||
"""
|
||||
)
|
||||
|
||||
def configure(self, updated):
|
||||
if "setheaders" in updated:
|
||||
self.lst = []
|
||||
|
@ -1,3 +1,5 @@
|
||||
import typing
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import ctx
|
||||
@ -8,6 +10,12 @@ class StickyAuth:
|
||||
self.flt = None
|
||||
self.hosts = {}
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"stickyauth", typing.Optional[str], None,
|
||||
"Set sticky auth filter. Matched against requests."
|
||||
)
|
||||
|
||||
def configure(self, updated):
|
||||
if "stickyauth" in updated:
|
||||
if ctx.options.stickyauth:
|
||||
|
@ -34,6 +34,12 @@ class StickyCookie:
|
||||
self.jar = collections.defaultdict(dict) # type: Dict[TOrigin, Dict[str, str]]
|
||||
self.flt = None # type: Optional[flowfilter.TFilter]
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"stickycookie", Optional[str], None,
|
||||
"Set sticky cookie filter. Matched against requests."
|
||||
)
|
||||
|
||||
def configure(self, updated):
|
||||
if "stickycookie" in updated:
|
||||
if ctx.options.stickycookie:
|
||||
|
@ -1,3 +1,5 @@
|
||||
import typing
|
||||
|
||||
from mitmproxy.net.http import http1
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import ctx
|
||||
@ -8,6 +10,23 @@ class StreamBodies:
|
||||
def __init__(self):
|
||||
self.max_size = None
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"stream_large_bodies", typing.Optional[str], None,
|
||||
"""
|
||||
Stream data to the client if response body exceeds the given
|
||||
threshold. If streamed, the body will not be stored in any way.
|
||||
Understands k/m/g suffixes, i.e. 3m for 3 megabytes.
|
||||
"""
|
||||
)
|
||||
loader.add_option(
|
||||
"stream_websockets", bool, False,
|
||||
"""
|
||||
Stream WebSocket messages between client and server.
|
||||
Messages are captured and cannot be modified.
|
||||
"""
|
||||
)
|
||||
|
||||
def configure(self, updated):
|
||||
if "stream_large_bodies" in updated and ctx.options.stream_large_bodies:
|
||||
try:
|
||||
|
@ -1,4 +1,5 @@
|
||||
import re
|
||||
import typing
|
||||
import base64
|
||||
|
||||
from mitmproxy import exceptions
|
||||
@ -28,6 +29,15 @@ class UpstreamAuth():
|
||||
def __init__(self):
|
||||
self.auth = None
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"upstream_auth", typing.Optional[str], None,
|
||||
"""
|
||||
Add HTTP Basic authentication to upstream proxy and reverse proxy
|
||||
requests. Format: username:password.
|
||||
"""
|
||||
)
|
||||
|
||||
def configure(self, updated):
|
||||
# FIXME: We're doing this because our proxy core is terminally confused
|
||||
# at the moment. Ideally, we should be able to check if we're in
|
||||
|
@ -12,8 +12,6 @@ log_verbosity = [
|
||||
"debug",
|
||||
]
|
||||
|
||||
APP_HOST = "mitm.it"
|
||||
APP_PORT = 80
|
||||
CA_DIR = "~/.mitmproxy"
|
||||
LISTEN_PORT = 8080
|
||||
|
||||
@ -30,201 +28,54 @@ class Options(optmanager.OptManager):
|
||||
# This provides type hints for IDEs (e.g. PyCharm) and mypy.
|
||||
# Autogenerated using test/helper_tools/typehints_for_options.py
|
||||
add_upstream_certs_to_client_chain = None # type: bool
|
||||
allow_remote = None # type: bool
|
||||
anticache = None # type: bool
|
||||
anticomp = None # type: bool
|
||||
body_size_limit = None # type: Optional[str]
|
||||
cadir = None # type: str
|
||||
certs = None # type: Sequence[str]
|
||||
ciphers_client = None # type: Optional[str]
|
||||
ciphers_server = None # type: Optional[str]
|
||||
client_certs = None # type: Optional[str]
|
||||
client_replay = None # type: Sequence[str]
|
||||
console_focus_follow = None # type: bool
|
||||
console_layout = None # type: str
|
||||
console_layout_headers = None # type: bool
|
||||
console_mouse = None # type: bool
|
||||
console_palette = None # type: str
|
||||
console_palette_transparent = None # type: bool
|
||||
default_contentview = None # type: str
|
||||
flow_detail = None # type: int
|
||||
http2 = None # type: bool
|
||||
http2_priority = None # type: bool
|
||||
ignore_hosts = None # type: Sequence[str]
|
||||
intercept = None # type: Optional[str]
|
||||
intercept_active = None # type: bool
|
||||
keep_host_header = None # type: bool
|
||||
keepserving = None # type: bool
|
||||
listen_host = None # type: str
|
||||
listen_port = None # type: int
|
||||
mode = None # type: str
|
||||
onboarding = None # type: bool
|
||||
onboarding_host = None # type: str
|
||||
onboarding_port = None # type: int
|
||||
proxyauth = None # type: Optional[str]
|
||||
rawtcp = None # type: bool
|
||||
server_replay_refresh = None # type: bool
|
||||
replacements = None # type: Sequence[str]
|
||||
server_replay_kill_extra = None # type: bool
|
||||
rfile = None # type: Optional[str]
|
||||
save_stream_file = None # type: Optional[str]
|
||||
save_stream_filter = None # type: Optional[str]
|
||||
scripts = None # type: Sequence[str]
|
||||
server = None # type: bool
|
||||
server_replay = None # type: Sequence[str]
|
||||
server_replay_ignore_content = None # type: bool
|
||||
server_replay_ignore_host = None # type: bool
|
||||
server_replay_ignore_params = None # type: Sequence[str]
|
||||
server_replay_ignore_payload_params = None # type: Sequence[str]
|
||||
server_replay_nopop = None # type: bool
|
||||
server_replay_use_headers = None # type: Sequence[str]
|
||||
setheaders = None # type: Sequence[str]
|
||||
showhost = None # type: bool
|
||||
spoof_source_address = None # type: bool
|
||||
ssl_insecure = None # type: bool
|
||||
ssl_verify_upstream_trusted_ca = None # type: Optional[str]
|
||||
ssl_verify_upstream_trusted_cadir = None # type: Optional[str]
|
||||
ssl_version_client = None # type: str
|
||||
ssl_version_server = None # type: str
|
||||
stickyauth = None # type: Optional[str]
|
||||
stickycookie = None # type: Optional[str]
|
||||
stream_large_bodies = None # type: Optional[str]
|
||||
stream_websockets = None # type: bool
|
||||
tcp_hosts = None # type: Sequence[str]
|
||||
upstream_auth = None # type: Optional[str]
|
||||
upstream_bind_address = None # type: str
|
||||
upstream_cert = None # type: bool
|
||||
websocket = None # type: bool
|
||||
|
||||
# FIXME: Options that must be migrated to addons, but are complicated
|
||||
# because they're used by more than one addon, or because they're
|
||||
# embedded in the core code somehow.
|
||||
default_contentview = None # type: str
|
||||
flow_detail = None # type: int
|
||||
intercept = None # type: Optional[str]
|
||||
intercept_active = None # type: bool
|
||||
proxyauth = None # type: Optional[str]
|
||||
showhost = None # type: bool
|
||||
verbosity = None # type: str
|
||||
view_filter = None # type: Optional[str]
|
||||
view_order = None # type: str
|
||||
view_order_reversed = None # type: bool
|
||||
web_debug = None # type: bool
|
||||
web_iface = None # type: str
|
||||
web_open_browser = None # type: bool
|
||||
web_port = None # type: int
|
||||
websocket = None # type: bool
|
||||
|
||||
def __init__(self, **kwargs) -> None:
|
||||
super().__init__()
|
||||
self.add_option(
|
||||
"onboarding", bool, True,
|
||||
"Toggle the mitmproxy onboarding app."
|
||||
)
|
||||
self.add_option(
|
||||
"onboarding_host", str, APP_HOST,
|
||||
"""
|
||||
Onboarding app domain. For transparent mode, use an IP when a DNS
|
||||
entry for the app domain is not present.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"onboarding_port", int, APP_PORT,
|
||||
"Port to serve the onboarding app from."
|
||||
)
|
||||
self.add_option(
|
||||
"anticache", bool, False,
|
||||
"""
|
||||
Strip out request headers that might cause the server to return
|
||||
304-not-modified.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"anticomp", bool, False,
|
||||
"Try to convince servers to send us un-compressed data."
|
||||
)
|
||||
self.add_option(
|
||||
"client_replay", Sequence[str], [],
|
||||
"Replay client requests from a saved file."
|
||||
)
|
||||
self.add_option(
|
||||
"server_replay_kill_extra", bool, False,
|
||||
"Kill extra requests during replay."
|
||||
)
|
||||
self.add_option(
|
||||
"keepserving", bool, False,
|
||||
"""
|
||||
Continue serving after client playback, server playback or file
|
||||
read. This option is ignored by interactive tools, which always keep
|
||||
serving.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"server", bool, True,
|
||||
"Start a proxy server. Enabled by default."
|
||||
)
|
||||
self.add_option(
|
||||
"server_replay_nopop", bool, False,
|
||||
"""
|
||||
Don't remove flows from server replay state after use. This makes it
|
||||
possible to replay same response multiple times.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"server_replay_refresh", bool, True,
|
||||
"""
|
||||
Refresh server replay responses by adjusting date, expires and
|
||||
last-modified headers, as well as adjusting cookie expiration.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"rfile", Optional[str], None,
|
||||
"Read flows from file."
|
||||
)
|
||||
self.add_option(
|
||||
"scripts", Sequence[str], [],
|
||||
"""
|
||||
Execute a script.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"showhost", bool, False,
|
||||
"Use the Host header to construct URLs for display."
|
||||
)
|
||||
self.add_option(
|
||||
"replacements", Sequence[str], [],
|
||||
"""
|
||||
Replacement patterns of the form "/pattern/regex/replacement", where
|
||||
the separator can be any character.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"server_replay_use_headers", Sequence[str], [],
|
||||
"Request headers to be considered during replay."
|
||||
)
|
||||
self.add_option(
|
||||
"setheaders", Sequence[str], [],
|
||||
"""
|
||||
Header set pattern of the form "/pattern/header/value", where the
|
||||
separator can be any character.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"server_replay", Sequence[str], [],
|
||||
"Replay server responses from a saved file."
|
||||
)
|
||||
self.add_option(
|
||||
"stickycookie", Optional[str], None,
|
||||
"Set sticky cookie filter. Matched against requests."
|
||||
)
|
||||
self.add_option(
|
||||
"stickyauth", Optional[str], None,
|
||||
"Set sticky auth filter. Matched against requests."
|
||||
)
|
||||
self.add_option(
|
||||
"stream_large_bodies", Optional[str], None,
|
||||
"""
|
||||
Stream data to the client if response body exceeds the given
|
||||
threshold. If streamed, the body will not be stored in any way.
|
||||
Understands k/m/g suffixes, i.e. 3m for 3 megabytes.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"stream_websockets", bool, False,
|
||||
"""
|
||||
Stream WebSocket messages between client and server.
|
||||
Messages are captured and cannot be modified.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"verbosity", str, 'info',
|
||||
"Log verbosity.",
|
||||
@ -235,40 +86,6 @@ class Options(optmanager.OptManager):
|
||||
"The default content view mode.",
|
||||
choices = [i.name.lower() for i in contentviews.views]
|
||||
)
|
||||
self.add_option(
|
||||
"save_stream_file", Optional[str], None,
|
||||
"Stream flows to file as they arrive. Prefix path with + to append."
|
||||
)
|
||||
self.add_option(
|
||||
"save_stream_filter", Optional[str], None,
|
||||
"Filter which flows are written to file."
|
||||
)
|
||||
self.add_option(
|
||||
"server_replay_ignore_content", bool, False,
|
||||
"Ignore request's content while searching for a saved flow to replay."
|
||||
)
|
||||
self.add_option(
|
||||
"server_replay_ignore_params", Sequence[str], [],
|
||||
"""
|
||||
Request's parameters to be ignored while searching for a saved flow
|
||||
to replay.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"server_replay_ignore_payload_params", Sequence[str], [],
|
||||
"""
|
||||
Request's payload parameters (application/x-www-form-urlencoded or
|
||||
multipart/form-data) to be ignored while searching for a saved flow
|
||||
to replay.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"server_replay_ignore_host", bool, False,
|
||||
"""
|
||||
Ignore request's destination host while searching for a saved flow
|
||||
to replay.
|
||||
"""
|
||||
)
|
||||
|
||||
# Proxy options
|
||||
self.add_option(
|
||||
@ -395,13 +212,6 @@ class Options(optmanager.OptManager):
|
||||
--upstream-bind-address to spoof a fixed source address.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"upstream_auth", Optional[str], None,
|
||||
"""
|
||||
Add HTTP Basic authentication to upstream proxy and reverse proxy
|
||||
requests. Format: username:password.
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"ssl_version_client", str, "secure",
|
||||
"""
|
||||
|
@ -327,6 +327,13 @@ class OptManager:
|
||||
return d
|
||||
|
||||
def make_parser(self, parser, optname, metavar=None, short=None):
|
||||
"""
|
||||
Auto-Create a command-line parser entry for a named option. If the
|
||||
option does not exist, it is ignored.
|
||||
"""
|
||||
if optname not in self._options:
|
||||
return
|
||||
|
||||
o = self._options[optname]
|
||||
|
||||
def mkf(l, s):
|
||||
|
@ -59,14 +59,17 @@ class context:
|
||||
provides a number of helper methods for common testing scenarios.
|
||||
"""
|
||||
|
||||
def __init__(self, master=None, options=None):
|
||||
def __init__(self, *addons, options=None):
|
||||
options = options or mitmproxy.options.Options()
|
||||
self.master = master or RecordingMaster(
|
||||
self.master = RecordingMaster(
|
||||
options
|
||||
)
|
||||
self.options = self.master.options
|
||||
self.wrapped = None
|
||||
|
||||
for a in addons:
|
||||
self.master.addons.add(a)
|
||||
|
||||
def ctx(self):
|
||||
"""
|
||||
Returns a new handler context.
|
||||
|
@ -4,12 +4,11 @@ import click
|
||||
from mitmproxy.addons import dumper
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.test import taddons
|
||||
from mitmproxy.tools import options
|
||||
|
||||
|
||||
def show(flow_detail, flows):
|
||||
d = dumper.Dumper()
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(d, flow_detail=flow_detail)
|
||||
for f in flows:
|
||||
ctx.cycle(d, f)
|
||||
|
@ -19,8 +19,7 @@ from mitmproxy.test import taddons
|
||||
])
|
||||
def test_allowremote(allow_remote, ip, should_be_killed):
|
||||
ar = allowremote.AllowRemote()
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.register(ar)
|
||||
with taddons.context(ar) as tctx:
|
||||
tctx.options.allow_remote = allow_remote
|
||||
|
||||
with mock.patch('mitmproxy.proxy.protocol.base.Layer') as layer:
|
||||
|
@ -7,7 +7,7 @@ from mitmproxy.test import taddons
|
||||
class TestAntiCache:
|
||||
def test_simple(self):
|
||||
sa = anticache.AntiCache()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sa) as tctx:
|
||||
f = tflow.tflow(resp=True)
|
||||
f.request.headers["if-modified-since"] = "test"
|
||||
f.request.headers["if-none-match"] = "test"
|
||||
|
@ -7,7 +7,7 @@ from mitmproxy.test import taddons
|
||||
class TestAntiComp:
|
||||
def test_simple(self):
|
||||
sa = anticomp.AntiComp()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sa) as tctx:
|
||||
f = tflow.tflow(resp=True)
|
||||
sa.request(f)
|
||||
|
||||
|
@ -24,7 +24,7 @@ class MockThread():
|
||||
class TestClientPlayback:
|
||||
def test_playback(self):
|
||||
cp = clientplayback.ClientPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(cp) as tctx:
|
||||
assert cp.count() == 0
|
||||
f = tflow.tflow(resp=True)
|
||||
cp.start_replay([f])
|
||||
@ -58,7 +58,7 @@ class TestClientPlayback:
|
||||
|
||||
def test_load_file(self, tmpdir):
|
||||
cp = clientplayback.ClientPlayback()
|
||||
with taddons.context():
|
||||
with taddons.context(cp):
|
||||
fpath = str(tmpdir.join("flows"))
|
||||
tdump(fpath, [tflow.tflow(resp=True)])
|
||||
cp.load_file(fpath)
|
||||
@ -68,7 +68,7 @@ class TestClientPlayback:
|
||||
|
||||
def test_configure(self, tmpdir):
|
||||
cp = clientplayback.ClientPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(cp) as tctx:
|
||||
path = str(tmpdir.join("flows"))
|
||||
tdump(path, [tflow.tflow()])
|
||||
tctx.configure(cp, client_replay=[path])
|
||||
|
@ -10,9 +10,9 @@ def test_set():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(sa)
|
||||
|
||||
assert not tctx.master.options.anticomp
|
||||
tctx.command(sa.set, "anticomp")
|
||||
assert tctx.master.options.anticomp
|
||||
assert tctx.master.options.server
|
||||
tctx.command(sa.set, "server=false")
|
||||
assert not tctx.master.options.server
|
||||
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
tctx.command(sa.set, "nonexistent")
|
||||
@ -128,28 +128,23 @@ def test_options(tmpdir):
|
||||
p = str(tmpdir.join("path"))
|
||||
sa = core.Core()
|
||||
with taddons.context() as tctx:
|
||||
tctx.options.stickycookie = "foo"
|
||||
assert tctx.options.stickycookie == "foo"
|
||||
sa.options_reset()
|
||||
assert tctx.options.stickycookie is None
|
||||
|
||||
tctx.options.stickycookie = "foo"
|
||||
tctx.options.stickyauth = "bar"
|
||||
sa.options_reset_one("stickycookie")
|
||||
assert tctx.options.stickycookie is None
|
||||
assert tctx.options.stickyauth == "bar"
|
||||
tctx.options.listen_host = "foo"
|
||||
assert tctx.options.listen_host == "foo"
|
||||
sa.options_reset_one("listen_host")
|
||||
assert tctx.options.listen_host != "foo"
|
||||
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
sa.options_reset_one("unknown")
|
||||
|
||||
tctx.options.listen_host = "foo"
|
||||
sa.options_save(p)
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
sa.options_save("/")
|
||||
|
||||
sa.options_reset()
|
||||
assert tctx.options.stickyauth is None
|
||||
assert tctx.options.listen_host == ""
|
||||
sa.options_load(p)
|
||||
assert tctx.options.stickyauth == "bar"
|
||||
assert tctx.options.listen_host == "foo"
|
||||
|
||||
sa.options_load("/nonexistent")
|
||||
|
||||
|
@ -10,12 +10,11 @@ from mitmproxy.test import tutils
|
||||
from mitmproxy.addons import dumper
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import http
|
||||
from mitmproxy import options
|
||||
|
||||
|
||||
def test_configure():
|
||||
d = dumper.Dumper()
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(d, view_filter="~b foo")
|
||||
assert d.filter
|
||||
|
||||
@ -34,7 +33,7 @@ def test_configure():
|
||||
def test_simple():
|
||||
sio = io.StringIO()
|
||||
d = dumper.Dumper(sio)
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(d, flow_detail=0)
|
||||
d.response(tflow.tflow(resp=True))
|
||||
assert not sio.getvalue()
|
||||
@ -102,7 +101,7 @@ def test_echo_body():
|
||||
|
||||
sio = io.StringIO()
|
||||
d = dumper.Dumper(sio)
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(d, flow_detail=3)
|
||||
d._echo_message(f.response)
|
||||
t = sio.getvalue()
|
||||
@ -112,7 +111,7 @@ def test_echo_body():
|
||||
def test_echo_request_line():
|
||||
sio = io.StringIO()
|
||||
d = dumper.Dumper(sio)
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(d, flow_detail=3, showhost=True)
|
||||
f = tflow.tflow(client_conn=None, server_conn=True, resp=True)
|
||||
f.request.is_replay = True
|
||||
@ -147,7 +146,7 @@ class TestContentView:
|
||||
view_auto.side_effect = exceptions.ContentViewException("")
|
||||
sio = io.StringIO()
|
||||
d = dumper.Dumper(sio)
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(d, flow_detail=4, verbosity='debug')
|
||||
d.response(tflow.tflow())
|
||||
assert ctx.master.has_log("content viewer failed")
|
||||
@ -156,7 +155,7 @@ class TestContentView:
|
||||
def test_tcp():
|
||||
sio = io.StringIO()
|
||||
d = dumper.Dumper(sio)
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(d, flow_detail=3, showhost=True)
|
||||
f = tflow.ttcpflow()
|
||||
d.tcp_message(f)
|
||||
@ -171,7 +170,7 @@ def test_tcp():
|
||||
def test_websocket():
|
||||
sio = io.StringIO()
|
||||
d = dumper.Dumper(sio)
|
||||
with taddons.context(options=options.Options()) as ctx:
|
||||
with taddons.context() as ctx:
|
||||
ctx.configure(d, flow_detail=3, showhost=True)
|
||||
f = tflow.twebsocketflow()
|
||||
d.websocket_message(f)
|
||||
|
@ -1,7 +1,6 @@
|
||||
import pytest
|
||||
|
||||
from mitmproxy.addons import intercept
|
||||
from mitmproxy import options
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.test import taddons
|
||||
from mitmproxy.test import tflow
|
||||
@ -9,7 +8,7 @@ from mitmproxy.test import tflow
|
||||
|
||||
def test_simple():
|
||||
r = intercept.Intercept()
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
with taddons.context() as tctx:
|
||||
assert not r.filt
|
||||
tctx.configure(r, intercept="~q")
|
||||
assert r.filt
|
||||
|
@ -4,7 +4,6 @@ from mitmproxy.test import taddons
|
||||
|
||||
def test_keepserving():
|
||||
ks = keepserving.KeepServing()
|
||||
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(ks) as tctx:
|
||||
ks.event_processing_complete()
|
||||
assert tctx.master.should_exit.is_set()
|
||||
|
@ -2,7 +2,6 @@ import pytest
|
||||
|
||||
from mitmproxy.addons import onboarding
|
||||
from mitmproxy.test import taddons
|
||||
from mitmproxy import options
|
||||
from .. import tservers
|
||||
|
||||
|
||||
@ -11,25 +10,28 @@ class TestApp(tservers.HTTPProxyTest):
|
||||
return [onboarding.Onboarding()]
|
||||
|
||||
def test_basic(self):
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(self.addons()[0])
|
||||
ob = onboarding.Onboarding()
|
||||
with taddons.context(ob) as tctx:
|
||||
tctx.configure(ob)
|
||||
assert self.app("/").status_code == 200
|
||||
|
||||
@pytest.mark.parametrize("ext", ["pem", "p12"])
|
||||
def test_cert(self, ext):
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(self.addons()[0])
|
||||
ob = onboarding.Onboarding()
|
||||
with taddons.context(ob) as tctx:
|
||||
tctx.configure(ob)
|
||||
resp = self.app("/cert/%s" % ext)
|
||||
assert resp.status_code == 200
|
||||
assert resp.content
|
||||
|
||||
@pytest.mark.parametrize("ext", ["pem", "p12"])
|
||||
def test_head(self, ext):
|
||||
with taddons.context() as tctx:
|
||||
tctx.configure(self.addons()[0])
|
||||
ob = onboarding.Onboarding()
|
||||
with taddons.context(ob) as tctx:
|
||||
tctx.configure(ob)
|
||||
p = self.pathoc()
|
||||
with p.connect():
|
||||
resp = p.request("head:'http://%s/cert/%s'" % (options.APP_HOST, ext))
|
||||
resp = p.request("head:'http://%s/cert/%s'" % (tctx.options.onboarding_host, ext))
|
||||
assert resp.status_code == 200
|
||||
assert "Content-Length" in resp.headers
|
||||
assert not resp.content
|
||||
|
@ -41,7 +41,7 @@ class TestReadFile:
|
||||
@mock.patch('mitmproxy.master.Master.load_flow')
|
||||
def test_configure(self, mck, tmpdir, data, corrupt_data):
|
||||
rf = readfile.ReadFile()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(rf) as tctx:
|
||||
tf = tmpdir.join("tfile")
|
||||
|
||||
tf.write(data.getvalue())
|
||||
@ -58,7 +58,7 @@ class TestReadFile:
|
||||
@mock.patch('mitmproxy.master.Master.load_flow')
|
||||
def test_corrupt(self, mck, corrupt_data):
|
||||
rf = readfile.ReadFile()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(rf) as tctx:
|
||||
with pytest.raises(exceptions.FlowReadException):
|
||||
rf.load_flows(io.BytesIO(b"qibble"))
|
||||
assert not mck.called
|
||||
@ -71,7 +71,7 @@ class TestReadFile:
|
||||
|
||||
def test_nonexisting_file(self):
|
||||
rf = readfile.ReadFile()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(rf) as tctx:
|
||||
with pytest.raises(exceptions.FlowReadException):
|
||||
rf.load_flows_from_path("nonexistent")
|
||||
assert len(tctx.master.logs) == 1
|
||||
@ -82,7 +82,7 @@ class TestReadFileStdin:
|
||||
@mock.patch('sys.stdin')
|
||||
def test_stdin(self, stdin, load_flow, data, corrupt_data):
|
||||
rf = readfile.ReadFileStdin()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(rf) as tctx:
|
||||
stdin.buffer = data
|
||||
tctx.configure(rf, rfile="-")
|
||||
assert not load_flow.called
|
||||
@ -97,7 +97,7 @@ class TestReadFileStdin:
|
||||
@mock.patch('mitmproxy.master.Master.load_flow')
|
||||
def test_normal(self, load_flow, tmpdir, data):
|
||||
rf = readfile.ReadFileStdin()
|
||||
with taddons.context():
|
||||
with taddons.context(rf):
|
||||
tfile = tmpdir.join("tfile")
|
||||
tfile.write(data.getvalue())
|
||||
rf.load_flows_from_path(str(tfile))
|
||||
|
@ -18,7 +18,7 @@ class TestReplace:
|
||||
|
||||
def test_configure(self):
|
||||
r = replace.Replace()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(r) as tctx:
|
||||
tctx.configure(r, replacements=["one/two/three"])
|
||||
with pytest.raises(Exception, match="Invalid filter pattern"):
|
||||
tctx.configure(r, replacements=["/~b/two/three"])
|
||||
@ -28,7 +28,7 @@ class TestReplace:
|
||||
|
||||
def test_simple(self):
|
||||
r = replace.Replace()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(r) as tctx:
|
||||
tctx.configure(
|
||||
r,
|
||||
replacements=[
|
||||
@ -48,7 +48,7 @@ class TestReplace:
|
||||
|
||||
def test_order(self):
|
||||
r = replace.Replace()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(r) as tctx:
|
||||
tctx.configure(
|
||||
r,
|
||||
replacements=[
|
||||
@ -67,7 +67,7 @@ class TestReplace:
|
||||
class TestReplaceFile:
|
||||
def test_simple(self, tmpdir):
|
||||
r = replace.Replace()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(r) as tctx:
|
||||
tmpfile = tmpdir.join("replacement")
|
||||
tmpfile.write("bar")
|
||||
tctx.configure(
|
||||
@ -81,7 +81,7 @@ class TestReplaceFile:
|
||||
|
||||
def test_nonexistent(self, tmpdir):
|
||||
r = replace.Replace()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(r) as tctx:
|
||||
with pytest.raises(Exception, match="Invalid file path"):
|
||||
tctx.configure(
|
||||
r,
|
||||
|
@ -5,14 +5,13 @@ from mitmproxy.test import tflow
|
||||
|
||||
from mitmproxy import io
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import options
|
||||
from mitmproxy.addons import save
|
||||
from mitmproxy.addons import view
|
||||
|
||||
|
||||
def test_configure(tmpdir):
|
||||
sa = save.Save()
|
||||
with taddons.context(options=options.Options()) as tctx:
|
||||
with taddons.context() as tctx:
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
tctx.configure(sa, save_stream_file=str(tmpdir))
|
||||
with pytest.raises(Exception, match="Invalid filter"):
|
||||
|
@ -171,7 +171,7 @@ class TestScriptLoader:
|
||||
"mitmproxy/data/addonscripts/recorder/recorder.py"
|
||||
)
|
||||
sc = script.ScriptLoader()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sc) as tctx:
|
||||
sc.script_run([tflow.tflow(resp=True)], rp)
|
||||
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
|
||||
assert debug == [
|
||||
@ -183,7 +183,7 @@ class TestScriptLoader:
|
||||
|
||||
def test_script_run_nonexistent(self):
|
||||
sc = script.ScriptLoader()
|
||||
with taddons.context():
|
||||
with taddons.context(sc):
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
sc.script_run([tflow.tflow(resp=True)], "/")
|
||||
|
||||
@ -208,8 +208,7 @@ class TestScriptLoader:
|
||||
|
||||
def test_dupes(self):
|
||||
sc = script.ScriptLoader()
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(sc)
|
||||
with taddons.context(sc) as tctx:
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
tctx.configure(
|
||||
sc,
|
||||
@ -232,7 +231,7 @@ class TestScriptLoader:
|
||||
|
||||
def test_load_err(self):
|
||||
sc = script.ScriptLoader()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sc) as tctx:
|
||||
tctx.configure(sc, scripts=[
|
||||
tutils.test_data.path("mitmproxy/data/addonscripts/load_error.py")
|
||||
])
|
||||
@ -242,7 +241,7 @@ class TestScriptLoader:
|
||||
pass # this is expected and normally guarded.
|
||||
# on the next tick we should not fail however.
|
||||
tctx.invoke(sc, "tick")
|
||||
assert len(tctx.master.addons) == 0
|
||||
assert len(tctx.master.addons) == 1
|
||||
|
||||
def test_order(self):
|
||||
rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder")
|
||||
|
@ -19,7 +19,7 @@ def tdump(path, flows):
|
||||
|
||||
def test_load_file(tmpdir):
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context():
|
||||
with taddons.context(s):
|
||||
fpath = str(tmpdir.join("flows"))
|
||||
tdump(fpath, [tflow.tflow(resp=True)])
|
||||
s.load_file(fpath)
|
||||
@ -30,7 +30,7 @@ def test_load_file(tmpdir):
|
||||
|
||||
def test_config(tmpdir):
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
fpath = str(tmpdir.join("flows"))
|
||||
tdump(fpath, [tflow.tflow(resp=True)])
|
||||
tctx.configure(s, server_replay=[fpath])
|
||||
@ -41,7 +41,7 @@ def test_config(tmpdir):
|
||||
|
||||
def test_tick():
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
s.stop = True
|
||||
s.final_flow = tflow.tflow()
|
||||
s.final_flow.live = False
|
||||
@ -51,7 +51,7 @@ def test_tick():
|
||||
|
||||
def test_server_playback():
|
||||
sp = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sp) as tctx:
|
||||
tctx.configure(sp)
|
||||
f = tflow.tflow(resp=True)
|
||||
|
||||
@ -70,7 +70,7 @@ def test_server_playback():
|
||||
|
||||
def test_ignore_host():
|
||||
sp = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sp) as tctx:
|
||||
tctx.configure(sp, server_replay_ignore_host=True)
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
@ -85,7 +85,7 @@ def test_ignore_host():
|
||||
|
||||
def test_ignore_content():
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
tctx.configure(s, server_replay_ignore_content=False)
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
@ -113,7 +113,7 @@ def test_ignore_content():
|
||||
|
||||
def test_ignore_content_wins_over_params():
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
tctx.configure(
|
||||
s,
|
||||
server_replay_ignore_content=True,
|
||||
@ -137,7 +137,7 @@ def test_ignore_content_wins_over_params():
|
||||
|
||||
def test_ignore_payload_params_other_content_type():
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
tctx.configure(
|
||||
s,
|
||||
server_replay_ignore_content=False,
|
||||
@ -161,7 +161,7 @@ def test_ignore_payload_params_other_content_type():
|
||||
|
||||
def test_hash():
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
tctx.configure(s)
|
||||
|
||||
r = tflow.tflow()
|
||||
@ -181,7 +181,7 @@ def test_hash():
|
||||
|
||||
def test_headers():
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
tctx.configure(s, server_replay_use_headers=["foo"])
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
@ -200,7 +200,7 @@ def test_headers():
|
||||
|
||||
def test_load():
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
tctx.configure(s)
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
@ -227,7 +227,7 @@ def test_load():
|
||||
|
||||
def test_load_with_server_replay_nopop():
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
tctx.configure(s, server_replay_nopop=True)
|
||||
|
||||
r = tflow.tflow(resp=True)
|
||||
@ -245,7 +245,7 @@ def test_load_with_server_replay_nopop():
|
||||
|
||||
def test_ignore_params():
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
tctx.configure(
|
||||
s,
|
||||
server_replay_ignore_params=["param1", "param2"]
|
||||
@ -266,7 +266,7 @@ def test_ignore_params():
|
||||
|
||||
def thash(r, r2, setter):
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
s = serverplayback.ServerPlayback()
|
||||
tctx.configure(
|
||||
s,
|
||||
@ -328,7 +328,7 @@ def test_ignore_payload_params():
|
||||
|
||||
def test_server_playback_full():
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
tctx.configure(
|
||||
s,
|
||||
server_replay_refresh = True,
|
||||
@ -360,7 +360,7 @@ def test_server_playback_full():
|
||||
|
||||
def test_server_playback_kill():
|
||||
s = serverplayback.ServerPlayback()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(s) as tctx:
|
||||
tctx.configure(
|
||||
s,
|
||||
server_replay_refresh = True,
|
||||
|
@ -19,14 +19,14 @@ class TestSetHeaders:
|
||||
|
||||
def test_configure(self):
|
||||
sh = setheaders.SetHeaders()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sh) as tctx:
|
||||
with pytest.raises(Exception, match="Invalid setheader filter pattern"):
|
||||
tctx.configure(sh, setheaders = ["/~b/one/two"])
|
||||
tctx.configure(sh, setheaders = ["/foo/bar/voing"])
|
||||
|
||||
def test_setheaders(self):
|
||||
sh = setheaders.SetHeaders()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sh) as tctx:
|
||||
tctx.configure(
|
||||
sh,
|
||||
setheaders = [
|
||||
|
@ -9,7 +9,7 @@ from mitmproxy import exceptions
|
||||
|
||||
def test_configure():
|
||||
r = stickyauth.StickyAuth()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(r) as tctx:
|
||||
tctx.configure(r, stickyauth="~s")
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
tctx.configure(r, stickyauth="~~")
|
||||
@ -20,7 +20,7 @@ def test_configure():
|
||||
|
||||
def test_simple():
|
||||
r = stickyauth.StickyAuth()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(r) as tctx:
|
||||
tctx.configure(r, stickyauth=".*")
|
||||
f = tflow.tflow(resp=True)
|
||||
f.request.headers["authorization"] = "foo"
|
||||
|
@ -15,7 +15,7 @@ def test_domain_match():
|
||||
class TestStickyCookie:
|
||||
def test_config(self):
|
||||
sc = stickycookie.StickyCookie()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sc) as tctx:
|
||||
with pytest.raises(Exception, match="invalid filter"):
|
||||
tctx.configure(sc, stickycookie="~b")
|
||||
|
||||
@ -26,7 +26,7 @@ class TestStickyCookie:
|
||||
|
||||
def test_simple(self):
|
||||
sc = stickycookie.StickyCookie()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sc) as tctx:
|
||||
tctx.configure(sc, stickycookie=".*")
|
||||
f = tflow.tflow(resp=True)
|
||||
f.response.headers["set-cookie"] = "foo=bar"
|
||||
@ -50,7 +50,7 @@ class TestStickyCookie:
|
||||
|
||||
def test_response(self):
|
||||
sc = stickycookie.StickyCookie()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sc) as tctx:
|
||||
tctx.configure(sc, stickycookie=".*")
|
||||
|
||||
c = "SSID=mooo; domain=.google.com, FOO=bar; Domain=.google.com; Path=/; " \
|
||||
@ -68,7 +68,7 @@ class TestStickyCookie:
|
||||
|
||||
def test_response_multiple(self):
|
||||
sc = stickycookie.StickyCookie()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sc) as tctx:
|
||||
tctx.configure(sc, stickycookie=".*")
|
||||
|
||||
# Test setting of multiple cookies
|
||||
@ -82,7 +82,7 @@ class TestStickyCookie:
|
||||
|
||||
def test_response_weird(self):
|
||||
sc = stickycookie.StickyCookie()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sc) as tctx:
|
||||
tctx.configure(sc, stickycookie=".*")
|
||||
|
||||
# Test setting of weird cookie keys
|
||||
@ -100,7 +100,7 @@ class TestStickyCookie:
|
||||
|
||||
def test_response_overwrite(self):
|
||||
sc = stickycookie.StickyCookie()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sc) as tctx:
|
||||
tctx.configure(sc, stickycookie=".*")
|
||||
|
||||
# Test overwriting of a cookie value
|
||||
@ -115,7 +115,7 @@ class TestStickyCookie:
|
||||
|
||||
def test_response_delete(self):
|
||||
sc = stickycookie.StickyCookie()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sc) as tctx:
|
||||
tctx.configure(sc, stickycookie=".*")
|
||||
|
||||
# Test that a cookie is be deleted
|
||||
@ -127,7 +127,7 @@ class TestStickyCookie:
|
||||
|
||||
def test_request(self):
|
||||
sc = stickycookie.StickyCookie()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sc) as tctx:
|
||||
tctx.configure(sc, stickycookie=".*")
|
||||
|
||||
f = self._response(sc, "SSID=mooo", "www.google.com")
|
||||
|
@ -7,7 +7,7 @@ import pytest
|
||||
|
||||
def test_simple():
|
||||
sa = streambodies.StreamBodies()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(sa) as tctx:
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
tctx.configure(sa, stream_large_bodies = "invalid")
|
||||
tctx.configure(sa, stream_large_bodies = "10")
|
||||
|
@ -9,7 +9,7 @@ from mitmproxy.addons import upstream_auth
|
||||
|
||||
def test_configure():
|
||||
up = upstream_auth.UpstreamAuth()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(up) as tctx:
|
||||
tctx.configure(up, upstream_auth="test:test")
|
||||
assert up.auth == b"Basic" + b" " + base64.b64encode(b"test:test")
|
||||
|
||||
@ -29,7 +29,7 @@ def test_configure():
|
||||
|
||||
def test_simple():
|
||||
up = upstream_auth.UpstreamAuth()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(up) as tctx:
|
||||
tctx.configure(up, upstream_auth="foo:bar")
|
||||
|
||||
f = tflow.tflow()
|
||||
|
@ -351,7 +351,7 @@ def test_dump_defaults():
|
||||
def test_dump_dicts():
|
||||
o = options.Options()
|
||||
assert optmanager.dump_dicts(o)
|
||||
assert optmanager.dump_dicts(o, ['http2', 'anticomp'])
|
||||
assert optmanager.dump_dicts(o, ['http2', 'listen_port'])
|
||||
|
||||
|
||||
class TTypes(optmanager.OptManager):
|
||||
@ -375,9 +375,13 @@ def test_make_parser():
|
||||
opts.make_parser(parser, "int", short="c")
|
||||
opts.make_parser(parser, "seqstr", short="d")
|
||||
opts.make_parser(parser, "bool_on", short="e")
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
opts.make_parser(parser, "unknown")
|
||||
|
||||
# Nonexistent options ignore
|
||||
opts.make_parser(parser, "nonexistentxxx")
|
||||
|
||||
|
||||
def test_set():
|
||||
opts = TTypes()
|
||||
|
@ -7,10 +7,6 @@ from mitmproxy.tools import console
|
||||
from ... import tservers
|
||||
|
||||
|
||||
def test_options():
|
||||
assert options.Options(server_replay_kill_extra=True)
|
||||
|
||||
|
||||
class TestMaster(tservers.MasterTest):
|
||||
def mkmaster(self, **opts):
|
||||
if "verbosity" not in opts:
|
||||
|
@ -3,7 +3,9 @@ from mitmproxy.tools.console import statusbar, master
|
||||
|
||||
|
||||
def test_statusbar(monkeypatch):
|
||||
o = options.Options(
|
||||
o = options.Options()
|
||||
m = master.ConsoleMaster(o)
|
||||
m.options.update(
|
||||
setheaders=[":~q:foo:bar"],
|
||||
replacements=[":~q:foo:bar"],
|
||||
ignore_hosts=["example.com", "example.org"],
|
||||
@ -21,10 +23,8 @@ def test_statusbar(monkeypatch):
|
||||
upstream_cert=False,
|
||||
stream_large_bodies="3m",
|
||||
mode="transparent",
|
||||
scripts=["nonexistent"],
|
||||
save_stream_file="foo",
|
||||
)
|
||||
m = master.ConsoleMaster(o)
|
||||
|
||||
m.options.update(view_order='url', console_focus_follow=True)
|
||||
monkeypatch.setattr(m.addons.get("clientplayback"), "count", lambda: 42)
|
||||
monkeypatch.setattr(m.addons.get("serverplayback"), "count", lambda: 42)
|
||||
|
@ -8,7 +8,7 @@ from mitmproxy import flowfilter
|
||||
from mitmproxy.tools.web.app import flow_to_json
|
||||
|
||||
from mitmproxy.tools.web import static_viewer
|
||||
from mitmproxy.addons import save
|
||||
from mitmproxy.addons import save, readfile
|
||||
|
||||
|
||||
def test_save_static(tmpdir):
|
||||
@ -59,8 +59,9 @@ def test_save_flows_content(ctx, tmpdir):
|
||||
|
||||
def test_static_viewer(tmpdir):
|
||||
s = static_viewer.StaticViewer()
|
||||
rf = readfile.ReadFile()
|
||||
sa = save.Save()
|
||||
with taddons.context() as tctx:
|
||||
with taddons.context(rf) as tctx:
|
||||
sa.save([tflow.tflow(resp=True)], str(tmpdir.join('foo')))
|
||||
tctx.master.addons.add(s)
|
||||
tctx.configure(s, web_static_viewer=str(tmpdir), rfile=str(tmpdir.join('foo')))
|
||||
|
@ -222,12 +222,12 @@ class HTTPProxyTest(ProxyTestBase):
|
||||
p = pathod.pathoc.Pathoc(
|
||||
("127.0.0.1", self.proxy.port), True, fp=None
|
||||
)
|
||||
with p.connect((options.APP_HOST, options.APP_PORT)):
|
||||
with p.connect((self.master.options.onboarding_host, self.master.options.onbarding_port)):
|
||||
return p.request("get:'%s'" % page)
|
||||
else:
|
||||
p = self.pathoc()
|
||||
with p.connect():
|
||||
return p.request("get:'http://%s%s'" % (options.APP_HOST, page))
|
||||
return p.request("get:'http://%s%s'" % (self.master.options.onboarding_host, page))
|
||||
|
||||
|
||||
class TransparentProxyTest(ProxyTestBase):
|
||||
|
Loading…
Reference in New Issue
Block a user