mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 08:11:00 +00:00
Merge pull request #3109 from cortesi/kmap
console keybindings: define a yaml format, load CONFDIR/keys.yaml on startup
This commit is contained in:
commit
ab89079c65
@ -2,7 +2,7 @@
|
||||
|
||||
* Fix an issue that caused mitmproxy to not retry HTTP requests on timeout.
|
||||
|
||||
* Various other fixes (@kira0204, @fenilgandhi, @tran-tien-dat, @smonami,
|
||||
* Various other fixes (@kira0204, @fenilgandhi, @tran-tien-dat, @smonami,
|
||||
@luzpaz, @fristonio, @kajojify, @Oliver-Fish, @hcbarry, @jplochocki, @MikeShi42,
|
||||
@ghillu, @emilstahl)
|
||||
|
||||
@ -423,7 +423,7 @@
|
||||
24 July 2015: mitmproxy 0.13
|
||||
|
||||
* Upstream certificate validation. See the --verify-upstream-cert,
|
||||
--upstream-trusted-cadir and --upstream-trusted-ca parameters. Thanks to
|
||||
--upstream-trusted-confdir and --upstream-trusted-ca parameters. Thanks to
|
||||
Kyle Morton (github.com/kyle-m) for his work on this.
|
||||
|
||||
* Add HTTP transparent proxy mode. This uses the host headers from HTTP
|
||||
|
@ -32,7 +32,7 @@ reason. Below is a list of pointers to manual certificate installation
|
||||
documentation for some common platforms. The mitmproxy CA cert is located in
|
||||
`~/.mitmproxy` after it has been generated at the first start of mitmproxy.
|
||||
|
||||
- [IOS](http://jasdev.me/intercepting-ios-traffic)
|
||||
- [IOS](http://jasdev.me/intercepting-ios-traffic)
|
||||
On iOS 10.3 and onwards, you also need to enable full trust for the mitmproxy
|
||||
root certificate:
|
||||
1. Go to Settings > General > About > Certificate Trust Settings.
|
||||
@ -42,12 +42,12 @@ documentation for some common platforms. The mitmproxy CA cert is located in
|
||||
- [Java](https://docs.oracle.com/cd/E19906-01/820-4916/geygn/index.html)
|
||||
- [Android/Android Simulator](http://wiki.cacert.org/FAQ/ImportRootCert#Android_Phones_.26_Tablets)
|
||||
- [Windows](https://web.archive.org/web/20160612045445/http://windows.microsoft.com/en-ca/windows/import-export-certificates-private-keys#1TC=windows-7)
|
||||
- [Windows (automated)](https://technet.microsoft.com/en-us/library/cc732443.aspx)
|
||||
- [Windows (automated)](https://technet.microsoft.com/en-us/library/cc732443.aspx)
|
||||
|
||||
{{< highlight bash >}}
|
||||
certutil.exe -importpfx Root mitmproxy-ca-cert.p12
|
||||
{{< / highlight >}}
|
||||
|
||||
|
||||
- [Mac OS X](https://support.apple.com/kb/PH20129)
|
||||
- [Ubuntu/Debian]( https://askubuntu.com/questions/73287/how-do-i-install-a-root-certificate/94861#94861)
|
||||
- [Mozilla Firefox](https://wiki.mozilla.org/MozillaRootCertificate#Mozilla_Firefox)
|
||||
@ -143,7 +143,7 @@ mitmproxy --cert *.example.com=cert.pem
|
||||
By default, mitmproxy will use `~/.mitmproxy/mitmproxy-ca.pem` as the
|
||||
certificate authority to generate certificates for all domains for which
|
||||
no custom certificate is provided (see above). You can use your own
|
||||
certificate authority by passing the `--set cadir=DIRECTORY` option to
|
||||
certificate authority by passing the `--set confdir=DIRECTORY` option to
|
||||
mitmproxy. Mitmproxy will then look for `mitmproxy-ca.pem` in the
|
||||
specified directory. If no such file exists, it will be generated
|
||||
automatically.
|
||||
|
@ -14,7 +14,7 @@ from mitmproxy.net.http import status_codes
|
||||
import mitmproxy.types
|
||||
|
||||
|
||||
CA_DIR = "~/.mitmproxy"
|
||||
CONF_DIR = "~/.mitmproxy"
|
||||
LISTEN_PORT = 8080
|
||||
|
||||
|
||||
|
@ -45,7 +45,7 @@ class PEM(tornado.web.RequestHandler):
|
||||
return config.CONF_BASENAME + "-ca-cert.pem"
|
||||
|
||||
def head(self):
|
||||
p = os.path.join(self.request.master.options.cadir, self.filename)
|
||||
p = os.path.join(self.request.master.options.confdir, self.filename)
|
||||
p = os.path.expanduser(p)
|
||||
content_length = os.path.getsize(p)
|
||||
|
||||
@ -57,7 +57,7 @@ class PEM(tornado.web.RequestHandler):
|
||||
self.set_header("Content-Length", content_length)
|
||||
|
||||
def get(self):
|
||||
p = os.path.join(self.request.master.options.cadir, self.filename)
|
||||
p = os.path.join(self.request.master.options.confdir, self.filename)
|
||||
p = os.path.expanduser(p)
|
||||
self.set_header("Content-Type", "application/x-x509-ca-cert")
|
||||
self.set_header(
|
||||
@ -76,7 +76,7 @@ class P12(tornado.web.RequestHandler):
|
||||
return config.CONF_BASENAME + "-ca-cert.p12"
|
||||
|
||||
def head(self):
|
||||
p = os.path.join(self.request.master.options.cadir, self.filename)
|
||||
p = os.path.join(self.request.master.options.confdir, self.filename)
|
||||
p = os.path.expanduser(p)
|
||||
content_length = os.path.getsize(p)
|
||||
|
||||
@ -89,7 +89,7 @@ class P12(tornado.web.RequestHandler):
|
||||
self.set_header("Content-Length", content_length)
|
||||
|
||||
def get(self):
|
||||
p = os.path.join(self.request.master.options.cadir, self.filename)
|
||||
p = os.path.join(self.request.master.options.confdir, self.filename)
|
||||
p = os.path.expanduser(p)
|
||||
self.set_header("Content-Type", "application/x-pkcs12")
|
||||
self.set_header(
|
||||
|
@ -71,7 +71,7 @@ def client_arguments_from_options(options: "mitmproxy.options.Options") -> dict:
|
||||
"verify": verify,
|
||||
"method": method,
|
||||
"options": tls_options,
|
||||
"ca_path": options.ssl_verify_upstream_trusted_cadir,
|
||||
"ca_path": options.ssl_verify_upstream_trusted_confdir,
|
||||
"ca_pemfile": options.ssl_verify_upstream_trusted_ca,
|
||||
"client_certs": options.client_certs,
|
||||
"cipher_list": options.ciphers_server,
|
||||
|
@ -4,7 +4,7 @@ from mitmproxy import optmanager
|
||||
from mitmproxy.net import tls
|
||||
|
||||
|
||||
CA_DIR = "~/.mitmproxy"
|
||||
CONF_DIR = "~/.mitmproxy"
|
||||
LISTEN_PORT = 8080
|
||||
|
||||
|
||||
@ -30,8 +30,8 @@ class Options(optmanager.OptManager):
|
||||
"""
|
||||
)
|
||||
self.add_option(
|
||||
"cadir", str, CA_DIR,
|
||||
"Location of the default mitmproxy CA files."
|
||||
"confdir", str, CONF_DIR,
|
||||
"Location of the default mitmproxy configuration files."
|
||||
)
|
||||
self.add_option(
|
||||
"certs", Sequence[str], [],
|
||||
@ -143,7 +143,7 @@ class Options(optmanager.OptManager):
|
||||
"Do not verify upstream server SSL/TLS certificates."
|
||||
)
|
||||
self.add_option(
|
||||
"ssl_verify_upstream_trusted_cadir", Optional[str], None,
|
||||
"ssl_verify_upstream_trusted_confdir", Optional[str], None,
|
||||
"""
|
||||
Path to a directory of trusted CA certificates for upstream server
|
||||
verification prepared using the c_rehash tool.
|
||||
|
@ -49,7 +49,7 @@ class ProxyConfig:
|
||||
if "tcp_hosts" in updated:
|
||||
self.check_tcp = HostMatcher(options.tcp_hosts)
|
||||
|
||||
certstore_path = os.path.expanduser(options.cadir)
|
||||
certstore_path = os.path.expanduser(options.confdir)
|
||||
if not os.path.exists(os.path.dirname(certstore_path)):
|
||||
raise exceptions.OptionsError(
|
||||
"Certificate Authority parent directory does not exist: %s" %
|
||||
|
@ -4,7 +4,7 @@ import os
|
||||
from mitmproxy.addons import core
|
||||
|
||||
|
||||
CONFIG_PATH = os.path.join(core.CA_DIR, "config.yaml")
|
||||
CONFIG_PATH = os.path.join(core.CONF_DIR, "config.yaml")
|
||||
|
||||
|
||||
def common_options(parser, opts):
|
||||
|
@ -1,6 +1,18 @@
|
||||
import typing
|
||||
import os
|
||||
|
||||
import ruamel.yaml
|
||||
|
||||
from mitmproxy import command
|
||||
from mitmproxy.tools.console import commandexecutor
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import exceptions
|
||||
import mitmproxy.types
|
||||
|
||||
|
||||
class KeyBindingError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
Contexts = {
|
||||
@ -139,3 +151,91 @@ class Keymap:
|
||||
if b:
|
||||
return self.executor(b.command)
|
||||
return key
|
||||
|
||||
|
||||
keyAttrs = {
|
||||
"key": lambda x: isinstance(x, str),
|
||||
"cmd": lambda x: isinstance(x, str),
|
||||
"ctx": lambda x: isinstance(x, list) and [isinstance(v, str) for v in x],
|
||||
"help": lambda x: isinstance(x, str),
|
||||
}
|
||||
requiredKeyAttrs = set(["key", "cmd"])
|
||||
|
||||
|
||||
class KeymapConfig:
|
||||
defaultFile = "keys.yaml"
|
||||
|
||||
@command.command("console.keymap.load")
|
||||
def keymap_load_path(self, path: mitmproxy.types.Path) -> None:
|
||||
try:
|
||||
self.load_path(ctx.master.keymap, path) # type: ignore
|
||||
except (OSError, KeyBindingError) as e:
|
||||
raise exceptions.CommandError(
|
||||
"Could not load key bindings - %s" % e
|
||||
) from e
|
||||
|
||||
def running(self):
|
||||
p = os.path.join(os.path.expanduser(ctx.options.confdir), self.defaultFile)
|
||||
if os.path.exists(p):
|
||||
try:
|
||||
self.load_path(ctx.master.keymap, p)
|
||||
except KeyBindingError as e:
|
||||
ctx.log.error(e)
|
||||
|
||||
def load_path(self, km, p):
|
||||
if os.path.exists(p) and os.path.isfile(p):
|
||||
with open(p, "rt", encoding="utf8") as f:
|
||||
try:
|
||||
txt = f.read()
|
||||
except UnicodeDecodeError as e:
|
||||
raise KeyBindingError(
|
||||
"Encoding error - expected UTF8: %s: %s" % (p, e)
|
||||
)
|
||||
try:
|
||||
vals = self.parse(txt)
|
||||
except KeyBindingError as e:
|
||||
raise KeyBindingError(
|
||||
"Error reading %s: %s" % (p, e)
|
||||
) from e
|
||||
for v in vals:
|
||||
try:
|
||||
km.add(
|
||||
key = v["key"],
|
||||
command = v["cmd"],
|
||||
contexts = v.get("ctx", ["global"]),
|
||||
help = v.get("help", None),
|
||||
)
|
||||
except ValueError as e:
|
||||
raise KeyBindingError(
|
||||
"Error reading %s: %s" % (p, e)
|
||||
) from e
|
||||
|
||||
def parse(self, text):
|
||||
try:
|
||||
data = ruamel.yaml.safe_load(text)
|
||||
except ruamel.yaml.error.YAMLError as v:
|
||||
if hasattr(v, "problem_mark"):
|
||||
snip = v.problem_mark.get_snippet()
|
||||
raise KeyBindingError(
|
||||
"Key binding config error at line %s:\n%s\n%s" %
|
||||
(v.problem_mark.line + 1, snip, v.problem)
|
||||
)
|
||||
else:
|
||||
raise KeyBindingError("Could not parse key bindings.")
|
||||
if not data:
|
||||
return []
|
||||
if not isinstance(data, list):
|
||||
raise KeyBindingError("Inalid keybinding config - expected a list of keys")
|
||||
|
||||
for k in data:
|
||||
unknown = k.keys() - keyAttrs.keys()
|
||||
if unknown:
|
||||
raise KeyBindingError("Unknown key attributes: %s" % unknown)
|
||||
missing = requiredKeyAttrs - k.keys()
|
||||
if missing:
|
||||
raise KeyBindingError("Missing required key attributes: %s" % unknown)
|
||||
for attr in k.keys():
|
||||
if not keyAttrs[attr](k[attr]):
|
||||
raise KeyBindingError("Invalid type for %s" % attr)
|
||||
|
||||
return data
|
||||
|
@ -56,6 +56,7 @@ class ConsoleMaster(master.Master):
|
||||
consoleaddons.UnsupportedLog(),
|
||||
readfile.ReadFile(),
|
||||
consoleaddons.ConsoleAddon(self),
|
||||
keymap.KeymapConfig(),
|
||||
)
|
||||
|
||||
def sigint_handler(*args, **kwargs):
|
||||
|
@ -1,7 +1,7 @@
|
||||
import sys
|
||||
|
||||
DEPRECATED = """
|
||||
--cadir
|
||||
--confdir
|
||||
-Z
|
||||
--body-size-limit
|
||||
--stream
|
||||
@ -22,7 +22,7 @@ DEPRECATED = """
|
||||
--client-certs
|
||||
--no-upstream-cert
|
||||
--add-upstream-certs-to-client-chain
|
||||
--upstream-trusted-cadir
|
||||
--upstream-trusted-confdir
|
||||
--upstream-trusted-ca
|
||||
--ssl-version-client
|
||||
--ssl-version-server
|
||||
@ -72,7 +72,7 @@ REPLACEMENTS = {
|
||||
"--no-http2-priority": "http2_priority",
|
||||
"--no-websocket": "websocket",
|
||||
"--no-upstream-cert": "upstream_cert",
|
||||
"--upstream-trusted-cadir": "ssl_verify_upstream_trusted_cadir",
|
||||
"--upstream-trusted-confdir": "ssl_verify_upstream_trusted_confdir",
|
||||
"--upstream-trusted-ca": "ssl_verify_upstream_trusted_ca",
|
||||
"--no-onboarding": "onboarding",
|
||||
"--no-pop": "server_replay_nopop",
|
||||
|
@ -338,7 +338,7 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
|
||||
c.wfile.flush()
|
||||
assert c.rfile.readline() == testval
|
||||
|
||||
def test_mode_strict_w_cadir_should_pass(self, tdata):
|
||||
def test_mode_strict_w_confdir_should_pass(self, tdata):
|
||||
c = tcp.TCPClient(("127.0.0.1", self.port))
|
||||
with c.connect():
|
||||
c.convert_to_tls(
|
||||
|
@ -103,7 +103,7 @@ class _Http2TestBase:
|
||||
upstream_cert=True,
|
||||
ssl_insecure=True
|
||||
)
|
||||
opts.cadir = os.path.join(tempfile.gettempdir(), "mitmproxy")
|
||||
opts.confdir = os.path.join(tempfile.gettempdir(), "mitmproxy")
|
||||
return opts
|
||||
|
||||
@property
|
||||
|
@ -67,7 +67,7 @@ class _WebSocketTestBase:
|
||||
ssl_insecure=True,
|
||||
websocket=True,
|
||||
)
|
||||
opts.cadir = os.path.join(tempfile.gettempdir(), "mitmproxy")
|
||||
opts.confdir = os.path.join(tempfile.gettempdir(), "mitmproxy")
|
||||
return opts
|
||||
|
||||
@property
|
||||
|
@ -6,9 +6,9 @@ from mitmproxy.proxy.config import ProxyConfig
|
||||
|
||||
|
||||
class TestProxyConfig:
|
||||
def test_invalid_cadir(self):
|
||||
def test_invalid_confdir(self):
|
||||
opts = options.Options()
|
||||
opts.cadir = "foo"
|
||||
opts.confdir = "foo"
|
||||
with pytest.raises(exceptions.OptionsError, match="parent directory does not exist"):
|
||||
ProxyConfig(opts)
|
||||
|
||||
|
@ -309,10 +309,10 @@ class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxyTest):
|
||||
with p.connect():
|
||||
return p.request("get:/p/242")
|
||||
|
||||
def test_verification_w_cadir(self, tdata):
|
||||
def test_verification_w_confdir(self, tdata):
|
||||
self.options.update(
|
||||
ssl_insecure=False,
|
||||
ssl_verify_upstream_trusted_cadir=tdata.path(
|
||||
ssl_verify_upstream_trusted_confdir=tdata.path(
|
||||
"mitmproxy/data/servercert/"
|
||||
),
|
||||
ssl_verify_upstream_trusted_ca=None,
|
||||
@ -322,7 +322,7 @@ class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxyTest):
|
||||
def test_verification_w_pemfile(self, tdata):
|
||||
self.options.update(
|
||||
ssl_insecure=False,
|
||||
ssl_verify_upstream_trusted_cadir=None,
|
||||
ssl_verify_upstream_trusted_confdir=None,
|
||||
ssl_verify_upstream_trusted_ca=tdata.path(
|
||||
"mitmproxy/data/servercert/trusted-root.pem"
|
||||
),
|
||||
|
@ -70,3 +70,98 @@ def test_remove():
|
||||
|
||||
km.remove("key", ["commands"])
|
||||
assert len(km.bindings) == 0
|
||||
|
||||
|
||||
def test_load_path(tmpdir):
|
||||
dst = str(tmpdir.join("conf"))
|
||||
|
||||
kmc = keymap.KeymapConfig()
|
||||
with taddons.context(kmc) as tctx:
|
||||
km = keymap.Keymap(tctx.master)
|
||||
tctx.master.keymap = km
|
||||
|
||||
with open(dst, 'wb') as f:
|
||||
f.write(b"\xff\xff\xff")
|
||||
with pytest.raises(keymap.KeyBindingError, match="expected UTF8"):
|
||||
kmc.load_path(km, dst)
|
||||
|
||||
with open(dst, 'w') as f:
|
||||
f.write("'''")
|
||||
with pytest.raises(keymap.KeyBindingError):
|
||||
kmc.load_path(km, dst)
|
||||
|
||||
with open(dst, 'w') as f:
|
||||
f.write(
|
||||
"""
|
||||
- key: key1
|
||||
ctx: [unknown]
|
||||
cmd: >
|
||||
foo bar
|
||||
foo bar
|
||||
"""
|
||||
)
|
||||
with pytest.raises(keymap.KeyBindingError):
|
||||
kmc.load_path(km, dst)
|
||||
|
||||
with open(dst, 'w') as f:
|
||||
f.write(
|
||||
"""
|
||||
- key: key1
|
||||
ctx: [chooser]
|
||||
help: one
|
||||
cmd: >
|
||||
foo bar
|
||||
foo bar
|
||||
"""
|
||||
)
|
||||
kmc.load_path(km, dst)
|
||||
assert(km.get("chooser", "key1"))
|
||||
|
||||
|
||||
def test_parse():
|
||||
kmc = keymap.KeymapConfig()
|
||||
with taddons.context(kmc):
|
||||
assert kmc.parse("") == []
|
||||
assert kmc.parse("\n\n\n \n") == []
|
||||
with pytest.raises(keymap.KeyBindingError, match="expected a list of keys"):
|
||||
kmc.parse("key: val")
|
||||
with pytest.raises(keymap.KeyBindingError, match="expected a list of keys"):
|
||||
kmc.parse("val")
|
||||
with pytest.raises(keymap.KeyBindingError, match="Unknown key attributes"):
|
||||
kmc.parse(
|
||||
"""
|
||||
- key: key1
|
||||
nonexistent: bar
|
||||
"""
|
||||
)
|
||||
with pytest.raises(keymap.KeyBindingError, match="Missing required key attributes"):
|
||||
kmc.parse(
|
||||
"""
|
||||
- help: key1
|
||||
"""
|
||||
)
|
||||
with pytest.raises(keymap.KeyBindingError, match="Invalid type for cmd"):
|
||||
kmc.parse(
|
||||
"""
|
||||
- key: key1
|
||||
cmd: [ cmd ]
|
||||
"""
|
||||
)
|
||||
with pytest.raises(keymap.KeyBindingError, match="Invalid type for ctx"):
|
||||
kmc.parse(
|
||||
"""
|
||||
- key: key1
|
||||
ctx: foo
|
||||
cmd: cmd
|
||||
"""
|
||||
)
|
||||
assert kmc.parse(
|
||||
"""
|
||||
- key: key1
|
||||
ctx: [one, two]
|
||||
help: one
|
||||
cmd: >
|
||||
foo bar
|
||||
foo bar
|
||||
"""
|
||||
) == [{"key": "key1", "ctx": ["one", "two"], "help": "one", "cmd": "foo bar foo bar\n"}]
|
@ -151,7 +151,7 @@ class ProxyTestBase:
|
||||
def teardown_class(cls):
|
||||
# perf: we want to run tests in parallel
|
||||
# should this ever cause an error, travis should catch it.
|
||||
# shutil.rmtree(cls.cadir)
|
||||
# shutil.rmtree(cls.confdir)
|
||||
cls.proxy.shutdown()
|
||||
cls.server.shutdown()
|
||||
cls.server2.shutdown()
|
||||
@ -175,10 +175,10 @@ class ProxyTestBase:
|
||||
|
||||
@classmethod
|
||||
def get_options(cls):
|
||||
cls.cadir = os.path.join(tempfile.gettempdir(), "mitmproxy")
|
||||
cls.confdir = os.path.join(tempfile.gettempdir(), "mitmproxy")
|
||||
return options.Options(
|
||||
listen_port=0,
|
||||
cadir=cls.cadir,
|
||||
confdir=cls.confdir,
|
||||
add_upstream_certs_to_client_chain=cls.add_upstream_certs_to_client_chain,
|
||||
ssl_insecure=True,
|
||||
)
|
||||
|
@ -10,9 +10,9 @@ from mitmproxy.utils import arg_check
|
||||
@pytest.mark.parametrize('arg, output', [
|
||||
(["-T"], "-T is deprecated, please use --mode transparent instead"),
|
||||
(["-U"], "-U is deprecated, please use --mode upstream:SPEC instead"),
|
||||
(["--cadir"], "--cadir is deprecated.\n"
|
||||
"Please use `--set cadir=value` instead.\n"
|
||||
"To show all options and their default values use --options"),
|
||||
(["--confdir"], "--confdir is deprecated.\n"
|
||||
"Please use `--set confdir=value` instead.\n"
|
||||
"To show all options and their default values use --options"),
|
||||
(["--palette"], "--palette is deprecated.\n"
|
||||
"Please use `--set console_palette=value` instead.\n"
|
||||
"To show all options and their default values use --options"),
|
||||
|
Loading…
Reference in New Issue
Block a user