Add streaming to FlowMaster

This commit is contained in:
Aldo Cortesi 2012-07-09 10:18:37 +12:00
parent 4b6fdc92dc
commit 572e8a4962
3 changed files with 51 additions and 9 deletions

View File

@ -928,7 +928,7 @@ class ConsoleMaster(flow.FlowMaster):
self.state.killall(self) self.state.killall(self)
if self.stream: if self.stream:
self.stream.fo.close() self.stream.fo.close()
controller.Master.shutdown(self) flow.FlowMaster.shutdown(self)
def sync_list_view(self): def sync_list_view(self):
self.flow_list_walker._modified() self.flow_list_walker._modified()

View File

@ -17,7 +17,7 @@
This module provides more sophisticated flow tracking. These match requests This module provides more sophisticated flow tracking. These match requests
with their responses, and provide filtering and interception facilities. with their responses, and provide filtering and interception facilities.
""" """
import hashlib, Cookie, cookielib, copy, re, urlparse import hashlib, Cookie, cookielib, copy, re, urlparse, os
import time import time
import tnetstring, filt, script, utils, encoding, proxy import tnetstring, filt, script, utils, encoding, proxy
from email.utils import parsedate_tz, formatdate, mktime_tz from email.utils import parsedate_tz, formatdate, mktime_tz
@ -1200,6 +1200,8 @@ class FlowMaster(controller.Master):
self.refresh_server_playback = False self.refresh_server_playback = False
self.replacehooks = ReplaceHooks() self.replacehooks = ReplaceHooks()
self.stream = None
def add_event(self, e, level="info"): def add_event(self, e, level="info"):
""" """
level: info, error level: info, error
@ -1415,21 +1417,35 @@ class FlowMaster(controller.Master):
def handle_response(self, r): def handle_response(self, r):
f = self.state.add_response(r) f = self.state.add_response(r)
self.replacehooks.run(f)
if f: if f:
self.replacehooks.run(f)
self.run_script_hook("response", f) self.run_script_hook("response", f)
if self.client_playback: if self.client_playback:
self.client_playback.clear(f) self.client_playback.clear(f)
if not f:
r._ack()
if f:
self.process_new_response(f) self.process_new_response(f)
if self.stream:
self.stream.add(f)
else:
r._ack()
return f return f
def shutdown(self): def shutdown(self):
if self.script: if self.script:
self.load_script(None) self.load_script(None)
controller.Master.shutdown(self) controller.Master.shutdown(self)
if self.stream:
for i in self.state._flow_list:
if not i.response:
self.stream.add(i)
self.stream.fo.close()
self.stop_stream()
def start_stream(self, fp):
self.stream = FlowWriter(fp)
def stop_stream(self):
self.stream = None
class FlowWriter: class FlowWriter:

View File

@ -1,4 +1,4 @@
import Queue, time import Queue, time, os.path
from cStringIO import StringIO from cStringIO import StringIO
import email.utils import email.utils
from libmproxy import filt, flow, controller, utils, tnetstring from libmproxy import filt, flow, controller, utils, tnetstring
@ -689,6 +689,32 @@ class TestFlowMaster:
fm.handle_request(f.request) fm.handle_request(f.request)
assert f.request.headers["authorization"] == ["foo"] assert f.request.headers["authorization"] == ["foo"]
def test_stream(self):
with tutils.tmpdir() as tdir:
p = os.path.join(tdir, "foo")
def r():
r = flow.FlowReader(open(p))
return list(r.stream())
s = flow.State()
fm = flow.FlowMaster(None, s)
tf = tutils.tflow_full()
fm.start_stream(file(p, "ab"))
fm.handle_request(tf.request)
fm.handle_response(tf.response)
fm.stop_stream()
assert r()[0].response
tf = tutils.tflow_full()
fm.start_stream(file(p, "ab"))
fm.handle_request(tf.request)
fm.shutdown()
assert not r()[1].response
class TestRequest: class TestRequest:
def test_simple(self): def test_simple(self):
h = flow.ODictCaseless() h = flow.ODictCaseless()