[sans-io] enable "Secure Web Proxy" mode

This commit is contained in:
Maximilian Hils 2020-11-24 14:24:24 +01:00
parent 1d4bdeb68d
commit 41f69749f2
10 changed files with 258 additions and 140 deletions

View File

@ -60,6 +60,12 @@ class NextLayer:
]
def ignore_connection(self, context: context.Context, data_client: bytes) -> typing.Optional[bool]:
"""
Returns:
True, if the connection should be ignored.
False, if it should not be ignored.
None, if we need to wait for more input data.
"""
if not ctx.options.ignore_hosts and not ctx.options.allow_hosts:
return False
@ -112,7 +118,8 @@ class NextLayer:
if client_tls:
# client tls requires a server tls layer as parent layer
# reverse proxy mode manages this itself.
if isinstance(top_layer, layers.ServerTLSLayer) or ctx.options.mode.startswith("reverse:"):
# a secure web proxy doesn't have a server part.
if isinstance(top_layer, layers.ServerTLSLayer) or s(modes.ReverseProxy) or s(modes.HttpProxy):
return layers.ClientTLSLayer(context)
else:
return layers.ServerTLSLayer(context)
@ -120,17 +127,18 @@ class NextLayer:
# 3. Setup the HTTP layer for a regular HTTP proxy or an upstream proxy.
if any([
s(modes.HttpProxy),
# or a "Secure Web Proxy", see https://www.chromium.org/developers/design-documents/secure-web-proxy
s(modes.HttpProxy, layers.ClientTLSLayer),
]):
return layers.HttpLayer(context, HTTPMode.regular)
if ctx.options.mode.startswith("upstream:") and len(context.layers) <= 3 and isinstance(top_layer,
layers.ServerTLSLayer):
raise NotImplementedError()
if ctx.options.mode == "regular":
return layers.HttpLayer(context, HTTPMode.regular)
else:
return layers.HttpLayer(context, HTTPMode.upstream)
# 4. Check for --tcp
if any(
address and rex.search(address)
for address in (context.server.address[0], context.client.sni.decode("idna"))
rex.search(context.server.address[0]) or
(context.client.sni and rex.search(context.client.sni))
for rex in self.tcp_hosts
):
return layers.TCPLayer(context)

View File

@ -137,6 +137,8 @@ class TlsConfig:
if not server.alpn_offers:
if client.alpn_offers:
server.alpn_offers = tuple(client.alpn_offers)
if not ctx.options.http2:
server.alpn_offers = tuple(x for x in server.alpn_offers if x != b"h2")
elif ctx.options.http2:
server.alpn_offers = tls.HTTP_ALPNS
else:

View File

@ -196,6 +196,8 @@ class NextLayer(Layer):
# Has an addon decided on the next layer yet?
if self.layer:
if self.debug:
yield commands.Log(f"{self.debug}[nextlayer] {self.layer!r}", "debug")
for e in self.events:
yield from self.layer.handle_event(e)
self.events.clear()

View File

@ -371,41 +371,18 @@ class HttpStream(layer.Layer):
yield from self.handle_connect_finish()
def handle_connect_upstream(self):
assert self.context.server.via.scheme == "http"
http_proxy = _upstream_proxy.HttpUpstreamProxy(self.context, self.context.server.via.address, True)
assert self.context.server.via.scheme in ("http", "https")
if not self.flow.response and self.context.options.connection_strategy == "eager":
# We're bending over backwards here to 1) open a connection and 2) do an HTTP CONNECT cycle.
# If this turns out to be too error-prone, we may just want to default to "lazy" for upstream proxy mode.
http_proxy = Server(self.context.server.via.address)
stub = tunnel.OpenConnectionStub(self.context)
http_proxy.child_layer = stub
stack = tunnel.LayerStack(self.context)
if self.context.server.via.scheme == "https":
http_proxy.sni = self.context.server.via.address[0].encode()
stack /= lambda ctx: tls.ServerTLSLayer(ctx, http_proxy)
stack /= lambda ctx: _upstream_proxy.HttpUpstreamProxy(ctx, http_proxy, True)
yield from http_proxy.handle_event(events.Start())
def _wait_for_reply(event: events.Event):
yield from http_proxy.handle_event(event)
if stub.err:
self.flow.response = http.HTTPResponse.make(
502, f"HTTP CONNECT failed "
f"for {human.format_address(self.context.server.address)} "
f"at {human.format_address(http_proxy.tunnel_connection.address)}: {stub.err}"
)
yield from self.send_response()
elif stub.done:
self.flow.response = http.make_connect_response(self.flow.request.data.http_version)
yield SendHttp(ResponseHeaders(self.stream_id, self.flow.response), self.context.client)
self.child_layer = http_proxy
http_proxy.child_layer = layer.NextLayer(self.context)
yield from http_proxy.child_layer.handle_event(events.Start())
self._handle_event = self.passthrough
self._handle_event = _wait_for_reply
else:
self.child_layer = http_proxy
http_proxy.child_layer = layer.NextLayer(self.context)
yield from self.handle_connect_finish()
self.child_layer = stack[0]
yield from self.handle_connect_finish()
def handle_connect_finish(self):
if not self.flow.response:
@ -490,7 +467,7 @@ class HttpLayer(layer.Layer):
}
def __repr__(self):
return f"HttpLayer(conns: {len(self.connections)})"
return f"HttpLayer({self.mode.name}, conns: {len(self.connections)})"
def _handle_event(self, event: events.Event):
if isinstance(event, events.Start):
@ -578,23 +555,26 @@ class HttpLayer(layer.Layer):
)
context = self.context.fork()
stack = tunnel.LayerStack()
stack = tunnel.LayerStack(context)
if not can_use_context_connection:
context.server = Server(event.address)
if context.options.http2:
context.server.alpn_offers = tls.HTTP_ALPNS
else:
context.server.alpn_offers = tls.HTTP1_ALPNS
if event.via:
assert event.via.scheme == "http"
context.server.via = event.via
send_connect = not (self.mode == HTTPMode.upstream and not event.tls)
stack /= _upstream_proxy.HttpUpstreamProxy(context, event.via.address, send_connect)
if event.tls:
stack /= tls.ServerTLSLayer(context)
stack /= HttpClient(context)
context.server = Server(event.address)
if event.via:
assert event.via.scheme in ("http", "https")
http_proxy = Server(event.via.address)
if event.via.scheme == "https":
http_proxy.sni = event.via.address[0].encode()
stack /= lambda ctx: tls.ServerTLSLayer(ctx, http_proxy)
send_connect = not (self.mode == HTTPMode.upstream and not event.tls)
stack /= lambda ctx: _upstream_proxy.HttpUpstreamProxy(ctx, http_proxy, send_connect)
if event.tls:
stack /= lambda ctx: tls.ServerTLSLayer(ctx)
stack /= HttpClient
self.connections[context.server] = stack[0]
self.waiting_for_establishment[context.server].append(event)

View File

@ -16,17 +16,31 @@ class HttpUpstreamProxy(tunnel.TunnelLayer):
conn: context.Server
tunnel_connection: context.Server
def __init__(self, ctx: context.Context, address: tuple, send_connect: bool):
def __init__(
self,
ctx: context.Context,
tunnel_conn: context.Server,
send_connect: bool
):
super().__init__(
ctx,
tunnel_connection=context.Server(address),
tunnel_connection=tunnel_conn,
conn=ctx.server
)
self.conn.via = server_spec.ServerSpec("http", self.tunnel_connection.address)
self.conn.via = server_spec.ServerSpec(
"https" if self.tunnel_connection.tls else "http",
self.tunnel_connection.address
)
self.buf = ReceiveBuffer()
self.send_connect = send_connect
def start_handshake(self) -> layer.CommandGenerator[None]:
if self.tunnel_connection.tls:
# "Secure Web Proxy": We may have negotiated an ALPN when connecting to the upstream proxy.
# The semantics are not really clear here, but we make sure that if we negotiated h2,
# we act as an h2 client.
self.conn.alpn = self.tunnel_connection.alpn
if not self.send_connect:
return (yield from super().start_handshake())
req = http.make_connect_request(self.conn.address)

View File

@ -127,7 +127,6 @@ class _TLSLayer(tunnel.TunnelLayer):
conn=conn,
)
assert not conn.tls
conn.tls = True
def __repr__(self):
@ -248,8 +247,8 @@ class ServerTLSLayer(_TLSLayer):
"""
command_to_reply_to: Optional[commands.OpenConnection] = None
def __init__(self, context: context.Context):
super().__init__(context, context.server)
def __init__(self, context: context.Context, conn: Optional[context.Server] = None):
super().__init__(context, conn or context.server)
def start_handshake(self) -> layer.CommandGenerator[None]:
yield from self.start_tls()

View File

@ -1,7 +1,8 @@
from enum import Enum, auto
from typing import Optional, Tuple
from typing import Callable, List, Optional, Tuple, Type
from mitmproxy.proxy2 import commands, context, events, layer
from mitmproxy.proxy2.layer import Layer
class TunnelState(Enum):

View File

@ -2,6 +2,7 @@ import pytest
from mitmproxy.flow import Error
from mitmproxy.http import HTTPFlow, HTTPResponse
from mitmproxy.net.server_spec import ServerSpec
from mitmproxy.proxy.protocol.http import HTTPMode
from mitmproxy.proxy2 import layer
from mitmproxy.proxy2.commands import CloseConnection, OpenConnection, SendData
@ -442,40 +443,39 @@ def test_server_aborts(tctx, data):
assert b"502 Bad Gateway" in err()
@pytest.mark.parametrize("redirect", [None, "destination"])
@pytest.mark.parametrize("redirect", ["", "change-destination", "change-proxy"])
@pytest.mark.parametrize("scheme", ["http", "https"])
@pytest.mark.parametrize("strategy", ["eager", "lazy"])
def test_upstream_proxy(tctx, redirect, scheme, strategy):
def test_upstream_proxy(tctx, redirect, scheme):
"""Test that an upstream HTTP proxy is used."""
server = Placeholder(Server)
server2 = Placeholder(Server)
flow = Placeholder(HTTPFlow)
tctx.options.mode = "upstream:http://proxy:8080"
tctx.options.connection_strategy = strategy
playbook = Playbook(http.HttpLayer(tctx, HTTPMode.upstream), hooks=False)
if scheme == "http":
playbook >> DataReceived(tctx.client, b"GET http://example.com/ HTTP/1.1\r\nHost: example.com\r\n\r\n")
playbook << OpenConnection(server)
playbook >> reply(None)
playbook << SendData(server, b"GET http://example.com/ HTTP/1.1\r\nHost: example.com\r\n\r\n")
assert (
playbook
>> DataReceived(tctx.client, b"GET http://example.com/ HTTP/1.1\r\nHost: example.com\r\n\r\n")
<< OpenConnection(server)
>> reply(None)
<< SendData(server, b"GET http://example.com/ HTTP/1.1\r\nHost: example.com\r\n\r\n")
)
else:
playbook >> DataReceived(tctx.client, b"CONNECT example.com:443 HTTP/1.1\r\n\r\n")
if strategy == "eager":
playbook << OpenConnection(server)
playbook >> reply(None)
playbook << SendData(server, b"CONNECT example.com:443 HTTP/1.1\r\n\r\n")
playbook >> DataReceived(server, b"HTTP/1.1 200 Connection established\r\n\r\n")
playbook << SendData(tctx.client, b"HTTP/1.1 200 Connection established\r\n\r\n")
playbook >> DataReceived(tctx.client, b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
playbook << layer.NextLayerHook(Placeholder())
playbook >> reply_next_layer(lambda ctx: http.HttpLayer(ctx, HTTPMode.transparent))
if strategy == "lazy":
playbook << OpenConnection(server)
playbook >> reply(None)
playbook << SendData(server, b"CONNECT example.com:443 HTTP/1.1\r\n\r\n")
playbook >> DataReceived(server, b"HTTP/1.1 200 Connection established\r\n\r\n")
playbook << SendData(server, b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
assert (
playbook
>> DataReceived(tctx.client, b"CONNECT example.com:443 HTTP/1.1\r\n\r\n")
<< SendData(tctx.client, b"HTTP/1.1 200 Connection established\r\n\r\n")
>> DataReceived(tctx.client, b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
<< layer.NextLayerHook(Placeholder())
>> reply_next_layer(lambda ctx: http.HttpLayer(ctx, HTTPMode.transparent))
<< OpenConnection(server)
>> reply(None)
<< SendData(server, b"CONNECT example.com:443 HTTP/1.1\r\n\r\n")
>> DataReceived(server, b"HTTP/1.1 200 Connection established\r\n\r\n")
<< SendData(server, b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
)
playbook >> DataReceived(server, b"HTTP/1.1 418 OK\r\nContent-Length: 0\r\n\r\n")
playbook << SendData(tctx.client, b"HTTP/1.1 418 OK\r\nContent-Length: 0\r\n\r\n")
@ -489,12 +489,14 @@ def test_upstream_proxy(tctx, redirect, scheme, strategy):
playbook >> DataReceived(tctx.client, b"GET /two HTTP/1.1\r\nHost: example.com\r\n\r\n")
assert (playbook << http.HttpRequestHook(flow))
if redirect == "destination":
if redirect == "change-destination":
flow().request.host = "other-server"
flow().request.host_header = "example.com"
elif redirect == "change-proxy":
flow().server_conn.via = ServerSpec("http", address=("other-proxy", 1234))
playbook >> reply()
if redirect == "destination":
if redirect:
# Protocol-wise we wouldn't need to open a new connection for plain http host redirects,
# but we disregard this edge case to simplify implementation.
playbook << OpenConnection(server2)
@ -503,14 +505,17 @@ def test_upstream_proxy(tctx, redirect, scheme, strategy):
server2 = server
if scheme == "http":
if redirect == "destination":
if redirect == "change-destination":
playbook << SendData(server2, b"GET http://other-server/two HTTP/1.1\r\nHost: example.com\r\n\r\n")
else:
playbook << SendData(server2, b"GET http://example.com/two HTTP/1.1\r\nHost: example.com\r\n\r\n")
else:
if redirect == "destination":
if redirect == "change-destination":
playbook << SendData(server2, b"CONNECT other-server:443 HTTP/1.1\r\n\r\n")
playbook >> DataReceived(server2, b"HTTP/1.1 200 Connection established\r\n\r\n")
elif redirect == "change-proxy":
playbook << SendData(server2, b"CONNECT example.com:443 HTTP/1.1\r\n\r\n")
playbook >> DataReceived(server2, b"HTTP/1.1 200 Connection established\r\n\r\n")
playbook << SendData(server2, b"GET /two HTTP/1.1\r\nHost: example.com\r\n\r\n")
playbook >> DataReceived(server2, b"HTTP/1.1 418 OK\r\nContent-Length: 0\r\n\r\n")
@ -518,7 +523,7 @@ def test_upstream_proxy(tctx, redirect, scheme, strategy):
assert playbook
if redirect == "proxy":
if redirect == "change-proxy":
assert server2().address == ("other-proxy", 1234)
else:
assert server2().address == ("proxy", 8080)
@ -531,9 +536,8 @@ def test_upstream_proxy(tctx, redirect, scheme, strategy):
@pytest.mark.parametrize("mode", ["regular", "upstream"])
@pytest.mark.parametrize("strategy", ["eager", "lazy"])
@pytest.mark.parametrize("close_first", ["client", "server"])
def test_http_proxy_tcp(tctx, mode, strategy, close_first):
def test_http_proxy_tcp(tctx, mode, close_first):
"""Test TCP over HTTP CONNECT."""
server = Placeholder(Server)
@ -544,35 +548,29 @@ def test_http_proxy_tcp(tctx, mode, strategy, close_first):
tctx.options.mode = "regular"
toplayer = http.HttpLayer(tctx, HTTPMode.regular)
tctx.options.connection_strategy = strategy
playbook = Playbook(toplayer, hooks=False)
assert (
playbook
>> DataReceived(tctx.client, b"CONNECT example:443 HTTP/1.1\r\n\r\n")
<< SendData(tctx.client, b"HTTP/1.1 200 Connection established\r\n\r\n")
>> DataReceived(tctx.client, b"this is not http")
<< layer.NextLayerHook(Placeholder())
>> reply_next_layer(lambda ctx: TCPLayer(ctx, ignore=True))
<< OpenConnection(server)
)
playbook >> DataReceived(tctx.client, b"CONNECT example:443 HTTP/1.1\r\n\r\n")
if strategy == "eager":
playbook << OpenConnection(server)
playbook >> reply(None)
if mode == "upstream":
playbook << SendData(server, b"CONNECT example:443 HTTP/1.1\r\n\r\n")
playbook >> DataReceived(server, b"HTTP/1.1 200 Connection established\r\n\r\n")
playbook >> reply(None)
if mode == "upstream":
playbook << SendData(server, b"CONNECT example:443 HTTP/1.1\r\n\r\n")
playbook >> DataReceived(server, b"HTTP/1.1 200 Connection established\r\n\r\n")
playbook << SendData(tctx.client, b"HTTP/1.1 200 Connection established\r\n\r\n")
playbook >> DataReceived(tctx.client, b"this is not http")
assert (
playbook
<< SendData(server, b"this is not http")
>> DataReceived(server, b"true that")
<< SendData(tctx.client, b"true that")
)
playbook << layer.NextLayerHook(Placeholder())
playbook >> reply_next_layer(lambda ctx: TCPLayer(ctx, ignore=True))
if strategy == "lazy":
playbook << OpenConnection(server)
playbook >> reply(None)
if mode == "upstream":
playbook << SendData(server, b"CONNECT example:443 HTTP/1.1\r\n\r\n")
playbook >> DataReceived(server, b"HTTP/1.1 200 Connection established\r\n\r\n")
playbook << SendData(server, b"this is not http")
playbook >> DataReceived(server, b"true that")
playbook << SendData(tctx.client, b"true that")
assert playbook
if mode == "regular":
assert server().address == ("example", 443)
else:
@ -891,18 +889,18 @@ def test_connection_close_header(tctx, client_close, server_close):
Playbook(http.HttpLayer(tctx, HTTPMode.regular), hooks=False)
>> DataReceived(tctx.client, b"GET http://example/ HTTP/1.1\r\n"
b"Host: example\r\n" + client_close +
b"\r\n")
b"\r\n")
<< OpenConnection(server)
>> reply(None)
<< SendData(server, b"GET / HTTP/1.1\r\n"
b"Host: example\r\n" + client_close +
b"\r\n")
b"\r\n")
>> DataReceived(server, b"HTTP/1.1 200 OK\r\n"
b"Content-Length: 0\r\n" + server_close +
b"\r\n")
b"\r\n")
<< CloseConnection(server)
<< SendData(tctx.client, b"HTTP/1.1 200 OK\r\n"
b"Content-Length: 0\r\n" + server_close +
b"\r\n")
b"\r\n")
<< CloseConnection(tctx.client)
)
)

View File

@ -0,0 +1,104 @@
import copy
from mitmproxy.http import HTTPFlow
from mitmproxy.proxy.protocol.http import HTTPMode
from mitmproxy.proxy2.commands import CloseConnection, OpenConnection, SendData
from mitmproxy.proxy2.context import Client, Context, Server
from mitmproxy.proxy2.events import DataReceived
from mitmproxy.proxy2.layer import NextLayer, NextLayerHook
from mitmproxy.proxy2.layers import http, modes
from mitmproxy.proxy2.layers.tls import ClientTLSLayer, TlsStartHook
from test.mitmproxy.proxy2.layers.test_tls import reply_tls_start
from test.mitmproxy.proxy2.tutils import Placeholder, Playbook, reply, reply_next_layer
def test_upstream_https(tctx):
"""
Test mitmproxy in HTTPS upstream mode with another mitmproxy instance upstream.
In other words:
mitmdump --mode upstream:https://localhost:8081 --ssl-insecure
mitmdump -p 8081
curl -x localhost:8080 -k http://example.com
"""
tctx1 = Context(
Client(
("client", 1234),
("127.0.0.1", 8080),
1605699329
),
copy.deepcopy(tctx.options)
)
tctx1.options.mode = "upstream:https://example.mitmproxy.org:8081"
tctx2 = Context(
Client(
("client", 4321),
("127.0.0.1", 8080),
1605699329
),
copy.deepcopy(tctx.options)
)
assert tctx2.options.mode == "regular"
del tctx
proxy1 = Playbook(modes.HttpProxy(tctx1), hooks=False)
proxy2 = Playbook(modes.HttpProxy(tctx2), hooks=False)
upstream = Placeholder(Server)
server = Placeholder(Server)
clienthello = Placeholder(bytes)
serverhello = Placeholder(bytes)
request = Placeholder(bytes)
tls_finished = Placeholder(bytes)
h2_client_settings_ack = Placeholder(bytes)
response = Placeholder(bytes)
h2_server_settings_ack = Placeholder(bytes)
assert (
proxy1
>> DataReceived(tctx1.client, b"GET http://example.com/ HTTP/1.1\r\nHost: example.com\r\n\r\n")
<< NextLayerHook(Placeholder(NextLayer))
>> reply_next_layer(lambda ctx: http.HttpLayer(ctx, HTTPMode.upstream))
<< OpenConnection(upstream)
>> reply(None)
<< TlsStartHook(Placeholder())
>> reply_tls_start(alpn=b"h2")
<< SendData(upstream, clienthello)
)
assert upstream().address == ("example.mitmproxy.org", 8081)
assert (
proxy2
>> DataReceived(tctx2.client, clienthello())
<< NextLayerHook(Placeholder(NextLayer))
>> reply_next_layer(ClientTLSLayer)
<< TlsStartHook(Placeholder())
>> reply_tls_start(alpn=b"h2")
<< SendData(tctx2.client, serverhello)
)
assert (
proxy1
>> DataReceived(upstream, serverhello())
<< SendData(upstream, request)
)
assert (
proxy2
>> DataReceived(tctx2.client, request())
<< SendData(tctx2.client, tls_finished)
<< NextLayerHook(Placeholder(NextLayer))
>> reply_next_layer(lambda ctx: http.HttpLayer(ctx, HTTPMode.regular))
<< SendData(tctx2.client, h2_client_settings_ack)
<< OpenConnection(server)
>> reply(None)
<< SendData(server, b'GET / HTTP/1.1\r\nhost: example.com\r\n\r\n')
>> DataReceived(server, b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
<< CloseConnection(server)
<< SendData(tctx2.client, response)
)
assert server().address == ("example.com", 80)
assert (
proxy1
>> DataReceived(upstream, tls_finished() + h2_client_settings_ack() + response())
<< SendData(upstream, h2_server_settings_ack)
<< SendData(tctx1.client, b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
)

View File

@ -68,20 +68,24 @@ def _fmt_entry(x: PlaybookEntry):
return f"{arrow} {x}"
def _merge_sends(lst: PlaybookEntryList) -> PlaybookEntryList:
merged = lst[:1]
for x in lst[1:]:
prev = merged[-1]
two_subsequent_sends_to_the_same_remote = (
isinstance(x, commands.SendData) and
isinstance(prev, commands.SendData) and
x.connection is prev.connection
)
if two_subsequent_sends_to_the_same_remote:
prev.data += x.data
def _merge_sends(lst: typing.List[commands.Command], ignore_hooks: bool, ignore_logs: bool) -> PlaybookEntryList:
current_send = None
for x in lst:
if isinstance(x, commands.SendData):
if current_send is None:
current_send = x
yield x
else:
current_send.data += x.data
else:
merged.append(x)
return merged
ignore = (
(ignore_hooks and isinstance(x, commands.Hook))
or
(ignore_logs and isinstance(x, commands.Log))
)
if not ignore:
current_send = None
yield x
class _TracebackInPlaybook(commands.Command):
@ -199,10 +203,11 @@ class Playbook:
self.actual.append(_TracebackInPlaybook(traceback.format_exc()))
break
cmds = _merge_sends(cmds)
cmds = list(_merge_sends(cmds, ignore_hooks=not self.hooks, ignore_logs=not self.logs))
self.actual.extend(cmds)
pos = len(self.actual) - len(cmds) - 1
hook_replies = []
for cmd in cmds:
pos += 1
assert self.actual[pos] == cmd
@ -238,16 +243,21 @@ class Playbook:
if cmd.blocking:
# the current event may still have yielded more events, so we need to insert
# the reply *after* those additional events.
self.expected.insert(pos + len(cmds) - cmds.index(cmd), events.HookReply(cmd))
hook_replies.append(events.HookReply(cmd))
self.expected = self.expected[:pos+1] + hook_replies + self.expected[pos+1:]
eq(self.expected[i:], self.actual[i:]) # compare now already to set placeholders
i += 1
if not eq(self.expected, self.actual):
self._errored = True
diff = "\n".join(difflib.ndiff(
diffs = list(difflib.ndiff(
[_fmt_entry(x) for x in self.expected],
[_fmt_entry(x) for x in self.actual]
))
if already_asserted:
diffs.insert(already_asserted, "==== asserted until here ====")
diff = "\n".join(diffs)
raise AssertionError(f"Playbook mismatch!\n{diff}")
else:
return True
@ -381,8 +391,8 @@ def reply_next_layer(
**kwargs
) -> reply:
"""Helper function to simplify the syntax for next_layer events to this:
<< commands.Hook("next_layer", next_layer)
>> tutils.next_layer(next_layer, tutils.EchoLayer)
<< NextLayerHook(nl)
>> reply_next_layer(tutils.EchoLayer)
"""
def set_layer(next_layer: layer.NextLayer) -> None: