diff --git a/mitmproxy/proxy/protocol2/test/test_tutils.py b/mitmproxy/proxy/protocol2/test/test_tutils.py index 8c6206e37..7148044a6 100644 --- a/mitmproxy/proxy/protocol2/test/test_tutils.py +++ b/mitmproxy/proxy/protocol2/test/test_tutils.py @@ -1,9 +1,10 @@ import typing +import pytest + from mitmproxy.proxy.protocol2 import events, commands from mitmproxy.proxy.protocol2.layer import Layer from mitmproxy.proxy.protocol2.test import tutils -from mitmproxy.proxy.protocol2.utils import expect class TEvent(events.Event): @@ -20,21 +21,29 @@ class TCommand(commands.Command): self.x = x +class TCommandReply(events.CommandReply): + pass + + class TLayer(Layer): """ Simple echo layer """ - @expect(TEvent) - def _handle_event(self, event: TEvent) -> commands.TCommandGenerator: - for x in event.commands: - yield TCommand(x) + def _handle_event(self, event: events.Event) -> commands.TCommandGenerator: + if isinstance(event, TEvent): + for x in event.commands: + yield TCommand(x) -def test_playbook_simple(tctx): - playbook = tutils.playbook(TLayer(tctx), []) +@pytest.fixture +def tplaybook(tctx): + return tutils.playbook(TLayer(tctx), []) + + +def test_simple(tplaybook): assert ( - playbook + tplaybook >> TEvent() << TCommand() >> TEvent([]) @@ -42,18 +51,145 @@ def test_playbook_simple(tctx): ) -def test_playbook_partial_assert(tctx): - playbook = tutils.playbook(TLayer(tctx), []) - playbook = ( - playbook +def test_mismatch(tplaybook): + with pytest.raises(AssertionError, message="Playbook mismatch"): + assert ( + tplaybook + >> TEvent([]) + << TCommand() + ) + + +def test_partial_assert(tplaybook): + """Developers can assert parts of a playbook and the continue later on.""" + assert ( + tplaybook >> TEvent() << TCommand() ) - assert playbook - playbook = ( - playbook + assert ( + tplaybook >> TEvent() << TCommand() ) - assert playbook - assert len(playbook.actual) == len(playbook.playbook) == 4 + assert len(tplaybook.actual) == len(tplaybook.expected) == 4 + + +def test_placeholder(tplaybook): + """Developers can specify placeholders for yet unknown attributes.""" + f = tutils.Placeholder() + assert ( + tplaybook + >> TEvent([42]) + << TCommand(f) + ) + assert f() == 42 + + +def test_fork(tplaybook): + """Playbooks can be forked to test multiple execution streams.""" + assert ( + tplaybook + >> TEvent() + << TCommand() + ) + p2 = tplaybook.fork() + p3 = tplaybook.fork() + assert ( + tplaybook + >> TEvent() + << TCommand() + ) + assert ( + p2 + >> TEvent() + << TCommand() + ) + assert len(tplaybook.actual) == len(tplaybook.expected) == 4 + assert len(p2.actual) == len(p2.expected) == 4 + assert len(p3.actual) == len(p3.expected) == 2 + + +def test_fork_placeholder(tplaybook): + """Forks require new placeholders.""" + f = tutils.Placeholder() + flow = object() + assert ( + tplaybook + >> TEvent([flow]) + << TCommand(f) + ) + assert f() == flow + p2 = tplaybook.fork() + + p2_flow = p2.expected[0].commands[0] + assert p2_flow != flow + + # As we have forked, we need a new placeholder. + f2 = tutils.Placeholder() + assert ( + p2 + >> TEvent([p2_flow]) + << TCommand(f2) + ) + assert f2() == p2_flow + + # re-using the old placeholder does not work. + with pytest.raises(AssertionError, message="Playbook mismatch"): + assert ( + p2 + >> TEvent([p2_flow]) + << TCommand(f) + ) + + +def test_unfinished(tplaybook): + """We show a warning when playbooks aren't asserted.""" + tplaybook >> TEvent() + with pytest.raises(RuntimeError, message="Unfinished playbook"): + tplaybook.__del__() + tplaybook._errored = True + tplaybook.__del__() + + +def test_command_reply(tplaybook): + """CommandReplies can use relative offsets to point to the matching command.""" + assert ( + tplaybook + >> TEvent() + << TCommand() + >> TCommandReply(-1, 42) + ) + assert tplaybook.actual[1] == tplaybook.actual[2].command + + +def test_default_playbook(tctx): + p = tutils.playbook(TLayer(tctx)) + assert p + assert len(p.actual) == 1 + assert isinstance(p.actual[0], events.Start) + + +def test_eq_blocking(): + """_eq should not consider differences in .blocking""" + a = TCommand() + a.blocking = True + b = TCommand() + b.blocking = False + assert tutils._eq(a, b) + + +def test_eq_placeholder(): + """_eq should assign placeholders.""" + a = TCommand() + a.foo = 42 + a.bar = tutils.Placeholder() + b = TCommand() + b.foo = tutils.Placeholder() + b.bar = 43 + assert tutils._eq(a, b) + assert a.foo == b.foo() == 42 + assert a.bar() == b.bar == 43 + + b.foo.obj = 44 + assert not tutils._eq(a, b) diff --git a/mitmproxy/proxy/protocol2/test/tutils.py b/mitmproxy/proxy/protocol2/test/tutils.py index af3177ffc..8e974bee4 100644 --- a/mitmproxy/proxy/protocol2/test/tutils.py +++ b/mitmproxy/proxy/protocol2/test/tutils.py @@ -1,9 +1,9 @@ +import copy import difflib import itertools -import re import typing -import copy +import re from mitmproxy.proxy.protocol2 import commands from mitmproxy.proxy.protocol2 import events @@ -64,58 +64,46 @@ class playbook: """ layer: layer.Layer """The base layer""" - playbook: TPlaybook + expected: TPlaybook """expected command/event sequence""" actual: TPlaybook """actual command/event sequence""" - _final: bool - """True if no << or >> operation has been called on this.""" + _errored: bool + """used to check if playbook as been fully asserted""" def __init__( self, layer, - playbook=None, + expected=None, ): - if playbook is None: - playbook = [ + if expected is None: + expected = [ events.Start() ] self.layer = layer - self.playbook = playbook + self.expected = expected self.actual = [] - self._final = True - - def _copy_with(self, entry: TPlaybookEntry): - self._final = False - p = playbook( - self.layer, - self.playbook + [entry] - ) - p.actual = self.actual.copy() - return p + self._errored = False def __rshift__(self, e): """Add an event to send""" assert isinstance(e, events.Event) - return self._copy_with(e) + self.expected.append(e) + return self def __lshift__(self, c): """Add an expected command""" if c is None: return self assert isinstance(c, commands.Command) - return self._copy_with(c) + self.expected.append(c) + return self def __bool__(self): """Determine if playbook is correct.""" - - self.layer = copy.deepcopy(self.layer) - self.playbook = copy.deepcopy(self.playbook) - self.actual = copy.deepcopy(self.actual) - already_asserted = len(self.actual) - for i, x in enumerate(self.playbook[already_asserted:], already_asserted): + for i, x in enumerate(self.expected[already_asserted:], already_asserted): if isinstance(x, commands.Command): pass else: @@ -130,27 +118,30 @@ class playbook: success = all( _eq(e, a) - for e, a in itertools.zip_longest(self.playbook, self.actual) + for e, a in itertools.zip_longest(self.expected, self.actual) ) if not success: - def _str(x): - # add arrows to diff - return f"{'>' if isinstance(x, events.Event) else '<'} {x}" + self._errored = True - diff = difflib.ndiff( - [_str(x) for x in self.playbook], + def _str(x): + x_str = re.sub(r'Placeholder\((.*?)\)', r'\1', str(x)) + return f"{'>' if isinstance(x, events.Event) else '<'} {x_str}" + + diff = "\n".join(difflib.ndiff( + [_str(x) for x in self.expected], [_str(x) for x in self.actual] - ) - print("✗ Playbook mismatch:") - print("\n".join(diff)) - return False + )) + raise AssertionError(f"Playbook mismatch!\n{diff}") else: return True def __del__(self): - if self._final and len(self.actual) < len(self.playbook): + if not self._errored and len(self.actual) < len(self.expected): raise RuntimeError("Unfinished playbook!") + def fork(self): + return copy.deepcopy(self) + class Placeholder: """Placeholder value in playbooks, so that flows can be referenced before they are initialized.""" @@ -163,4 +154,4 @@ class Placeholder: return self.obj def __repr__(self): - return f"P({repr(self.obj)})" + return f"Placeholder({repr(self.obj)})"