mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
[sans-io] tests++
This commit is contained in:
parent
641b1c425b
commit
f0bdf887fc
@ -176,7 +176,8 @@ class _TLSLayer(tunnel.TunnelLayer):
|
||||
err = last_err[2]
|
||||
elif last_err == ('SSL routines', 'ssl3_get_record', 'wrong version number') and data[:4].isascii():
|
||||
err = f"The remote server does not speak TLS."
|
||||
else:
|
||||
else: # pragma: no cover
|
||||
# TODO: Add test case one we find one.
|
||||
err = f"OpenSSL {e!r}"
|
||||
return False, err
|
||||
else:
|
||||
|
@ -1,3 +1,5 @@
|
||||
import pytest
|
||||
|
||||
from mitmproxy.proxy2.commands import CloseConnection, OpenConnection, SendData
|
||||
from mitmproxy.proxy2.context import ConnectionState
|
||||
from mitmproxy.proxy2.events import ConnectionClosed, DataReceived
|
||||
@ -89,10 +91,34 @@ def test_receive_data_after_half_close(tctx):
|
||||
Playbook(tcp.TCPLayer(tctx), hooks=False)
|
||||
<< OpenConnection(tctx.server)
|
||||
>> reply(None)
|
||||
>> ConnectionClosed(tctx.server)
|
||||
<< CloseConnection(tctx.client, half_close=True)
|
||||
>> DataReceived(tctx.client, b"i'm late")
|
||||
<< SendData(tctx.server, b"i'm late")
|
||||
>> DataReceived(tctx.client, b"eof-delimited-request")
|
||||
<< SendData(tctx.server, b"eof-delimited-request")
|
||||
>> ConnectionClosed(tctx.client)
|
||||
<< CloseConnection(tctx.server)
|
||||
<< CloseConnection(tctx.server, half_close=True)
|
||||
>> DataReceived(tctx.server, b"i'm late")
|
||||
<< SendData(tctx.client, b"i'm late")
|
||||
>> ConnectionClosed(tctx.server)
|
||||
<< CloseConnection(tctx.client)
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("ignore", [True, False])
|
||||
def test_ignore(tctx, ignore):
|
||||
"""
|
||||
no flow hooks when we set ignore.
|
||||
"""
|
||||
|
||||
def no_flow_hooks():
|
||||
assert (
|
||||
Playbook(tcp.TCPLayer(tctx, ignore=ignore), hooks=True)
|
||||
<< OpenConnection(tctx.server)
|
||||
>> reply(None)
|
||||
>> DataReceived(tctx.client, b"hello!")
|
||||
<< SendData(tctx.server, b"hello!")
|
||||
)
|
||||
|
||||
if ignore:
|
||||
no_flow_hooks()
|
||||
else:
|
||||
with pytest.raises(AssertionError):
|
||||
no_flow_hooks()
|
||||
|
@ -50,6 +50,15 @@ client_hello_no_extensions = bytes.fromhex(
|
||||
"78e1bb6d22e8bbd5b6b0a3a59760ad354e91ba20d353001a0035002f000a000500040009000300060008006000"
|
||||
"61006200640100"
|
||||
)
|
||||
client_hello_with_extensions = bytes.fromhex(
|
||||
"16030300bb" # record layer
|
||||
"010000b7" # handshake layer
|
||||
"03033b70638d2523e1cba15f8364868295305e9c52aceabda4b5147210abc783e6e1000022c02bc02fc02cc030"
|
||||
"cca9cca8cc14cc13c009c013c00ac014009c009d002f0035000a0100006cff0100010000000010000e00000b65"
|
||||
"78616d706c652e636f6d0017000000230000000d00120010060106030501050304010403020102030005000501"
|
||||
"00000000001200000010000e000c02683208687474702f312e3175500000000b00020100000a00080006001d00"
|
||||
"170018"
|
||||
)
|
||||
|
||||
|
||||
def test_get_client_hello():
|
||||
@ -66,6 +75,13 @@ def test_get_client_hello():
|
||||
assert tls.get_client_hello(incomplete) is None
|
||||
|
||||
|
||||
def test_parse_client_hello():
|
||||
assert tls.parse_client_hello(client_hello_with_extensions).sni == b"example.com"
|
||||
assert tls.parse_client_hello(client_hello_with_extensions[:50]) is None
|
||||
with pytest.raises(ValueError):
|
||||
tls.parse_client_hello(client_hello_with_extensions[:183] + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||
|
||||
|
||||
class SSLTest:
|
||||
"""Helper container for Python's builtin SSL object."""
|
||||
|
||||
@ -193,6 +209,9 @@ def reply_tls_start(alpn: typing.Optional[bytes] = None, *args, **kwargs) -> tut
|
||||
|
||||
|
||||
class TestServerTLS:
|
||||
def test_repr(self, tctx):
|
||||
assert repr(tls.ServerTLSLayer(tctx))
|
||||
|
||||
def test_not_connected(self, tctx: context.Context):
|
||||
"""Test that we don't do anything if no server connection exists."""
|
||||
layer = tls.ServerTLSLayer(tctx)
|
||||
@ -294,6 +313,23 @@ class TestServerTLS:
|
||||
)
|
||||
assert not tctx.server.tls_established
|
||||
|
||||
def test_remote_speaks_no_tls(self, tctx):
|
||||
playbook = tutils.Playbook(tls.ServerTLSLayer(tctx))
|
||||
tctx.server.state = ConnectionState.OPEN
|
||||
tctx.server.sni = b"example.mitmproxy.org"
|
||||
|
||||
# send ClientHello, receive random garbage back
|
||||
data = tutils.Placeholder(bytes)
|
||||
assert (
|
||||
playbook
|
||||
<< tls.TlsStartHook(tutils.Placeholder())
|
||||
>> reply_tls_start()
|
||||
<< commands.SendData(tctx.server, data)
|
||||
>> events.DataReceived(tctx.server, b"HTTP/1.1 404 Not Found\r\n")
|
||||
<< commands.Log("Server TLS handshake failed. The remote server does not speak TLS.", "warn")
|
||||
<< commands.CloseConnection(tctx.server)
|
||||
)
|
||||
|
||||
|
||||
def make_client_tls_layer(
|
||||
tctx: context.Context,
|
||||
@ -323,6 +359,7 @@ class TestClientTLS:
|
||||
def test_client_only(self, tctx: context.Context):
|
||||
"""Test TLS with client only"""
|
||||
playbook, client_layer, tssl_client = make_client_tls_layer(tctx)
|
||||
client_layer.debug = " "
|
||||
assert not tctx.client.tls_established
|
||||
|
||||
# Send ClientHello, receive ServerHello
|
||||
|
@ -0,0 +1,21 @@
|
||||
import pytest
|
||||
|
||||
from mitmproxy.proxy2.utils import expect
|
||||
|
||||
|
||||
def test_expect():
|
||||
class Foo:
|
||||
@expect(str, int)
|
||||
def foo(self, x):
|
||||
return "".join(reversed(x))
|
||||
|
||||
@expect(str)
|
||||
def bar(self, x):
|
||||
yield "".join(reversed(x))
|
||||
|
||||
f = Foo()
|
||||
|
||||
assert f.foo("foo") == "oof"
|
||||
assert list(f.bar("bar")) == ["rab"]
|
||||
with pytest.raises(AssertionError, match=r"Expected str\|int, got None."):
|
||||
f.foo(None)
|
Loading…
Reference in New Issue
Block a user