[sans-io] minor test improvements

This commit is contained in:
Maximilian Hils 2018-05-22 13:20:35 +02:00
parent 8938aec2c0
commit a860fe4a4b
2 changed files with 218 additions and 208 deletions

View File

@ -1,5 +1,6 @@
import os
import ssl
import typing
import pytest
@ -81,139 +82,10 @@ class SSLTest:
)
def test_server_no_tls(tctx: context.Context):
"""Test TLS layer without TLS"""
layer = tls.ServerTLSLayer(tctx)
playbook = tutils.playbook(layer)
# Handshake
assert (
playbook
>> events.DataReceived(tctx.client, b"Hello World")
<< commands.Hook("next_layer", tutils.Placeholder())
>> tutils.next_layer(tutils.EchoLayer)
<< commands.SendData(tctx.client, b"hello world")
)
def test_client_tls_only(tctx: context.Context):
"""Test TLS with client only"""
layer = tls.ClientTLSLayer(tctx)
playbook = tutils.playbook(layer)
tssl = SSLTest()
# Handshake
assert playbook
assert layer._handle_event == layer.state_wait_for_clienthello
def interact():
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.client, tssl.out.read())
<< commands.SendData(tctx.client, data)
)
tssl.inc.write(data())
try:
tssl.obj.do_handshake()
except ssl.SSLWantReadError:
return False
else:
return True
# receive ClientHello, send ServerHello
with pytest.raises(ssl.SSLWantReadError):
tssl.obj.do_handshake()
assert not interact()
# Finish Handshake
assert interact()
tssl.obj.do_handshake()
assert layer._handle_event == layer.state_process
# Echo
echo(playbook, tssl, tctx.client)
assert (
playbook
>> events.DataReceived(tctx.server, b"Hello")
<< commands.SendData(tctx.server, b"hello")
)
def echo(playbook: tutils.playbook, tssl: SSLTest, conn: context.Connection) -> None:
tssl.obj.write(b"Hello World")
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(conn, tssl.out.read())
<< commands.Hook("next_layer", tutils.Placeholder())
>> tutils.next_layer(tutils.EchoLayer)
<< commands.SendData(conn, data)
)
tssl.inc.write(data())
assert tssl.obj.read() == b"hello world"
def test_server_tls_no_conn(tctx):
"""
The server TLS layer is initiated, but there is no active connection yet, so nothing
should be done.
"""
layer = tls.ServerTLSLayer(tctx)
playbook = tutils.playbook(layer)
tctx.server.tls = True
# We did not have a server connection before, so let's do nothing.
assert (
playbook
<< None
)
def test_server_tls(tctx):
layer = tls.ServerTLSLayer(tctx)
playbook = tutils.playbook(layer)
tctx.server.connected = True
tctx.server.address = ("example.com", 443)
tctx.server.tls = True
tssl = SSLTest(server_side=True)
# send ClientHello
data = tutils.Placeholder()
assert (
playbook
<< commands.SendData(tctx.server, data)
)
# receive ServerHello, finish client handshake
tssl.inc.write(data())
with pytest.raises(ssl.SSLWantReadError):
tssl.obj.do_handshake()
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.server, tssl.out.read())
<< commands.SendData(tctx.server, data)
)
tssl.inc.write(data())
# finish server handshake
tssl.obj.do_handshake()
assert (
playbook
>> events.DataReceived(tctx.server, tssl.out.read())
<< None
)
assert tctx.server.tls_established
assert tctx.server.sni == b"example.com"
# Echo
echo(playbook, tssl, tctx.server)
def _test_tls_client_server(tctx, alpn):
def _test_tls_client_server(
tctx: context.Context,
alpn: typing.Optional[str]
) -> typing.Tuple[tutils.playbook[tls.ClientTLSLayer], SSLTest]:
layer = tls.ClientTLSLayer(tctx)
playbook = tutils.playbook(layer)
tctx.server.tls = True
@ -240,85 +112,215 @@ def _test_tls_client_server(tctx, alpn):
return playbook, tssl_client
def test_tls_client_server_no_server_conn(tctx):
"""
Here we test the scenario where a server connection is _not_ required
to establish TLS with the client. After determining this when parsing the ClientHello,
we only establish a connection with the client. The server connection may ultimately
be established when OpenConnection is called.
"""
playbook, _ = _test_tls_client_server(tctx, None)
def echo(playbook: tutils.playbook, tssl: SSLTest, conn: context.Connection) -> None:
tssl.obj.write(b"Hello World")
data = tutils.Placeholder()
assert (
playbook
<< commands.SendData(tctx.client, data)
>> events.DataReceived(conn, tssl.out.read())
<< commands.Hook("next_layer", tutils.Placeholder())
>> tutils.next_layer(tutils.EchoLayer)
<< commands.SendData(conn, data)
)
assert data()
assert playbook.layer._handle_event == playbook.layer.state_process
tssl.inc.write(data())
assert tssl.obj.read() == b"hello world"
def test_tls_client_server_alpn(tctx):
"""
Here we test the scenario where a server connection is required (e.g. because of ALPN negotation)
to establish TLS with the client.
"""
tssl_server = SSLTest(server_side=True, alpn=["foo", "bar"])
class TestServerTLS:
def test_no_tls(self, tctx: context.Context):
"""Test TLS layer without TLS"""
layer = tls.ServerTLSLayer(tctx)
playbook = tutils.playbook(layer)
playbook, tssl_client = _test_tls_client_server(tctx, ["qux", "foo"])
# Handshake
assert (
playbook
>> events.DataReceived(tctx.client, b"Hello World")
<< commands.Hook("next_layer", tutils.Placeholder())
>> tutils.next_layer(tutils.EchoLayer)
<< commands.SendData(tctx.client, b"hello world")
)
# We should now get instructed to open a server connection.
assert (
playbook
<< commands.OpenConnection(tctx.server)
)
tctx.server.connected = True
data = tutils.Placeholder()
assert (
playbook
>> events.OpenConnectionReply(-1, None)
<< commands.SendData(tctx.server, data)
)
assert playbook.layer._handle_event == playbook.layer.state_wait_for_server_tls
assert playbook.layer.child_layer.tls[tctx.server]
def test_no_connection(self, tctx):
"""
The server TLS layer is initiated, but there is no active connection yet, so nothing
should be done.
"""
layer = tls.ServerTLSLayer(tctx)
playbook = tutils.playbook(layer)
tctx.server.tls = True
# Establish TLS with the server...
tssl_server.inc.write(data())
with pytest.raises(ssl.SSLWantReadError):
# We did not have a server connection before, so let's do nothing.
assert (
playbook
<< None
)
def test_simple(self, tctx):
layer = tls.ServerTLSLayer(tctx)
playbook = tutils.playbook(layer)
tctx.server.connected = True
tctx.server.address = ("example.com", 443)
tctx.server.tls = True
tssl = SSLTest(server_side=True)
# send ClientHello
data = tutils.Placeholder()
assert (
playbook
<< commands.SendData(tctx.server, data)
)
# receive ServerHello, finish client handshake
tssl.inc.write(data())
with pytest.raises(ssl.SSLWantReadError):
tssl.obj.do_handshake()
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.server, tssl.out.read())
<< commands.SendData(tctx.server, data)
)
tssl.inc.write(data())
# finish server handshake
tssl.obj.do_handshake()
assert (
playbook
>> events.DataReceived(tctx.server, tssl.out.read())
<< None
)
assert tctx.server.tls_established
assert tctx.server.sni == b"example.com"
# Echo
echo(playbook, tssl, tctx.server)
class TestClientTLS:
def test_simple(self, tctx: context.Context):
"""Test TLS with client only"""
layer = tls.ClientTLSLayer(tctx)
playbook = tutils.playbook(layer)
tssl = SSLTest()
# Handshake
assert playbook
assert layer._handle_event == layer.state_wait_for_clienthello
def interact():
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.client, tssl.out.read())
<< commands.SendData(tctx.client, data)
)
tssl.inc.write(data())
try:
tssl.obj.do_handshake()
except ssl.SSLWantReadError:
return False
else:
return True
# receive ClientHello, send ServerHello
with pytest.raises(ssl.SSLWantReadError):
tssl.obj.do_handshake()
assert not interact()
# Finish Handshake
assert interact()
tssl.obj.do_handshake()
assert layer._handle_event == layer.state_process
# Echo
echo(playbook, tssl, tctx.client)
assert (
playbook
>> events.DataReceived(tctx.server, b"Hello")
<< commands.SendData(tctx.server, b"hello")
)
def test_no_server_conn_required(self, tctx):
"""
Here we test the scenario where a server connection is _not_ required
to establish TLS with the client. After determining this when parsing the ClientHello,
we only establish a connection with the client. The server connection may ultimately
be established when OpenConnection is called.
"""
playbook, _ = _test_tls_client_server(tctx, None)
data = tutils.Placeholder()
assert (
playbook
<< commands.SendData(tctx.client, data)
)
assert data()
assert playbook.layer._handle_event == playbook.layer.state_process
def test_alpn(self, tctx):
"""
Here we test the scenario where a server connection is required (e.g. because of ALPN negotation)
to establish TLS with the client.
"""
tssl_server = SSLTest(server_side=True, alpn=["foo", "bar"])
playbook, tssl_client = _test_tls_client_server(tctx, ["qux", "foo"])
# We should now get instructed to open a server connection.
assert (
playbook
<< commands.OpenConnection(tctx.server)
)
tctx.server.connected = True
data = tutils.Placeholder()
assert (
playbook
>> events.OpenConnectionReply(-1, None)
<< commands.SendData(tctx.server, data)
)
assert playbook.layer._handle_event == playbook.layer.state_wait_for_server_tls
assert playbook.layer.child_layer.tls[tctx.server]
# Establish TLS with the server...
tssl_server.inc.write(data())
with pytest.raises(ssl.SSLWantReadError):
tssl_server.obj.do_handshake()
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.server, tssl_server.out.read())
<< commands.SendData(tctx.server, data)
)
tssl_server.inc.write(data())
tssl_server.obj.do_handshake()
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.server, tssl_server.out.read())
<< commands.SendData(tctx.server, data)
)
tssl_server.inc.write(data())
tssl_server.obj.do_handshake()
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.server, tssl_server.out.read())
<< commands.SendData(tctx.client, data)
)
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.server, tssl_server.out.read())
<< commands.SendData(tctx.client, data)
)
assert playbook.layer._handle_event == playbook.layer.state_process
assert tctx.server.tls_established
assert playbook.layer._handle_event == playbook.layer.state_process
assert tctx.server.tls_established
# Server TLS is established, we can now reply to the client handshake...
tssl_client.inc.write(data())
with pytest.raises(ssl.SSLWantReadError):
# Server TLS is established, we can now reply to the client handshake...
tssl_client.inc.write(data())
with pytest.raises(ssl.SSLWantReadError):
tssl_client.obj.do_handshake()
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.client, tssl_client.out.read())
<< commands.SendData(tctx.client, data)
)
tssl_client.inc.write(data())
tssl_client.obj.do_handshake()
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.client, tssl_client.out.read())
<< commands.SendData(tctx.client, data)
)
tssl_client.inc.write(data())
tssl_client.obj.do_handshake()
# Both handshakes completed!
assert tctx.client.tls_established
assert tctx.server.tls_established
# Both handshakes completed!
assert tctx.client.tls_established
assert tctx.server.tls_established
assert tssl_client.obj.selected_alpn_protocol() == "foo"
assert tssl_server.obj.selected_alpn_protocol() == "foo"
assert tssl_client.obj.selected_alpn_protocol() == "foo"
assert tssl_server.obj.selected_alpn_protocol() == "foo"

View File

@ -30,9 +30,9 @@ def _eq(
x, y = a[k], b[k]
# if there's a placeholder, make it x.
if isinstance(y, Placeholder):
if isinstance(y, _Placeholder):
x, y = y, x
if isinstance(x, Placeholder):
if isinstance(x, _Placeholder):
if x.obj is None:
x.obj = y
x = x.obj
@ -56,7 +56,11 @@ def eq(
return _eq(a, b)
class playbook:
T = typing.TypeVar('T', bound=Layer)
# noinspection PyPep8Naming
class playbook(typing.Generic[T]):
"""
Assert that a layer emits the expected commands in reaction to a given sequence of events.
For example, the following code asserts that the TCP layer emits an OpenConnection command
@ -76,7 +80,7 @@ class playbook:
x2 = list(t.handle_event(events.OpenConnectionReply(x1[-1])))
assert x2 == []
"""
layer: Layer
layer: T
"""The base layer"""
expected: TPlaybook
"""expected command/event sequence"""
@ -89,9 +93,9 @@ class playbook:
def __init__(
self,
layer: Layer,
expected: typing.Optional[TPlaybook]=None,
ignore_log: bool=True
layer: T,
expected: typing.Optional[TPlaybook] = None,
ignore_log: bool = True
):
if expected is None:
expected = [
@ -176,7 +180,7 @@ class playbook:
return copy.deepcopy(self)
class Placeholder:
class _Placeholder:
"""
Placeholder value in playbooks, so that objects (flows in particular) can be referenced before
they are known. Example:
@ -205,6 +209,10 @@ class Placeholder:
return f"Placeholder:{str(self.obj)}"
def Placeholder() -> typing.Any:
return _Placeholder()
class EchoLayer(Layer):
"""Echo layer that sends all data back to the client in lowercase."""