mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
protocols: add playbook.fork()
This commit is contained in:
parent
41597272f9
commit
2679387849
@ -1,9 +1,10 @@
|
||||
import typing
|
||||
|
||||
import pytest
|
||||
|
||||
from mitmproxy.proxy.protocol2 import events, commands
|
||||
from mitmproxy.proxy.protocol2.layer import Layer
|
||||
from mitmproxy.proxy.protocol2.test import tutils
|
||||
from mitmproxy.proxy.protocol2.utils import expect
|
||||
|
||||
|
||||
class TEvent(events.Event):
|
||||
@ -20,21 +21,29 @@ class TCommand(commands.Command):
|
||||
self.x = x
|
||||
|
||||
|
||||
class TCommandReply(events.CommandReply):
|
||||
pass
|
||||
|
||||
|
||||
class TLayer(Layer):
|
||||
"""
|
||||
Simple echo layer
|
||||
"""
|
||||
|
||||
@expect(TEvent)
|
||||
def _handle_event(self, event: TEvent) -> commands.TCommandGenerator:
|
||||
def _handle_event(self, event: events.Event) -> commands.TCommandGenerator:
|
||||
if isinstance(event, TEvent):
|
||||
for x in event.commands:
|
||||
yield TCommand(x)
|
||||
|
||||
|
||||
def test_playbook_simple(tctx):
|
||||
playbook = tutils.playbook(TLayer(tctx), [])
|
||||
@pytest.fixture
|
||||
def tplaybook(tctx):
|
||||
return tutils.playbook(TLayer(tctx), [])
|
||||
|
||||
|
||||
def test_simple(tplaybook):
|
||||
assert (
|
||||
playbook
|
||||
tplaybook
|
||||
>> TEvent()
|
||||
<< TCommand()
|
||||
>> TEvent([])
|
||||
@ -42,18 +51,145 @@ def test_playbook_simple(tctx):
|
||||
)
|
||||
|
||||
|
||||
def test_playbook_partial_assert(tctx):
|
||||
playbook = tutils.playbook(TLayer(tctx), [])
|
||||
playbook = (
|
||||
playbook
|
||||
def test_mismatch(tplaybook):
|
||||
with pytest.raises(AssertionError, message="Playbook mismatch"):
|
||||
assert (
|
||||
tplaybook
|
||||
>> TEvent([])
|
||||
<< TCommand()
|
||||
)
|
||||
|
||||
|
||||
def test_partial_assert(tplaybook):
|
||||
"""Developers can assert parts of a playbook and the continue later on."""
|
||||
assert (
|
||||
tplaybook
|
||||
>> TEvent()
|
||||
<< TCommand()
|
||||
)
|
||||
assert playbook
|
||||
playbook = (
|
||||
playbook
|
||||
assert (
|
||||
tplaybook
|
||||
>> TEvent()
|
||||
<< TCommand()
|
||||
)
|
||||
assert playbook
|
||||
assert len(playbook.actual) == len(playbook.playbook) == 4
|
||||
assert len(tplaybook.actual) == len(tplaybook.expected) == 4
|
||||
|
||||
|
||||
def test_placeholder(tplaybook):
|
||||
"""Developers can specify placeholders for yet unknown attributes."""
|
||||
f = tutils.Placeholder()
|
||||
assert (
|
||||
tplaybook
|
||||
>> TEvent([42])
|
||||
<< TCommand(f)
|
||||
)
|
||||
assert f() == 42
|
||||
|
||||
|
||||
def test_fork(tplaybook):
|
||||
"""Playbooks can be forked to test multiple execution streams."""
|
||||
assert (
|
||||
tplaybook
|
||||
>> TEvent()
|
||||
<< TCommand()
|
||||
)
|
||||
p2 = tplaybook.fork()
|
||||
p3 = tplaybook.fork()
|
||||
assert (
|
||||
tplaybook
|
||||
>> TEvent()
|
||||
<< TCommand()
|
||||
)
|
||||
assert (
|
||||
p2
|
||||
>> TEvent()
|
||||
<< TCommand()
|
||||
)
|
||||
assert len(tplaybook.actual) == len(tplaybook.expected) == 4
|
||||
assert len(p2.actual) == len(p2.expected) == 4
|
||||
assert len(p3.actual) == len(p3.expected) == 2
|
||||
|
||||
|
||||
def test_fork_placeholder(tplaybook):
|
||||
"""Forks require new placeholders."""
|
||||
f = tutils.Placeholder()
|
||||
flow = object()
|
||||
assert (
|
||||
tplaybook
|
||||
>> TEvent([flow])
|
||||
<< TCommand(f)
|
||||
)
|
||||
assert f() == flow
|
||||
p2 = tplaybook.fork()
|
||||
|
||||
p2_flow = p2.expected[0].commands[0]
|
||||
assert p2_flow != flow
|
||||
|
||||
# As we have forked, we need a new placeholder.
|
||||
f2 = tutils.Placeholder()
|
||||
assert (
|
||||
p2
|
||||
>> TEvent([p2_flow])
|
||||
<< TCommand(f2)
|
||||
)
|
||||
assert f2() == p2_flow
|
||||
|
||||
# re-using the old placeholder does not work.
|
||||
with pytest.raises(AssertionError, message="Playbook mismatch"):
|
||||
assert (
|
||||
p2
|
||||
>> TEvent([p2_flow])
|
||||
<< TCommand(f)
|
||||
)
|
||||
|
||||
|
||||
def test_unfinished(tplaybook):
|
||||
"""We show a warning when playbooks aren't asserted."""
|
||||
tplaybook >> TEvent()
|
||||
with pytest.raises(RuntimeError, message="Unfinished playbook"):
|
||||
tplaybook.__del__()
|
||||
tplaybook._errored = True
|
||||
tplaybook.__del__()
|
||||
|
||||
|
||||
def test_command_reply(tplaybook):
|
||||
"""CommandReplies can use relative offsets to point to the matching command."""
|
||||
assert (
|
||||
tplaybook
|
||||
>> TEvent()
|
||||
<< TCommand()
|
||||
>> TCommandReply(-1, 42)
|
||||
)
|
||||
assert tplaybook.actual[1] == tplaybook.actual[2].command
|
||||
|
||||
|
||||
def test_default_playbook(tctx):
|
||||
p = tutils.playbook(TLayer(tctx))
|
||||
assert p
|
||||
assert len(p.actual) == 1
|
||||
assert isinstance(p.actual[0], events.Start)
|
||||
|
||||
|
||||
def test_eq_blocking():
|
||||
"""_eq should not consider differences in .blocking"""
|
||||
a = TCommand()
|
||||
a.blocking = True
|
||||
b = TCommand()
|
||||
b.blocking = False
|
||||
assert tutils._eq(a, b)
|
||||
|
||||
|
||||
def test_eq_placeholder():
|
||||
"""_eq should assign placeholders."""
|
||||
a = TCommand()
|
||||
a.foo = 42
|
||||
a.bar = tutils.Placeholder()
|
||||
b = TCommand()
|
||||
b.foo = tutils.Placeholder()
|
||||
b.bar = 43
|
||||
assert tutils._eq(a, b)
|
||||
assert a.foo == b.foo() == 42
|
||||
assert a.bar() == b.bar == 43
|
||||
|
||||
b.foo.obj = 44
|
||||
assert not tutils._eq(a, b)
|
||||
|
@ -1,9 +1,9 @@
|
||||
import copy
|
||||
import difflib
|
||||
import itertools
|
||||
import re
|
||||
import typing
|
||||
|
||||
import copy
|
||||
import re
|
||||
|
||||
from mitmproxy.proxy.protocol2 import commands
|
||||
from mitmproxy.proxy.protocol2 import events
|
||||
@ -64,58 +64,46 @@ class playbook:
|
||||
"""
|
||||
layer: layer.Layer
|
||||
"""The base layer"""
|
||||
playbook: TPlaybook
|
||||
expected: TPlaybook
|
||||
"""expected command/event sequence"""
|
||||
actual: TPlaybook
|
||||
"""actual command/event sequence"""
|
||||
_final: bool
|
||||
"""True if no << or >> operation has been called on this."""
|
||||
_errored: bool
|
||||
"""used to check if playbook as been fully asserted"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
layer,
|
||||
playbook=None,
|
||||
expected=None,
|
||||
):
|
||||
if playbook is None:
|
||||
playbook = [
|
||||
if expected is None:
|
||||
expected = [
|
||||
events.Start()
|
||||
]
|
||||
|
||||
self.layer = layer
|
||||
self.playbook = playbook
|
||||
self.expected = expected
|
||||
self.actual = []
|
||||
self._final = True
|
||||
|
||||
def _copy_with(self, entry: TPlaybookEntry):
|
||||
self._final = False
|
||||
p = playbook(
|
||||
self.layer,
|
||||
self.playbook + [entry]
|
||||
)
|
||||
p.actual = self.actual.copy()
|
||||
return p
|
||||
self._errored = False
|
||||
|
||||
def __rshift__(self, e):
|
||||
"""Add an event to send"""
|
||||
assert isinstance(e, events.Event)
|
||||
return self._copy_with(e)
|
||||
self.expected.append(e)
|
||||
return self
|
||||
|
||||
def __lshift__(self, c):
|
||||
"""Add an expected command"""
|
||||
if c is None:
|
||||
return self
|
||||
assert isinstance(c, commands.Command)
|
||||
return self._copy_with(c)
|
||||
self.expected.append(c)
|
||||
return self
|
||||
|
||||
def __bool__(self):
|
||||
"""Determine if playbook is correct."""
|
||||
|
||||
self.layer = copy.deepcopy(self.layer)
|
||||
self.playbook = copy.deepcopy(self.playbook)
|
||||
self.actual = copy.deepcopy(self.actual)
|
||||
|
||||
already_asserted = len(self.actual)
|
||||
for i, x in enumerate(self.playbook[already_asserted:], already_asserted):
|
||||
for i, x in enumerate(self.expected[already_asserted:], already_asserted):
|
||||
if isinstance(x, commands.Command):
|
||||
pass
|
||||
else:
|
||||
@ -130,27 +118,30 @@ class playbook:
|
||||
|
||||
success = all(
|
||||
_eq(e, a)
|
||||
for e, a in itertools.zip_longest(self.playbook, self.actual)
|
||||
for e, a in itertools.zip_longest(self.expected, self.actual)
|
||||
)
|
||||
if not success:
|
||||
def _str(x):
|
||||
# add arrows to diff
|
||||
return f"{'>' if isinstance(x, events.Event) else '<'} {x}"
|
||||
self._errored = True
|
||||
|
||||
diff = difflib.ndiff(
|
||||
[_str(x) for x in self.playbook],
|
||||
def _str(x):
|
||||
x_str = re.sub(r'Placeholder\((.*?)\)', r'\1', str(x))
|
||||
return f"{'>' if isinstance(x, events.Event) else '<'} {x_str}"
|
||||
|
||||
diff = "\n".join(difflib.ndiff(
|
||||
[_str(x) for x in self.expected],
|
||||
[_str(x) for x in self.actual]
|
||||
)
|
||||
print("✗ Playbook mismatch:")
|
||||
print("\n".join(diff))
|
||||
return False
|
||||
))
|
||||
raise AssertionError(f"Playbook mismatch!\n{diff}")
|
||||
else:
|
||||
return True
|
||||
|
||||
def __del__(self):
|
||||
if self._final and len(self.actual) < len(self.playbook):
|
||||
if not self._errored and len(self.actual) < len(self.expected):
|
||||
raise RuntimeError("Unfinished playbook!")
|
||||
|
||||
def fork(self):
|
||||
return copy.deepcopy(self)
|
||||
|
||||
|
||||
class Placeholder:
|
||||
"""Placeholder value in playbooks, so that flows can be referenced before they are initialized."""
|
||||
@ -163,4 +154,4 @@ class Placeholder:
|
||||
return self.obj
|
||||
|
||||
def __repr__(self):
|
||||
return f"P({repr(self.obj)})"
|
||||
return f"Placeholder({repr(self.obj)})"
|
||||
|
Loading…
Reference in New Issue
Block a user