Redesign client replay

- Move to an addon
- Use a much simpler synchronisation mechanism
This commit is contained in:
Aldo Cortesi 2016-09-11 18:00:02 +12:00
parent 6427176cf1
commit cfb943bfdd
16 changed files with 143 additions and 182 deletions

View File

@ -9,6 +9,7 @@ from mitmproxy.builtins import script
from mitmproxy.builtins import replace
from mitmproxy.builtins import setheaders
from mitmproxy.builtins import serverplayback
from mitmproxy.builtins import clientplayback
def default_addons():
@ -21,5 +22,6 @@ def default_addons():
filestreamer.FileStreamer(),
replace.Replace(),
setheaders.SetHeaders(),
serverplayback.ServerPlayback()
serverplayback.ServerPlayback(),
clientplayback.ClientPlayback(),
]

View File

@ -0,0 +1,36 @@
from mitmproxy import exceptions, flow, ctx
class ClientPlayback:
def __init__(self):
self.flows = None
self.current = None
self.keepserving = None
def count(self):
if self.flows:
return len(self.flows)
return 0
def load(self, flows):
self.flows = flows
def configure(self, options, updated):
if "client_replay" in updated:
if options.client_replay:
try:
flows = flow.read_flows_from_paths(options.client_replay)
except exceptions.FlowReadException as e:
raise exceptions.OptionsError(str(e))
self.load(flows)
else:
self.flows = None
self.keepserving = options.keepserving
def tick(self):
if self.current and not self.current.is_alive():
self.current = None
if self.flows and not self.current:
self.current = ctx.master.replay_request(self.flows.pop(0))
if not self.flows and not self.current and not self.keepserving:
ctx.master.shutdown()

View File

@ -180,9 +180,7 @@ class ConnectionItem(urwid.WidgetWrap):
self.state.enable_marked_filter()
signals.flowlist_change.send(self)
elif key == "r":
r = self.master.replay_request(self.flow)
if r:
signals.status_message.send(message=r)
self.master.replay_request(self.flow)
signals.flowlist_change.send(self)
elif key == "S":
def stop_server_playback(response):

View File

@ -544,9 +544,7 @@ class FlowView(tabs.Tabs):
elif key == "p":
self.view_prev_flow(self.flow)
elif key == "r":
r = self.master.replay_request(self.flow)
if r:
signals.status_message.send(message=r)
self.master.replay_request(self.flow)
signals.flow_change.send(self, flow = self.flow)
elif key == "V":
if self.flow.modified():

View File

@ -244,9 +244,6 @@ class ConsoleMaster(flow.FlowMaster):
self.logbuffer = urwid.SimpleListWalker([])
self.follow = options.follow
if options.client_replay:
self.client_playback_path(options.client_replay)
self.view_stack = []
if options.app:
@ -354,13 +351,6 @@ class ConsoleMaster(flow.FlowMaster):
except exceptions.FlowReadException as e:
signals.status_message.send(message=str(e))
def client_playback_path(self, path):
if not isinstance(path, list):
path = [path]
flows = self._readflows(path)
if flows:
self.start_client_playback(flows, False)
def spawn_editor(self, data):
text = not isinstance(data, bytes)
fd, name = tempfile.mkstemp('', "mproxy", text=text)

View File

@ -137,6 +137,7 @@ class StatusBar(urwid.WidgetWrap):
r = []
sreplay = self.master.addons.get("serverplayback")
creplay = self.master.addons.get("clientplayback")
if len(self.master.options.setheaders):
r.append("[")
@ -146,10 +147,10 @@ class StatusBar(urwid.WidgetWrap):
r.append("[")
r.append(("heading_key", "R"))
r.append("eplacing]")
if self.master.client_playback:
if creplay.count():
r.append("[")
r.append(("heading_key", "cplayback"))
r.append(":%s]" % self.master.client_playback.count())
r.append(":%s]" % creplay.count())
if sreplay.count():
r.append("[")
r.append(("heading_key", "splayback"))

View File

@ -53,12 +53,6 @@ class DumpMaster(flow.FlowMaster):
"HTTP/2 is disabled. Use --no-http2 to silence this warning.",
file=sys.stderr)
if options.client_replay:
self.start_client_playback(
self._readflow(options.client_replay),
not options.keepserving
)
if options.rfile:
try:
self.load_flows_file(options.rfile)

View File

@ -117,3 +117,7 @@ class OptionsError(Exception):
class AddonError(Exception):
pass
class ReplayError(Exception):
pass

View File

@ -4,14 +4,12 @@ from mitmproxy.flow import export, modules
from mitmproxy.flow.io import FlowWriter, FilteredFlowWriter, FlowReader, read_flows_from_paths
from mitmproxy.flow.master import FlowMaster
from mitmproxy.flow.modules import (
AppRegistry, StreamLargeBodies, ClientPlaybackState
AppRegistry, StreamLargeBodies
)
from mitmproxy.flow.state import State, FlowView
__all__ = [
"export", "modules",
"FlowWriter", "FilteredFlowWriter", "FlowReader", "read_flows_from_paths",
"FlowMaster",
"AppRegistry", "StreamLargeBodies", "ClientPlaybackState",
"State", "FlowView",
"FlowMaster", "AppRegistry", "StreamLargeBodies", "State", "FlowView",
]

View File

@ -53,7 +53,6 @@ class FlowMaster(controller.Master):
if server:
self.add_server(server)
self.state = state
self.client_playback = None # type: Optional[modules.ClientPlaybackState]
self.stream_large_bodies = None # type: Optional[modules.StreamLargeBodies]
self.apps = modules.AppRegistry()
@ -70,29 +69,7 @@ class FlowMaster(controller.Master):
else:
self.stream_large_bodies = False
def start_client_playback(self, flows, exit):
"""
flows: List of flows.
"""
self.client_playback = modules.ClientPlaybackState(flows, exit)
def stop_client_playback(self):
self.client_playback = None
def tick(self, timeout):
if self.client_playback:
stop = (
self.client_playback.done() and
self.state.active_flow_count() == 0
)
exit = self.client_playback.exit
if stop:
self.stop_client_playback()
if exit:
self.shutdown()
else:
self.client_playback.tick(self)
return super(FlowMaster, self).tick(timeout)
def duplicate_flow(self, f):
@ -168,35 +145,47 @@ class FlowMaster(controller.Master):
def replay_request(self, f, block=False):
"""
Returns None if successful, or error message if not.
Returns an http_replay.RequestReplayThred object.
May raise exceptions.ReplayError.
"""
if f.live:
return "Can't replay live request."
if f.intercepted:
return "Can't replay while intercepting..."
if f.request.raw_content is None:
return "Can't replay request with missing content..."
if f.request:
f.backup()
f.request.is_replay = True
# TODO: We should be able to remove this.
if "Content-Length" in f.request.headers:
f.request.headers["Content-Length"] = str(len(f.request.raw_content))
f.response = None
f.error = None
# FIXME: process through all addons?
# self.process_new_request(f)
rt = http_replay.RequestReplayThread(
self.server.config,
f,
self.event_queue,
self.should_exit
raise exceptions.ReplayError(
"Can't replay live flow."
)
rt.start() # pragma: no cover
if block:
rt.join()
if f.intercepted:
raise exceptions.ReplayError(
"Can't replay intercepted flow."
)
if f.request.raw_content is None:
raise exceptions.ReplayError(
"Can't replay flow with missing content."
)
if not f.request:
raise exceptions.ReplayError(
"Can't replay flow with missing request."
)
f.backup()
f.request.is_replay = True
# TODO: We should be able to remove this.
if "Content-Length" in f.request.headers:
f.request.headers["Content-Length"] = str(len(f.request.raw_content))
f.response = None
f.error = None
# FIXME: process through all addons?
# self.process_new_request(f)
rt = http_replay.RequestReplayThread(
self.server.config,
f,
self.event_queue,
self.should_exit
)
rt.start() # pragma: no cover
if block:
rt.join()
return rt
@controller.handler
def log(self, l):
@ -225,9 +214,6 @@ class FlowMaster(controller.Master):
@controller.handler
def error(self, f):
self.state.update_flow(f)
if self.client_playback:
self.client_playback.clear(f)
return f
@controller.handler
def request(self, f):
@ -245,7 +231,6 @@ class FlowMaster(controller.Master):
return
if f not in self.state.flows: # don't add again on replay
self.state.add_flow(f)
return f
@controller.handler
def responseheaders(self, f):
@ -255,18 +240,14 @@ class FlowMaster(controller.Master):
except netlib.exceptions.HttpException:
f.reply.kill()
return
return f
@controller.handler
def response(self, f):
self.state.update_flow(f)
if self.client_playback:
self.client_playback.clear(f)
return f
@controller.handler
def websockets_handshake(self, f):
return f
pass
def handle_intercept(self, f):
self.state.update_flow(f)

View File

@ -1,6 +1,5 @@
from __future__ import absolute_import, print_function, division
from mitmproxy import controller
from netlib import wsgi
from netlib import version
from netlib.http import http1
@ -45,37 +44,3 @@ class StreamLargeBodies(object):
if not r.raw_content and not (0 <= expected_size <= self.max_size):
# r.stream may already be a callable, which we want to preserve.
r.stream = r.stream or True
class ClientPlaybackState:
def __init__(self, flows, exit):
self.flows, self.exit = flows, exit
self.current = None
self.testing = False # Disables actual replay for testing.
def count(self):
return len(self.flows)
def done(self):
if len(self.flows) == 0 and not self.current:
return True
return False
def clear(self, flow):
"""
A request has returned in some way - if this is the one we're
servicing, go to the next flow.
"""
if flow is self.current:
self.current = None
def tick(self, master):
if self.flows and not self.current:
self.current = self.flows.pop(0).copy()
if not self.testing:
master.replay_request(self.current)
else:
self.current.reply = controller.DummyReply()
master.request(self.current)
if self.current.response:
master.response(self.current)

View File

@ -0,0 +1,37 @@
import mock
from mitmproxy.builtins import clientplayback
from mitmproxy import options
from .. import tutils, mastertest
class TestClientPlayback:
def test_playback(self):
cp = clientplayback.ClientPlayback()
cp.configure(options.Options(), [])
assert cp.count() == 0
f = tutils.tflow(resp=True)
cp.load([f])
assert cp.count() == 1
RP = "mitmproxy.protocol.http_replay.RequestReplayThread"
with mock.patch(RP) as rp:
assert not cp.current
with mastertest.mockctx():
cp.tick()
rp.assert_called()
assert cp.current
cp.keepserving = False
cp.flows = None
cp.current = None
with mock.patch("mitmproxy.controller.Master.shutdown") as sd:
with mastertest.mockctx():
cp.tick()
sd.assert_called()
def test_configure(self):
cp = clientplayback.ClientPlayback()
cp.configure(
options.Options(), []
)

View File

@ -1,8 +1,10 @@
import contextlib
from . import tutils
import netlib.tutils
from mitmproxy.flow import master
from mitmproxy import flow, proxy, models, controller
from mitmproxy import flow, proxy, models, controller, options
class TestMaster:
@ -47,3 +49,12 @@ class RecordingMaster(master.FlowMaster):
def add_log(self, e, level):
self.event_log.append((level, e))
@contextlib.contextmanager
def mockctx():
state = flow.State()
o = options.Options(refresh_server_playback = True, keepserving=False)
m = RecordingMaster(o, proxy.DummyServer(o), state)
with m.handlecontext():
yield

View File

@ -45,7 +45,6 @@ class TestDumpMaster(mastertest.MasterTest):
m = dump.DumpMaster(None, o)
f = tutils.tflow(err=True)
m.error(f)
assert m.error(f)
assert "error" in o.tfile.getvalue()
def test_replay(self):

View File

@ -37,39 +37,6 @@ def test_app_registry():
assert ar.get(r)
class TestClientPlaybackState:
def test_tick(self):
first = tutils.tflow()
s = flow.State()
fm = flow.FlowMaster(None, None, s)
fm.start_client_playback([first, tutils.tflow()], True)
c = fm.client_playback
c.testing = True
assert not c.done()
assert not s.flow_count()
assert c.count() == 2
c.tick(fm)
assert s.flow_count()
assert c.count() == 1
c.tick(fm)
assert c.count() == 1
c.clear(c.current)
c.tick(fm)
assert c.count() == 0
c.clear(c.current)
assert c.done()
fm.state.clear()
fm.tick(timeout=0)
fm.stop_client_playback()
assert not fm.client_playback
class TestHTTPFlow(object):
def test_copy(self):
@ -477,13 +444,13 @@ class TestFlowMaster:
fm = flow.FlowMaster(None, None, s)
f = tutils.tflow(resp=True)
f.request.content = None
assert "missing" in fm.replay_request(f)
tutils.raises("missing", fm.replay_request, f)
f.intercepted = True
assert "intercepting" in fm.replay_request(f)
tutils.raises("intercepted", fm.replay_request, f)
f.live = True
assert "live" in fm.replay_request(f)
tutils.raises("live", fm.replay_request, f)
def test_duplicate_flow(self):
s = flow.State()
@ -521,26 +488,6 @@ class TestFlowMaster:
fm.shutdown()
def test_client_playback(self):
s = flow.State()
f = tutils.tflow(resp=True)
pb = [tutils.tflow(resp=True), f]
fm = flow.FlowMaster(
options.Options(),
DummyServer(ProxyConfig(options.Options())),
s
)
assert not fm.start_client_playback(pb, False)
fm.client_playback.testing = True
assert not fm.state.flow_count()
fm.tick(0)
assert fm.state.flow_count()
f.error = Error("error")
fm.error(f)
class TestRequest:

View File

@ -60,7 +60,7 @@ class CommonMixin:
# Disconnect error
l.request.path = "/p/305:d0"
rt = self.master.replay_request(l, block=True)
assert not rt
assert rt
if isinstance(self, tservers.HTTPUpstreamProxyTest):
assert l.response.status_code == 502
else:
@ -72,7 +72,7 @@ class CommonMixin:
# In upstream mode with ssl, the replay will fail as we cannot establish
# SSL with the upstream proxy.
rt = self.master.replay_request(l, block=True)
assert not rt
assert rt
if isinstance(self, tservers.HTTPUpstreamProxyTest):
assert l.response.status_code == 502
else: