py3++: test/mitmproxy/test_server

This commit is contained in:
Shadab Zafar 2016-06-23 00:34:38 +05:30 committed by Maximilian Hils
parent 98dc9d3d7e
commit 268e42e55e
6 changed files with 39 additions and 35 deletions

View File

@ -8,6 +8,7 @@ import six
from mitmproxy import stateobject from mitmproxy import stateobject
from netlib import certutils from netlib import certutils
from netlib import strutils
from netlib import tcp from netlib import tcp
@ -212,11 +213,11 @@ class ServerConnection(tcp.TCPClient, stateobject.StateObject):
else: else:
path = os.path.join( path = os.path.join(
clientcerts, clientcerts,
self.address.host.encode("idna")) + ".pem" self.address.host.encode("idna").decode()) + ".pem"
if os.path.exists(path): if os.path.exists(path):
clientcert = path clientcert = path
self.convert_to_ssl(cert=clientcert, sni=sni, **kwargs) self.convert_to_ssl(cert=clientcert, sni=strutils.always_bytes(sni), **kwargs)
self.sni = sni self.sni = sni
self.timestamp_ssl_setup = time.time() self.timestamp_ssl_setup = time.time()

View File

@ -181,7 +181,7 @@ class Response(_HTTPMessage):
l.append( l.append(
status_codes.RESPONSES.get( status_codes.RESPONSES.get(
status_code, status_code,
b"Unknown code" "Unknown code"
).encode() ).encode()
) )
return l return l

View File

@ -1,6 +1,6 @@
def modify(chunks): def modify(chunks):
for chunk in chunks: for chunk in chunks:
yield chunk.replace("foo", "bar") yield chunk.replace(b"foo", b"bar")
def responseheaders(context, flow): def responseheaders(context, flow):

View File

@ -91,19 +91,19 @@ class CommonMixin:
def test_invalid_http(self): def test_invalid_http(self):
t = tcp.TCPClient(("127.0.0.1", self.proxy.port)) t = tcp.TCPClient(("127.0.0.1", self.proxy.port))
t.connect() t.connect()
t.wfile.write("invalid\r\n\r\n") t.wfile.write(b"invalid\r\n\r\n")
t.wfile.flush() t.wfile.flush()
line = t.rfile.readline() line = t.rfile.readline()
assert ("Bad Request" in line) or ("Bad Gateway" in line) assert (b"Bad Request" in line) or (b"Bad Gateway" in line)
def test_sni(self): def test_sni(self):
if not self.ssl: if not self.ssl:
return return
f = self.pathod("304", sni="testserver.com") f = self.pathod("304", sni=b"testserver.com")
assert f.status_code == 304 assert f.status_code == 304
log = self.server.last_log() log = self.server.last_log()
assert log["request"]["sni"] == "testserver.com" assert log["request"]["sni"] == b"testserver.com"
class TcpMixin: class TcpMixin:
@ -190,6 +190,8 @@ class TcpMixin:
assert i_cert == i2_cert == n_cert assert i_cert == i2_cert == n_cert
# Make sure that TCP messages are in the event log. # Make sure that TCP messages are in the event log.
# print(m for m in self.master.tlog)
# print(self.master.tlog)
assert any("305" in m for m in self.master.tlog) assert any("305" in m for m in self.master.tlog)
assert any("306" in m for m in self.master.tlog) assert any("306" in m for m in self.master.tlog)
@ -199,7 +201,7 @@ class AppMixin:
def test_app(self): def test_app(self):
ret = self.app("/") ret = self.app("/")
assert ret.status_code == 200 assert ret.status_code == 200
assert "mitmproxy" in ret.content assert b"mitmproxy" in ret.content
class TestHTTP(tservers.HTTPProxyTest, CommonMixin, AppMixin): class TestHTTP(tservers.HTTPProxyTest, CommonMixin, AppMixin):
@ -208,14 +210,14 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin, AppMixin):
p = self.pathoc() p = self.pathoc()
ret = p.request("get:'http://errapp/'") ret = p.request("get:'http://errapp/'")
assert ret.status_code == 500 assert ret.status_code == 500
assert "ValueError" in ret.content assert b"ValueError" in ret.content
def test_invalid_connect(self): def test_invalid_connect(self):
t = tcp.TCPClient(("127.0.0.1", self.proxy.port)) t = tcp.TCPClient(("127.0.0.1", self.proxy.port))
t.connect() t.connect()
t.wfile.write("CONNECT invalid\n\n") t.wfile.write(b"CONNECT invalid\n\n")
t.wfile.flush() t.wfile.flush()
assert "Bad Request" in t.rfile.readline() assert b"Bad Request" in t.rfile.readline()
def test_upstream_ssl_error(self): def test_upstream_ssl_error(self):
p = self.pathoc() p = self.pathoc()
@ -287,7 +289,7 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin, AppMixin):
def test_stream_modify(self): def test_stream_modify(self):
self.master.load_script(tutils.test_data.path("data/scripts/stream_modify.py")) self.master.load_script(tutils.test_data.path("data/scripts/stream_modify.py"))
d = self.pathod('200:b"foo"') d = self.pathod('200:b"foo"')
assert d.content == "bar" assert d.content == b"bar"
self.master.unload_scripts() self.master.unload_scripts()
@ -356,7 +358,7 @@ class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxyTest):
""" """
ssl = True ssl = True
ssloptions = pathod.SSLOptions( ssloptions = pathod.SSLOptions(
cn="trusted-cert", cn=b"trusted-cert",
certs=[ certs=[
("trusted-cert", tutils.test_data.path("data/trusted-server.crt")) ("trusted-cert", tutils.test_data.path("data/trusted-server.crt"))
]) ])
@ -383,7 +385,7 @@ class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxyTest):
""" """
ssl = True ssl = True
ssloptions = pathod.SSLOptions( ssloptions = pathod.SSLOptions(
cn="untrusted-cert", cn=b"untrusted-cert",
certs=[ certs=[
("untrusted-cert", tutils.test_data.path("data/untrusted-server.crt")) ("untrusted-cert", tutils.test_data.path("data/untrusted-server.crt"))
]) ])
@ -423,7 +425,7 @@ class TestHTTPSNoCommonName(tservers.HTTPProxyTest):
ssl = True ssl = True
ssloptions = pathod.SSLOptions( ssloptions = pathod.SSLOptions(
certs=[ certs=[
("*", tutils.test_data.path("data/no_common_name.pem")) (b"*", tutils.test_data.path("data/no_common_name.pem"))
] ]
) )
@ -448,7 +450,7 @@ class TestSocks5(tservers.SocksModeTest):
p = self.pathoc() p = self.pathoc()
f = p.request("get:/p/200") f = p.request("get:/p/200")
assert f.status_code == 502 assert f.status_code == 502
assert "SOCKS5 mode failure" in f.content assert b"SOCKS5 mode failure" in f.content
def test_no_connect(self): def test_no_connect(self):
""" """
@ -471,7 +473,7 @@ class TestSocks5(tservers.SocksModeTest):
p.rfile.read(2) # read server greeting p.rfile.read(2) # read server greeting
f = p.request("get:/p/200") # the request doesn't matter, error response from handshake will be read anyway. f = p.request("get:/p/200") # the request doesn't matter, error response from handshake will be read anyway.
assert f.status_code == 502 assert f.status_code == 502
assert "SOCKS5 mode failure" in f.content assert b"SOCKS5 mode failure" in f.content
class TestHttps2Http(tservers.ReverseProxyTest): class TestHttps2Http(tservers.ReverseProxyTest):
@ -497,7 +499,7 @@ class TestHttps2Http(tservers.ReverseProxyTest):
assert p.request("get:'/p/200'").status_code == 200 assert p.request("get:'/p/200'").status_code == 200
def test_sni(self): def test_sni(self):
p = self.pathoc(ssl=True, sni="example.com") p = self.pathoc(ssl=True, sni=b"example.com")
assert p.request("get:'/p/200'").status_code == 200 assert p.request("get:'/p/200'").status_code == 200
assert all("Error in handle_sni" not in msg for msg in self.proxy.tlog) assert all("Error in handle_sni" not in msg for msg in self.proxy.tlog)
@ -561,10 +563,10 @@ class TestProxy(tservers.HTTPProxyTest):
# call pathod server, wait a second to complete the request # call pathod server, wait a second to complete the request
connection.send( connection.send(
"GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n" % b"GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n" %
self.server.port) self.server.port)
time.sleep(1) time.sleep(1)
connection.send("\r\n") connection.send(b"\r\n")
connection.recv(50000) connection.recv(50000)
connection.close() connection.close()
@ -579,17 +581,17 @@ class TestProxy(tservers.HTTPProxyTest):
connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connection.connect(("localhost", self.proxy.port)) connection.connect(("localhost", self.proxy.port))
connection.send( connection.send(
"GET http://localhost:%d/p/200:b@1k HTTP/1.1\r\n" % b"GET http://localhost:%d/p/200:b@1k HTTP/1.1\r\n" %
self.server.port) self.server.port)
connection.send("\r\n") connection.send(b"\r\n")
# a bit hacky: make sure that we don't just read the headers only. # a bit hacky: make sure that we don't just read the headers only.
recvd = 0 recvd = 0
while recvd < 1024: while recvd < 1024:
recvd += len(connection.recv(5000)) recvd += len(connection.recv(5000))
connection.send( connection.send(
"GET http://localhost:%d/p/200:b@1k HTTP/1.1\r\n" % b"GET http://localhost:%d/p/200:b@1k HTTP/1.1\r\n" %
self.server.port) self.server.port)
connection.send("\r\n") connection.send(b"\r\nb")
recvd = 0 recvd = 0
while recvd < 1024: while recvd < 1024:
recvd += len(connection.recv(5000)) recvd += len(connection.recv(5000))
@ -724,9 +726,9 @@ class TestStreamRequest(tservers.HTTPProxyTest):
fconn = connection.makefile() fconn = connection.makefile()
spec = '200:h"Transfer-Encoding"="chunked":r:b"4\\r\\nthis\\r\\n11\\r\\nisatest__reachhex\\r\\n0\\r\\n\\r\\n"' spec = '200:h"Transfer-Encoding"="chunked":r:b"4\\r\\nthis\\r\\n11\\r\\nisatest__reachhex\\r\\n0\\r\\n\\r\\n"'
connection.send( connection.send(
"GET %s/p/%s HTTP/1.1\r\n" % b"GET %s/p/%s HTTP/1.1\r\n" %
(self.server.urlbase, spec)) (self.server.urlbase.encode(), spec.encode()))
connection.send("\r\n") connection.send(b"\r\n")
resp = http1.read_response_head(fconn) resp = http1.read_response_head(fconn)
@ -907,7 +909,7 @@ class TestUpstreamProxySSL(
def test_simple(self): def test_simple(self):
p = self.pathoc() p = self.pathoc()
req = p.request("get:'/p/418:b\"content\"'") req = p.request("get:'/p/418:b\"content\"'")
assert req.content == "content" assert req.content == b"content"
assert req.status_code == 418 assert req.status_code == 418
# CONNECT from pathoc to chain[0], # CONNECT from pathoc to chain[0],
@ -965,7 +967,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxyTest):
p = self.pathoc() p = self.pathoc()
req = p.request("get:'/p/418:b\"content\"'") req = p.request("get:'/p/418:b\"content\"'")
assert req.content == "content" assert req.content == b"content"
assert req.status_code == 418 assert req.status_code == 418
assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT and request assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT and request
@ -1013,9 +1015,9 @@ class AddUpstreamCertsToClientChainMixin:
ssl = True ssl = True
servercert = tutils.test_data.path("data/trusted-server.crt") servercert = tutils.test_data.path("data/trusted-server.crt")
ssloptions = pathod.SSLOptions( ssloptions = pathod.SSLOptions(
cn="trusted-cert", cn=b"trusted-cert",
certs=[ certs=[
("trusted-cert", servercert) (b"trusted-cert", servercert)
] ]
) )

View File

@ -11,6 +11,8 @@ import pathod.pathoc
from mitmproxy import flow, controller from mitmproxy import flow, controller
from mitmproxy.cmdline import APP_HOST, APP_PORT from mitmproxy.cmdline import APP_HOST, APP_PORT
from netlib import strutils
testapp = flask.Flask(__name__) testapp = flask.Flask(__name__)
@ -43,7 +45,7 @@ class TestMaster(flow.FlowMaster):
self.tlog = [] self.tlog = []
def add_event(self, message, level=None): def add_event(self, message, level=None):
self.tlog.append(message) self.tlog.append(strutils.native(message, "utf8"))
class ProxyThread(threading.Thread): class ProxyThread(threading.Thread):
@ -148,7 +150,6 @@ class HTTPProxyTest(ProxyTestBase):
Constructs a pathod GET request, with the appropriate base and proxy. Constructs a pathod GET request, with the appropriate base and proxy.
""" """
p = self.pathoc(sni=sni) p = self.pathoc(sni=sni)
spec = spec.encode("string_escape")
if self.ssl: if self.ssl:
q = "get:'/p/%s'" % spec q = "get:'/p/%s'" % spec
else: else:

View File

@ -16,7 +16,7 @@ commands =
[testenv:py35] [testenv:py35]
setenv = setenv =
TESTS = test/netlib test/pathod/ test/mitmproxy/script test/mitmproxy/test_contentview.py test/mitmproxy/test_custom_contentview.py test/mitmproxy/test_app.py test/mitmproxy/test_controller.py test/mitmproxy/test_fuzzing.py test/mitmproxy/test_script.py test/mitmproxy/test_web_app.py test/mitmproxy/test_utils.py test/mitmproxy/test_stateobject.py test/mitmproxy/test_cmdline.py test/mitmproxy/test_contrib_tnetstring.py test/mitmproxy/test_proxy.py test/mitmproxy/test_protocol_http1.py test/mitmproxy/test_platform_pf.py TESTS = test/netlib test/pathod/ test/mitmproxy/script test/mitmproxy/test_contentview.py test/mitmproxy/test_custom_contentview.py test/mitmproxy/test_app.py test/mitmproxy/test_controller.py test/mitmproxy/test_fuzzing.py test/mitmproxy/test_script.py test/mitmproxy/test_web_app.py test/mitmproxy/test_utils.py test/mitmproxy/test_stateobject.py test/mitmproxy/test_cmdline.py test/mitmproxy/test_contrib_tnetstring.py test/mitmproxy/test_proxy.py test/mitmproxy/test_protocol_http1.py test/mitmproxy/test_platform_pf.py test/mitmproxy/test_server.py
HOME = {envtmpdir} HOME = {envtmpdir}
[testenv:docs] [testenv:docs]