From 572e8a49629a889129527285e8e2f339fe72df35 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Mon, 9 Jul 2012 10:18:37 +1200 Subject: [PATCH] Add streaming to FlowMaster --- libmproxy/console/__init__.py | 2 +- libmproxy/flow.py | 30 +++++++++++++++++++++++------- test/test_flow.py | 28 +++++++++++++++++++++++++++- 3 files changed, 51 insertions(+), 9 deletions(-) diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py index 727765428..4034c1889 100644 --- a/libmproxy/console/__init__.py +++ b/libmproxy/console/__init__.py @@ -928,7 +928,7 @@ class ConsoleMaster(flow.FlowMaster): self.state.killall(self) if self.stream: self.stream.fo.close() - controller.Master.shutdown(self) + flow.FlowMaster.shutdown(self) def sync_list_view(self): self.flow_list_walker._modified() diff --git a/libmproxy/flow.py b/libmproxy/flow.py index 5f5cad4c0..17e88bc11 100644 --- a/libmproxy/flow.py +++ b/libmproxy/flow.py @@ -17,7 +17,7 @@ This module provides more sophisticated flow tracking. These match requests with their responses, and provide filtering and interception facilities. """ -import hashlib, Cookie, cookielib, copy, re, urlparse +import hashlib, Cookie, cookielib, copy, re, urlparse, os import time import tnetstring, filt, script, utils, encoding, proxy from email.utils import parsedate_tz, formatdate, mktime_tz @@ -1200,6 +1200,8 @@ class FlowMaster(controller.Master): self.refresh_server_playback = False self.replacehooks = ReplaceHooks() + self.stream = None + def add_event(self, e, level="info"): """ level: info, error @@ -1415,21 +1417,35 @@ class FlowMaster(controller.Master): def handle_response(self, r): f = self.state.add_response(r) - self.replacehooks.run(f) if f: + self.replacehooks.run(f) self.run_script_hook("response", f) - if self.client_playback: - self.client_playback.clear(f) - if not f: - r._ack() - if f: + if self.client_playback: + self.client_playback.clear(f) self.process_new_response(f) + if self.stream: + self.stream.add(f) + else: + r._ack() return f def shutdown(self): if self.script: self.load_script(None) controller.Master.shutdown(self) + if self.stream: + for i in self.state._flow_list: + if not i.response: + self.stream.add(i) + self.stream.fo.close() + self.stop_stream() + + def start_stream(self, fp): + self.stream = FlowWriter(fp) + + def stop_stream(self): + self.stream = None + class FlowWriter: diff --git a/test/test_flow.py b/test/test_flow.py index 170b16bb7..99df9ed0e 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -1,4 +1,4 @@ -import Queue, time +import Queue, time, os.path from cStringIO import StringIO import email.utils from libmproxy import filt, flow, controller, utils, tnetstring @@ -689,6 +689,32 @@ class TestFlowMaster: fm.handle_request(f.request) assert f.request.headers["authorization"] == ["foo"] + def test_stream(self): + with tutils.tmpdir() as tdir: + p = os.path.join(tdir, "foo") + def r(): + r = flow.FlowReader(open(p)) + return list(r.stream()) + + s = flow.State() + fm = flow.FlowMaster(None, s) + tf = tutils.tflow_full() + + fm.start_stream(file(p, "ab")) + fm.handle_request(tf.request) + fm.handle_response(tf.response) + fm.stop_stream() + + assert r()[0].response + + tf = tutils.tflow_full() + fm.start_stream(file(p, "ab")) + fm.handle_request(tf.request) + fm.shutdown() + + assert not r()[1].response + + class TestRequest: def test_simple(self): h = flow.ODictCaseless()