mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-30 11:19:23 +00:00
1090 lines
36 KiB
Python
1090 lines
36 KiB
Python
import os
|
|
import socket
|
|
import time
|
|
|
|
import mock
|
|
|
|
from mitmproxy.test import tutils
|
|
from mitmproxy import controller
|
|
from mitmproxy import options
|
|
from mitmproxy.addons import script
|
|
from mitmproxy.addons import proxyauth
|
|
from mitmproxy import http
|
|
from mitmproxy.proxy.config import HostMatcher, parse_server_spec
|
|
import mitmproxy.net.http
|
|
from mitmproxy.net import tcp
|
|
from mitmproxy.net import socks
|
|
from mitmproxy import certs
|
|
from mitmproxy import exceptions
|
|
from mitmproxy.net.http import http1
|
|
from mitmproxy.net.tcp import Address
|
|
from pathod import pathoc
|
|
from pathod import pathod
|
|
|
|
from . import tutils as ttutils
|
|
|
|
from . import tservers
|
|
|
|
"""
|
|
Note that the choice of response code in these tests matters more than you
|
|
might think. libcurl treats a 304 response code differently from, say, a
|
|
200 response code - it will correctly terminate a 304 response with no
|
|
content-length header, whereas it will block forever waiting for content
|
|
for a 200 response.
|
|
"""
|
|
|
|
|
|
class CommonMixin:
    """
    Tests shared by the proxy-mode test classes that mix this in.

    The host class supplies self.pathod(), self.master, self.proxy,
    self.server and self.ssl.
    """

    def test_large(self):
        """A 50k response body arrives with its full length."""
        body = self.pathod("200:b@50k").content
        assert len(body) == 1024 * 50

    @staticmethod
    def wait_until_not_live(flow):
        """
        Block until ``flow.live`` is falsy, so the flow is not replayed
        while it is still being handled.

        Raises:
            RuntimeError: if the flow is still live after more than 5 seconds.
        """
        started = time.time()
        while flow.live:
            time.sleep(0.001)
            elapsed = time.time() - started
            if elapsed > 5:
                raise RuntimeError("Flow is live for too long.")

    def test_replay(self):
        """Replay a recorded flow successfully, then into two error cases."""

        def assert_replay_failed(flow):
            # Behind an upstream proxy the failure comes back as a 502
            # response; otherwise the flow carries an error.
            if isinstance(self, tservers.HTTPUpstreamProxyTest):
                assert flow.response.status_code == 502
            else:
                assert flow.error

        assert self.pathod("304").status_code == 304
        assert len(self.master.state.flows) == 1
        flow = self.master.state.flows[-1]
        assert flow.response.status_code == 304

        flow.request.path = "/p/305"
        self.wait_until_not_live(flow)
        self.master.replay_request(flow, block=True)
        assert flow.response.status_code == 305

        # Disconnect error
        flow.request.path = "/p/305:d0"
        result = self.master.replay_request(flow, block=True)
        assert result
        assert_replay_failed(flow)

        # Port error
        flow.request.port = 1
        # In upstream mode, we get a 502 response from the upstream proxy server.
        # In upstream mode with ssl, the replay will fail as we cannot establish
        # SSL with the upstream proxy.
        result = self.master.replay_request(flow, block=True)
        assert result
        assert_replay_failed(flow)

    def test_http(self):
        """A simple request is recorded with client address and host header."""
        resp = self.pathod("304")
        assert resp.status_code == 304

        # In Upstream mode with SSL, we may already have a previous CONNECT
        # request, so look at the most recent flow.
        latest = self.master.state.flows[-1]
        assert latest.client_conn.address
        assert "host" in latest.request.headers
        assert latest.response.status_code == 304

    def test_invalid_http(self):
        """Garbage on the wire is answered with a Bad Request / Bad Gateway."""
        client = tcp.TCPClient(("127.0.0.1", self.proxy.port))
        with client.connect():
            client.wfile.write(b"invalid\r\n\r\n")
            client.wfile.flush()
            status_line = client.rfile.readline()
            assert any(
                reason in status_line
                for reason in (b"Bad Request", b"Bad Gateway")
            )

    def test_sni(self):
        """The SNI value logged by the server matches the proxy mode."""
        if not self.ssl:
            return

        # In reverse proxy mode, we expect to use the upstream host as our SNI value
        reverse_mode = getattr(self, 'reverse', False)
        expected_sni = "127.0.0.1" if reverse_mode else "testserver.com"

        resp = self.pathod("304", sni="testserver.com")
        assert resp.status_code == 304
        entry = self.server.last_log()
        assert entry["request"]["sni"] == expected_sni
|
|
|
|
|
class TcpMixin:
    """
    Tests for connections whose target matches the ignore or TCP host
    patterns in the proxy config.

    The host class supplies self.config, self.server, self.master,
    self.pathod() and self.ssl.
    """

    def _ignore_on(self):
        """Prepend a pattern matching the test server's port to check_ignore."""
        assert not hasattr(self, "_ignore_backup")
        previous = self.config.check_ignore
        self._ignore_backup = previous
        self.config.check_ignore = HostMatcher(
            [".+:%s" % self.server.port] + previous.patterns
        )

    def _ignore_off(self):
        """Restore the check_ignore matcher saved by _ignore_on()."""
        assert hasattr(self, "_ignore_backup")
        self.config.check_ignore = self._ignore_backup
        del self._ignore_backup

    def test_ignore(self):
        normal = self.pathod("304")
        self._ignore_on()
        ignored_a = self.pathod("305")
        ignored_b = self.pathod("306")
        self._ignore_off()

        self.master.event_queue.join()

        def recorded(code):
            return any(
                f.response.status_code == code for f in self.master.state.flows
            )

        assert normal.status_code == 304
        assert ignored_a.status_code == 305
        assert ignored_b.status_code == 306
        # Only the non-ignored request shows up in the recorded flows.
        assert recorded(304)
        assert not recorded(305)
        assert not recorded(306)

        # Test that we get the original SSL cert
        if self.ssl:
            cert_a = certs.SSLCert(ignored_a.sslinfo.certchain[0])
            cert_b = certs.SSLCert(ignored_b.sslinfo.certchain[0])
            cert_normal = certs.SSLCert(normal.sslinfo.certchain[0])

            assert cert_a == cert_b
            assert cert_a != cert_normal

        # Test Non-HTTP traffic
        spec = "200:i0,@100:d0"  # this results in just 100 random bytes
        # mitmproxy responds with bad gateway
        assert self.pathod(spec).status_code == 502
        self._ignore_on()
        with tutils.raises(exceptions.HttpException):
            self.pathod(spec)  # pathoc tries to parse answer as HTTP

        self._ignore_off()

    def _tcpproxy_on(self):
        """Prepend a pattern matching the test server's port to check_tcp."""
        assert not hasattr(self, "_tcpproxy_backup")
        previous = self.config.check_tcp
        self._tcpproxy_backup = previous
        self.config.check_tcp = HostMatcher(
            [".+:%s" % self.server.port] + previous.patterns
        )

    def _tcpproxy_off(self):
        """Restore the check_tcp matcher saved by _tcpproxy_on()."""
        assert hasattr(self, "_tcpproxy_backup")
        self.config.check_tcp = self._tcpproxy_backup
        del self._tcpproxy_backup

    def test_tcp(self):
        normal = self.pathod("304")
        self._tcpproxy_on()
        tcp_a = self.pathod("305")
        tcp_b = self.pathod("306")
        self._tcpproxy_off()

        self.master.event_queue.join()

        def recorded(code):
            # Only HTTP flows carry a response status to compare against.
            return any(
                f.response.status_code == code
                for f in self.master.state.flows
                if isinstance(f, http.HTTPFlow)
            )

        assert normal.status_code == 304
        assert tcp_a.status_code == 305
        assert tcp_b.status_code == 306
        assert recorded(304)
        assert not recorded(305)
        assert not recorded(306)

        # Test that we get the original SSL cert
        if self.ssl:
            cert_a = certs.SSLCert(tcp_a.sslinfo.certchain[0])
            cert_b = certs.SSLCert(tcp_b.sslinfo.certchain[0])
            cert_normal = certs.SSLCert(normal.sslinfo.certchain[0])

            assert cert_a == cert_b == cert_normal

        # Make sure that TCP messages are in the event log.
        # Re-enable and fix this when we start keeping TCPFlows in the state.
        # assert any("305" in m for m in self.master.tlog)
        # assert any("306" in m for m in self.master.tlog)
|
|
|
|
|
|
class TestHTTP(tservers.HTTPProxyTest, CommonMixin):
    """Plain-HTTP tests against the regular proxy."""

    def test_invalid_connect(self):
        client = tcp.TCPClient(("127.0.0.1", self.proxy.port))
        with client.connect():
            client.wfile.write(b"CONNECT invalid\n\n")
            client.wfile.flush()
            status_line = client.rfile.readline()
            assert b"Bad Request" in status_line

    def test_upstream_ssl_error(self):
        client = self.pathoc()
        with client.connect():
            resp = client.request("get:'https://localhost:%s/'" % self.server.port)
            assert resp.status_code == 400

    def test_connection_close(self):
        # Add a body, so we have a content-length header, which combined with
        # HTTP1.1 means the connection is kept alive.
        url = '%s/p/200:b@1' % self.server.urlbase
        keepalive_spec = "get:'%s'" % url

        # Lets sanity check that the connection does indeed stay open by
        # issuing two requests over the same connection
        client = self.pathoc()
        with client.connect():
            assert client.request(keepalive_spec)
            assert client.request(keepalive_spec)

        # Now check that the connection is closed as the client specifies
        client = self.pathoc()
        with client.connect():
            assert client.request("get:'%s':h'Connection'='close'" % url)
            # There's a race here, which means we can get any of a number of errors.
            # Rather than introduce yet another sleep into the test suite, we just
            # relax the Exception specification.
            with tutils.raises(Exception):
                client.request(keepalive_spec)

    def test_reconnect(self):
        spec = "get:'%s/p/200:b@1:da'" % self.server.urlbase
        client = self.pathoc()
        with client.connect():
            assert client.request(spec)
            # Server has disconnected. Mitmproxy should detect this, and reconnect.
            assert client.request(spec)
            assert client.request(spec)

    def test_get_connection_switching(self):
        def switched(log):
            # True once any log entry mentions a server disconnect.
            return any("serverdisconnect" in entry for entry in log)

        template = "get:'%s/p/200:b@1'"
        client = self.pathoc()
        with client.connect():
            assert client.request(template % self.server.urlbase)
            assert client.request(template % self.server2.urlbase)
        assert switched(self.proxy.tlog)

    def test_blank_leading_line(self):
        client = self.pathoc()
        with client.connect():
            template = "get:'%s/p/201':i0,'\r\n'"
            resp = client.request(template % self.server.urlbase)
            assert resp.status_code == 201

    def test_invalid_headers(self):
        client = self.pathoc()
        with client.connect():
            resp = client.request("get:'http://foo':h':foo'='bar'")
        assert resp.status_code == 400

    def test_stream_modify(self):
        addon = script.Script(
            tutils.test_data.path("mitmproxy/data/addonscripts/stream_modify.py")
        )
        self.master.addons.add(addon)
        resp = self.pathod('200:b"foo"')
        assert resp.content == b"bar"
        self.master.addons.remove(addon)

    def test_first_line_rewrite(self):
        """
        If mitmproxy is a regular HTTP proxy, it must rewrite an absolute-form
        request like
            GET http://example.com/foo HTTP/1.0
        to
            GET /foo HTTP/1.0
        when sending the request upstream. While any server should technically
        accept the absolute form, this is not the case in practice.
        """
        spec = "get:'%s/p/200'" % self.server.urlbase
        client = self.pathoc()
        with client.connect():
            assert client.request(spec).status_code == 200
        logged_request = self.server.last_log()["request"]
        assert logged_request["first_line_format"] == "relative"
|
|
|
|
|
|
class TestHTTPAuth(tservers.HTTPProxyTest):
    """Single-user authentication in regular proxy mode."""

    def test_auth(self):
        self.master.addons.add(proxyauth.ProxyAuth())
        self.master.options.auth_singleuser = "test:test"
        # No credentials supplied: the proxy answers 407.
        assert self.pathod("202").status_code == 407

        client = self.pathoc()
        with client.connect():
            authed = client.request("""
                get
                'http://localhost:%s/p/202'
                h'%s'='%s'
            """ % (
                self.server.port,
                "Proxy-Authorization",
                proxyauth.mkauth("test", "test")
            ))
        assert authed.status_code == 202
|
|
|
|
|
|
class TestHTTPReverseAuth(tservers.ReverseProxyTest):
    """Single-user authentication in reverse proxy mode."""

    def test_auth(self):
        self.master.addons.add(proxyauth.ProxyAuth())
        self.master.options.auth_singleuser = "test:test"
        # No credentials supplied: in reverse mode the answer is 401.
        assert self.pathod("202").status_code == 401

        client = self.pathoc()
        with client.connect():
            authed = client.request("""
                get
                '/p/202'
                h'%s'='%s'
            """ % (
                "Authorization",
                proxyauth.mkauth("test", "test")
            ))
        assert authed.status_code == 202
|
|
|
|
|
|
class TestHTTPS(tservers.HTTPProxyTest, CommonMixin, TcpMixin):
    """HTTPS tests against the regular proxy; the server requests a client cert."""

    ssl = True
    ssloptions = pathod.SSLOptions(request_client_cert=True)

    def test_clientcert_file(self):
        """A single client certificate file is presented to the server."""
        try:
            cert_dir = tutils.test_data.path("mitmproxy/data/clientcert")
            self.config.clientcerts = os.path.join(cert_dir, "client.pem")
            resp = self.pathod("304")
            assert resp.status_code == 304
            logged_request = self.server.last_log()["request"]
            assert logged_request["clientcert"]["keyinfo"]
        finally:
            # Always reset the config so later tests start without a client cert.
            self.config.clientcerts = None

    def test_clientcert_dir(self):
        """A client certificate directory is presented to the server."""
        try:
            self.config.clientcerts = tutils.test_data.path("mitmproxy/data/clientcert")
            resp = self.pathod("304")
            assert resp.status_code == 304
            logged_request = self.server.last_log()["request"]
            assert logged_request["clientcert"]["keyinfo"]
        finally:
            # Always reset the config so later tests start without a client cert.
            self.config.clientcerts = None

    def test_error_post_connect(self):
        client = self.pathoc()
        with client.connect():
            resp = client.request("get:/:i0,'invalid\r\n\r\n'")
            assert resp.status_code == 400
|
|
|
|
|
|
class TestHTTPSCertfile(tservers.HTTPProxyTest, CommonMixin):
    """Runs the common tests over SSL with the ``certfile`` class flag set."""

    ssl = True
    # NOTE(review): presumably consumed by the tservers base class to make the
    # proxy use a certificate file; confirm against tservers.
    certfile = True

    def test_certfile(self):
        # A request through the proxy yields a truthy response object.
        assert self.pathod("304")
|
|
|
|
|
|
class TestHTTPSSecureByDefault:
    """Checks the default value of the ssl_insecure option."""

    def test_secure_by_default(self):
        """
        Certificate verification should be turned on by default.
        """
        insecure = options.Options().ssl_insecure
        assert not insecure
|
|
|
|
|
|
class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxyTest):
    """
    Test upstream server certificate verification with a trusted server cert.
    """

    ssl = True
    ssloptions = pathod.SSLOptions(
        cn=b"example.mitmproxy.org",
        certs=[
            (
                "example.mitmproxy.org",
                tutils.test_data.path("mitmproxy/data/servercert/trusted-leaf.pem"),
            )
        ]
    )

    def _request(self):
        """Issue one request with SNI example.mitmproxy.org; return the response."""
        client = self.pathoc(sni="example.mitmproxy.org")
        with client.connect():
            return client.request("get:/p/242")

    def test_verification_w_cadir(self):
        """Verification succeeds when the trusted CA directory is configured."""
        cadir = tutils.test_data.path("mitmproxy/data/servercert/")
        self.config.options.update(
            ssl_insecure=False,
            ssl_verify_upstream_trusted_cadir=cadir,
            ssl_verify_upstream_trusted_ca=None,
        )
        assert self._request().status_code == 242

    def test_verification_w_pemfile(self):
        """Verification succeeds when the trusted root PEM file is configured."""
        ca_file = tutils.test_data.path("mitmproxy/data/servercert/trusted-root.pem")
        self.config.options.update(
            ssl_insecure=False,
            ssl_verify_upstream_trusted_cadir=None,
            ssl_verify_upstream_trusted_ca=ca_file,
        )
        assert self._request().status_code == 242
|
|
|
|
|
|
class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxyTest):
    """
    Test upstream server certificate verification with an untrusted server cert.
    """

    ssl = True
    ssloptions = pathod.SSLOptions(
        cn=b"example.mitmproxy.org",
        certs=[
            (
                "example.mitmproxy.org",
                tutils.test_data.path("mitmproxy/data/servercert/self-signed.pem"),
            )
        ])

    def _request(self):
        """Issue one request with SNI example.mitmproxy.org; return the response."""
        client = self.pathoc(sni="example.mitmproxy.org")
        with client.connect():
            return client.request("get:/p/242")

    @classmethod
    def get_options(cls):
        """Base options plus the trusted root CA used for upstream verification."""
        proxy_opts = super().get_options()
        proxy_opts.ssl_verify_upstream_trusted_ca = tutils.test_data.path(
            "mitmproxy/data/servercert/trusted-root.pem"
        )
        return proxy_opts

    def test_no_verification_w_bad_cert(self):
        """With ssl_insecure set, the self-signed cert is accepted."""
        self.config.options.ssl_insecure = True
        resp = self._request()
        assert resp.status_code == 242

    def test_verification_w_bad_cert(self):
        """With verification on, the self-signed cert yields a 502."""
        # We only test for a single invalid cert here.
        # Actual testing of different root-causes (invalid hostname, expired, ...)
        # is done in mitmproxy.net.
        self.config.options.ssl_insecure = False
        resp = self._request()
        assert resp.status_code == 502
        assert b"Certificate Verification Error" in resp.raw_content
|
|
|
|
|
|
class TestHTTPSNoCommonName(tservers.HTTPProxyTest):
    """
    Test what happens if we get a cert without common name back.
    """

    ssl = True
    ssloptions = pathod.SSLOptions(
        certs=[
            (b"*", tutils.test_data.path("mitmproxy/data/no_common_name.pem"))
        ]
    )

    def test_http(self):
        resp = self.pathod("202")
        # The certificate presented to the client carries the host address as CN.
        leaf = resp.sslinfo.certchain[0]
        assert leaf.get_subject().CN == "127.0.0.1"
|
|
|
|
|
|
class TestReverse(tservers.ReverseProxyTest, CommonMixin, TcpMixin):
    """Runs the common and TCP mixin tests in reverse proxy mode."""

    # Read by CommonMixin.test_sni via getattr(self, 'reverse', False).
    reverse = True
|
|
|
|
|
|
class TestReverseSSL(tservers.ReverseProxyTest, CommonMixin, TcpMixin):
    """Runs the common and TCP mixin tests in reverse proxy mode over SSL."""

    # Read by CommonMixin.test_sni via getattr(self, 'reverse', False).
    reverse = True
    ssl = True
|
|
|
|
|
|
class TestSocks5(tservers.SocksModeTest):
    """SOCKS5 mode tests."""

    def test_simple(self):
        client = self.pathoc()
        with client.connect():
            client.socks_connect(("localhost", self.server.port))
            resp = client.request("get:/p/200")
        assert resp.status_code == 200

    def test_with_authentication_only(self):
        # An HTTP request without a preceding SOCKS connect is answered
        # with a SOCKS5 failure.
        client = self.pathoc()
        with client.connect():
            resp = client.request("get:/p/200")
        assert resp.status_code == 502
        assert b"SOCKS5 mode failure" in resp.content

    def test_no_connect(self):
        """
        mitmproxy doesn't support UDP or BIND SOCKS CMDs
        """
        client = self.pathoc()
        with client.connect():
            greeting = socks.ClientGreeting(
                socks.VERSION.SOCKS5,
                [socks.METHOD.NO_AUTHENTICATION_REQUIRED]
            )
            greeting.to_file(client.wfile)
            bind_request = socks.Message(
                socks.VERSION.SOCKS5,
                socks.CMD.BIND,
                socks.ATYP.DOMAINNAME,
                ("example.com", 8080)
            )
            bind_request.to_file(client.wfile)

            client.wfile.flush()
            client.rfile.read(2)  # read server greeting
            # the request doesn't matter, error response from handshake will be read anyway.
            resp = client.request("get:/p/200")
        assert resp.status_code == 502
        assert b"SOCKS5 mode failure" in resp.content
|
|
|
|
|
|
class TestSocks5SSL(tservers.SocksModeTest):
    """SOCKS5 mode with the connection upgraded to SSL after the handshake."""

    ssl = True

    def test_simple(self):
        client = self.pathoc_raw()
        with client.connect():
            # SOCKS handshake first, then switch the connection to SSL.
            client.socks_connect(("localhost", self.server.port))
            client.convert_to_ssl()
            resp = client.request("get:/p/200")
        assert resp.status_code == 200
|
|
|
|
|
|
class TestHttps2Http(tservers.ReverseProxyTest):
    """Reverse proxy tests with the upstream server forced to plain http://."""

    @classmethod
    def get_options(cls):
        """Base options with the upstream server spec rewritten to http://."""
        opts = super().get_options()
        spec = parse_server_spec(opts.upstream_server)
        opts.upstream_server = "http://%s" % spec.address
        return opts

    def pathoc(self, ssl, sni=None):
        """
        Returns a Pathoc instance pointed at the proxy. The caller is
        responsible for connecting it (``with p.connect():``).

        Args:
            ssl: whether the client should speak SSL to the proxy.
            sni: optional SNI value passed to Pathoc.
        """
        # Honour the ssl argument: it used to be ignored (always True), which
        # made test_http an exact duplicate of test_all.
        p = pathoc.Pathoc(
            ("localhost", self.proxy.port), ssl=ssl, sni=sni, fp=None
        )
        return p

    def test_all(self):
        """SSL client to the proxy."""
        p = self.pathoc(ssl=True)
        with p.connect():
            assert p.request("get:'/p/200'").status_code == 200

    def test_sni(self):
        """SSL client with SNI; no SNI-handling error may be logged."""
        p = self.pathoc(ssl=True, sni="example.com")
        with p.connect():
            assert p.request("get:'/p/200'").status_code == 200
            assert all("Error in handle_sni" not in msg for msg in self.proxy.tlog)

    def test_http(self):
        """Plain-HTTP client to the proxy."""
        p = self.pathoc(ssl=False)
        with p.connect():
            assert p.request("get:'/p/200'").status_code == 200
|
|
|
|
|
|
class TestTransparent(tservers.TransparentProxyTest, CommonMixin, TcpMixin):
    """Common and TCP mixin tests in transparent mode without SSL."""

    ssl = False

    def test_tcp_stream_modify(self):
        """With the TCP host pattern on, the addon script rewrites the body."""
        script_path = tutils.test_data.path(
            "mitmproxy/data/addonscripts/tcp_stream_modify.py"
        )
        addon = script.Script(script_path)
        self.master.addons.add(addon)
        self._tcpproxy_on()
        resp = self.pathod('200:b"foo"')
        self._tcpproxy_off()
        assert resp.content == b"bar"
        self.master.addons.remove(addon)
|
|
|
|
|
|
class TestTransparentSSL(tservers.TransparentProxyTest, CommonMixin, TcpMixin):
    """Common and TCP mixin tests in transparent mode over SSL."""

    ssl = True

    def test_sslerr(self):
        """A request from a client created without SSL arguments gets a 502."""
        p = pathoc.Pathoc(("localhost", self.proxy.port), fp=None)
        # Use connect() as a context manager, like every other test in this
        # file, so the client connection is closed even if the assert fails.
        with p.connect():
            r = p.request("get:/")
            assert r.status_code == 502
|
|
|
|
|
|
class TestProxy(tservers.HTTPProxyTest):
    """Tests on the flow and connection metadata recorded by the proxy."""

    @staticmethod
    def _recv_at_least(connection, minimum):
        """Read from ``connection`` until at least ``minimum`` bytes arrived."""
        received = 0
        while received < minimum:
            chunk = connection.recv(5000)
            # recv() returns b"" once the peer has closed the connection;
            # fail loudly instead of spinning forever.
            assert chunk, "connection closed before the full response arrived"
            received += len(chunk)

    def test_http(self):
        f = self.pathod("304")
        assert f.status_code == 304

        f = self.master.state.flows[0]
        assert f.client_conn.address
        assert "host" in f.request.headers
        assert f.response.status_code == 304

    @ttutils.skip_appveyor
    def test_response_timestamps(self):
        # test that we notice a delay between the timestamps in the
        # response object (the pathod spec pauses while sending the body)
        f = self.pathod("304:b@1k:p50,1")
        assert f.status_code == 304

        response = self.master.state.flows[0].response
        # timestamp_start might fire a bit late, so we play safe and only require 300ms.
        assert 0.3 <= response.timestamp_end - response.timestamp_start

    @ttutils.skip_appveyor
    def test_request_timestamps(self):
        # test that we notice a delay between timestamps in request object
        # The with-block closes the socket even when something below raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
            connection.connect(("127.0.0.1", self.proxy.port))

            # call pathod server, wait a second to complete the request
            connection.send(
                b"GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n" %
                self.server.port)
            time.sleep(1)
            connection.send(b"\r\n")
            connection.recv(50000)

        flow = self.master.state.flows[0]
        request, response = flow.request, flow.response
        assert response.status_code == 304  # sanity test for our low level request
        # timestamp_start might fire a bit late, so we play safe and only require 300ms.
        assert 0.3 <= request.timestamp_end - request.timestamp_start

    def test_request_tcp_setup_timestamp_presence(self):
        # tests that the server_conn of a flow has a tcp setup timestamp and
        # that two requests on one client connection share it
        request_line = (
            b"GET http://localhost:%d/p/200:b@1k HTTP/1.1\r\n" %
            self.server.port
        )
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
            connection.connect(("localhost", self.proxy.port))
            connection.send(request_line)
            connection.send(b"\r\n")
            # a bit hacky: make sure that we don't just read the headers only.
            self._recv_at_least(connection, 1024)
            connection.send(request_line)
            # Terminate the header block only; the stray trailing "b" that
            # used to be sent here was not part of any request.
            connection.send(b"\r\n")
            self._recv_at_least(connection, 1024)

        first_flow = self.master.state.flows[0]
        second_flow = self.master.state.flows[1]
        assert first_flow.server_conn.timestamp_tcp_setup
        assert first_flow.server_conn.timestamp_ssl_setup is None
        assert second_flow.server_conn.timestamp_tcp_setup
        assert first_flow.server_conn.timestamp_tcp_setup == second_flow.server_conn.timestamp_tcp_setup

    def test_request_ip(self):
        f = self.pathod("200:b@100")
        assert f.status_code == 200
        f = self.master.state.flows[0]
        assert f.server_conn.address == ("127.0.0.1", self.server.port)
|
|
|
|
|
|
class TestProxySSL(tservers.HTTPProxyTest):
    """Server-connection metadata checks when SSL is in use."""

    ssl = True

    def test_request_ssl_setup_timestamp_presence(self):
        # tests that the ssl timestamp is present when ssl is used
        resp = self.pathod("304:b@10k")
        assert resp.status_code == 304
        recorded = self.master.state.flows[0]
        assert recorded.server_conn.timestamp_ssl_setup

    def test_via(self):
        # tests that the server connection of the recorded flow has no `via` set
        resp = self.pathod("200:b@10")
        assert resp.status_code == 200
        recorded = self.master.state.flows[0]
        assert not recorded.server_conn.via
|
|
|
|
|
|
class MasterRedirectRequest(tservers.TestMaster):
    """
    Test master that redirects requests for /p/201 to ``redirect_port`` and
    stamps every response with connection details.
    """

    redirect_port = None  # Set by TestRedirectRequest

    @controller.handler
    def request(self, f):
        """Redirect /p/201 to redirect_port, then defer to the base handler."""
        if f.request.path == "/p/201":

            # This part should have no impact, but it should also not cause any exceptions.
            addr = f.live.server_conn.address
            addr2 = Address(("127.0.0.1", self.redirect_port))
            f.live.set_server(addr2)
            f.live.set_server(addr)

            # This is the actual redirection.
            f.request.port = self.redirect_port
        super().request(f)

    @controller.handler
    def response(self, f):
        """Replace the body with the client port and tag the server source port."""
        # Encode the port number as text. On Python 3, bytes(n) for an
        # integer n builds n NUL bytes instead of the digits of n.
        f.response.content = str(f.client_conn.address.port).encode()
        f.response.headers["server-conn-id"] = str(f.server_conn.source_address.port)
        super().response(f)
|
|
|
|
|
|
class TestRedirectRequest(tservers.HTTPProxyTest):
    masterclass = MasterRedirectRequest
    ssl = True

    def test_redirect(self):
        """
        Imagine a single HTTPS connection with three requests:

        1. First request should pass through unmodified
        2. Second request will be redirected to a different host by an inline script
        3. Third request should pass through unmodified

        This test verifies that the original destination is restored for the third request.
        """
        self.master.redirect_port = self.server2.port

        # (status code requested, whether the first server should see it)
        plan = ((200, True), (201, False), (202, True))

        client = self.pathoc()
        with client.connect():
            responses = []
            for code, hits_first_server in plan:
                self.server.clear_log()
                self.server2.clear_log()
                resp = client.request("get:'/p/%d'" % code)
                assert resp.status_code == code
                # Exactly one of the two servers must have logged the request.
                assert bool(self.server.last_log()) is hits_first_server
                assert bool(self.server2.last_log()) is not hits_first_server
                responses.append(resp)

            first, second, third = responses
            assert first.content == second.content == third.content
|
|
|
|
|
|
class MasterStreamRequest(tservers.TestMaster):
    """
    Test master that sets the stream flag on every response as soon as the
    response headers are available.
    """

    @controller.handler
    def responseheaders(self, f):
        # Mark the response of flow f as streamed.
        f.response.stream = True
|
|
|
|
|
|
class TestStreamRequest(tservers.HTTPProxyTest):
    """Responses pass through intact when the master enables streaming."""

    masterclass = MasterStreamRequest

    def test_stream_simple(self):
        p = self.pathoc()
        with p.connect():
            # a request with 100k of data but without content-length
            r1 = p.request("get:'%s/p/200:r:b@100k:d102400'" % self.server.urlbase)
            assert r1.status_code == 200
            assert len(r1.content) > 100000

    def test_stream_multiple(self):
        p = self.pathoc()
        with p.connect():
            # simple request with streaming turned on
            r1 = p.request("get:'%s/p/200'" % self.server.urlbase)
            assert r1.status_code == 200

            # now send back 100k of data, streamed but not chunked
            r1 = p.request("get:'%s/p/201:b@100k'" % self.server.urlbase)
            assert r1.status_code == 201

    def test_stream_chunked(self):
        spec = '200:h"Transfer-Encoding"="chunked":r:b"4\\r\\nthis\\r\\n11\\r\\nisatest__reachhex\\r\\n0\\r\\n\\r\\n"'
        # Both the socket and its file wrapper are closed by the with-blocks,
        # even when one of the assertions below fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
            connection.connect(("127.0.0.1", self.proxy.port))
            with connection.makefile("rb") as fconn:
                connection.send(
                    b"GET %s/p/%s HTTP/1.1\r\n" %
                    (self.server.urlbase.encode(), spec.encode()))
                connection.send(b"\r\n")

                resp = http1.read_response_head(fconn)

                assert resp.headers["Transfer-Encoding"] == 'chunked'
                assert resp.status_code == 200

                chunks = list(http1.read_body(fconn, None))
                assert chunks == [b"this", b"isatest__reachhex"]
|
|
|
|
|
|
class MasterFakeResponse(tservers.TestMaster):
    """Test master that answers every request itself with a canned response."""

    @controller.handler
    def request(self, f):
        # Attach a wrapped test response to the flow in the request hook.
        canned = mitmproxy.test.tutils.tresp()
        f.response = http.HTTPResponse.wrap(canned)
|
|
|
|
|
|
class TestFakeResponse(tservers.HTTPProxyTest):
    """The client receives the response injected by MasterFakeResponse."""

    masterclass = MasterFakeResponse

    def test_fake(self):
        resp = self.pathod("200")
        # The canned response is recognised by its "header-response" header.
        assert "header-response" in resp.headers
|
|
|
|
|
|
class TestServerConnect(tservers.HTTPProxyTest):
    masterclass = MasterFakeResponse
    ssl = True

    @classmethod
    def get_options(cls):
        """HTTPProxyTest options with no_upstream_cert switched on."""
        proxy_opts = tservers.HTTPProxyTest.get_options()
        proxy_opts.no_upstream_cert = True
        return proxy_opts

    def test_unnecessary_serverconnect(self):
        """A replayed/fake response with no_upstream_cert should not connect to an upstream server"""
        assert self.pathod("200").status_code == 200
        log = self.proxy.tmaster.tlog
        assert all("serverconnect" not in msg for msg in log)
|
|
|
|
|
|
class MasterKillRequest(tservers.TestMaster):
    """Test master that kills every flow in the request hook."""

    @controller.handler
    def request(self, f):
        # Kill the flow via its reply object as soon as the request arrives.
        f.reply.kill()
|
|
|
|
|
|
class TestKillRequest(tservers.HTTPProxyTest):
    """A flow killed in the request hook disconnects the client."""

    masterclass = MasterKillRequest

    def test_kill(self):
        # The client sees a read disconnect instead of a response.
        with tutils.raises(exceptions.HttpReadDisconnect):
            self.pathod("200")
        # Nothing should have hit the server
        assert not self.server.last_log()
|
|
|
|
|
|
class MasterKillResponse(tservers.TestMaster):
    """Test master that kills every flow in the response hook."""

    @controller.handler
    def response(self, f):
        # Kill the flow via its reply object once the response is available.
        f.reply.kill()
|
|
|
|
|
|
class TestKillResponse(tservers.HTTPProxyTest):
    """A flow killed in the response hook disconnects the client."""

    masterclass = MasterKillResponse

    def test_kill(self):
        # The client sees a read disconnect instead of a response.
        with tutils.raises(exceptions.HttpReadDisconnect):
            self.pathod("200")
        # The server should have seen a request
        assert self.server.last_log()
|
|
|
|
|
|
class TestTransparentResolveError(tservers.TransparentProxyTest):
    """Transparent mode when mitmproxy.platform.original_addr fails."""

    @mock.patch("mitmproxy.platform.original_addr")
    def test_resolve_error(self, original_addr):
        # Make the patched original_addr raise; the client then gets a 502.
        original_addr.side_effect = RuntimeError
        assert self.pathod("304").status_code == 502
|
|
|
|
|
|
class MasterIncomplete(tservers.TestMaster):
    """Test master that injects a canned response whose content is None."""

    @controller.handler
    def request(self, f):
        incomplete = http.HTTPResponse.wrap(mitmproxy.test.tutils.tresp())
        # Drop the body so the injected response is incomplete.
        incomplete.content = None
        f.response = incomplete
|
|
|
|
|
|
class TestIncompleteResponse(tservers.HTTPProxyTest):
    """A response with content set to None is reported to the client as a 502."""

    masterclass = MasterIncomplete

    def test_incomplete(self):
        assert self.pathod("200").status_code == 502
|
|
|
|
|
|
class TestUpstreamProxy(tservers.HTTPUpstreamProxyTest, CommonMixin):
    """Runs the common tests in upstream proxy mode without SSL."""

    ssl = False
|
|
|
|
|
|
class TestUpstreamProxySSL(
        tservers.HTTPUpstreamProxyTest,
        CommonMixin,
        TcpMixin):
    """
    Common and TCP mixin tests in upstream proxy mode over SSL. The host
    patterns are switched on every proxy in self.chain as well.
    """

    ssl = True

    def _host_pattern_on(self, attr):
        """
        Updates config.check_tcp or check_ignore, depending on attr.
        """
        backup_name = "_ignore_%s_backup" % attr
        option = "check_%s" % attr
        assert not hasattr(self, backup_name)

        saved = []
        for proxy in self.chain:
            config = proxy.tmaster.server.config
            previous = getattr(config, option)
            saved.append(previous)
            patterns = [".+:%s" % self.server.port] + previous.patterns
            setattr(config, option, HostMatcher(patterns))

        setattr(self, backup_name, saved)

    def _host_pattern_off(self, attr):
        """Restore the matchers saved by _host_pattern_on() for attr."""
        backup_name = "_ignore_%s_backup" % attr
        option = "check_%s" % attr
        saved = getattr(self, backup_name)
        # Walk the chain backwards so pop() returns each proxy's own matcher.
        for proxy in reversed(self.chain):
            setattr(proxy.tmaster.server.config, option, saved.pop())

        assert not saved
        delattr(self, backup_name)

    def _ignore_on(self):
        super()._ignore_on()
        self._host_pattern_on("ignore")

    def _ignore_off(self):
        super()._ignore_off()
        self._host_pattern_off("ignore")

    def _tcpproxy_on(self):
        super()._tcpproxy_on()
        self._host_pattern_on("tcp")

    def _tcpproxy_off(self):
        super()._tcpproxy_off()
        self._host_pattern_off("tcp")

    def test_simple(self):
        client = self.pathoc()
        with client.connect():
            resp = client.request("get:'/p/418:b\"content\"'")
            assert resp.content == b"content"
            assert resp.status_code == 418

            # CONNECT from pathoc to chain[0],
            proxy_state = self.proxy.tmaster.state
            assert proxy_state.flow_count() == 1
            assert proxy_state.flows[0].server_conn.via
            # request from pathoc to chain[0]
            # CONNECT from proxy to chain[1],
            first_state = self.chain[0].tmaster.state
            assert first_state.flow_count() == 1
            assert first_state.flows[0].server_conn.via
            # request from proxy to chain[1]
            # request from chain[0] (regular proxy doesn't store CONNECTs)
            second_state = self.chain[1].tmaster.state
            assert not second_state.flows[0].server_conn.via
            assert second_state.flow_count() == 1
|
|
|
|
|
|
class RequestKiller:
    """
    Addon that kills every request except those whose 1-based ordinal is
    listed in ``exclude``.
    """

    def __init__(self, exclude):
        self.exclude = exclude
        # Number of requests seen so far.
        self.k = 0

    def request(self, f):
        """Count the request and kill it unless its ordinal is excluded."""
        self.k += 1
        if self.k in self.exclude:
            return
        f.reply.kill()
|
|
|
|
|
|
class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxyTest):
    ssl = True

    def test_reconnect(self):
        """
        Tests proper functionality of ConnectionHandler.server_reconnect mock.
        If we have a disconnect on a secure connection that's transparently
        proxified to an upstream http proxy, we need to send the CONNECT
        request again.
        """
        first_hop, second_hop = self.chain[0], self.chain[1]
        # Each killer lets only the listed request ordinals through.
        first_hop.tmaster.addons.add(RequestKiller([1, 2]))
        second_hop.tmaster.addons.add(RequestKiller([1]))

        client = self.pathoc()
        with client.connect():
            resp = client.request("get:'/p/418:b\"content\"'")
            assert resp.content == b"content"
            assert resp.status_code == 418

            # First request goes through all three proxies exactly once
            assert self.proxy.tmaster.state.flow_count() == 1
            assert first_hop.tmaster.state.flow_count() == 1
            assert second_hop.tmaster.state.flow_count() == 1

            resp = client.request("get:'/p/418:b\"content2\"'")
            assert resp.status_code == 502

            assert self.proxy.tmaster.state.flow_count() == 2
            assert first_hop.tmaster.state.flow_count() == 2
            # Upstream sees two more requests due to the reconnection attempt
            upstream_state = second_hop.tmaster.state
            assert upstream_state.flow_count() == 3
            assert not upstream_state.flows[-1].response
            assert not upstream_state.flows[-2].response

            # Reconnection failed, so we're now disconnected
            with tutils.raises(exceptions.HttpException):
                client.request("get:'/p/418:b\"content3\"'")
|
|
|
|
|
|
class AddUpstreamCertsToClientChainMixin:
    """
    Shared test: the upstream certificate appears in the chain sent to the
    client exactly when add_upstream_certs_to_client_chain is enabled.
    """

    ssl = True
    servercert = tutils.test_data.path("mitmproxy/data/servercert/trusted-root.pem")
    ssloptions = pathod.SSLOptions(
        cn=b"example.mitmproxy.org",
        certs=[
            (b"example.mitmproxy.org", servercert)
        ]
    )

    def test_add_upstream_certs_to_client_chain(self):
        with open(self.servercert, "rb") as pem_file:
            pem = pem_file.read()
        upstream_cert = certs.SSLCert.from_pem(pem)

        client = self.pathoc()
        with client.connect():
            # Compare by SHA-256 digest against every certificate the client received.
            found = any(
                received.digest('sha256') == upstream_cert.digest('sha256')
                for received in client.server_certs
            )
            expected = self.master.options.add_upstream_certs_to_client_chain
            assert found == expected
|
|
|
|
|
|
class TestHTTPSAddUpstreamCertsToClientChainTrue(
        AddUpstreamCertsToClientChainMixin,
        tservers.HTTPProxyTest
):
    """
    If --add-server-certs-to-client-chain is True, then the client should
    receive the upstream server's certificates
    """

    @classmethod
    def get_options(cls):
        """Base options with add_upstream_certs_to_client_chain enabled."""
        proxy_opts = super().get_options()
        proxy_opts.add_upstream_certs_to_client_chain = True
        return proxy_opts
|
|
|
|
|
|
class TestHTTPSAddUpstreamCertsToClientChainFalse(
        AddUpstreamCertsToClientChainMixin,
        tservers.HTTPProxyTest
):
    """
    If --add-server-certs-to-client-chain is False, then the client should not
    receive the upstream server's certificates
    """

    @classmethod
    def get_options(cls):
        """Base options with add_upstream_certs_to_client_chain disabled."""
        proxy_opts = super().get_options()
        proxy_opts.add_upstream_certs_to_client_chain = False
        return proxy_opts
|