From ed8249023fb7c0d429b9278c63b51ac071700987 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Wed, 26 Nov 2014 04:18:21 +0100 Subject: [PATCH] introduce revised views, port over changes from multiple_views branch --- libmproxy/console/__init__.py | 8 +- libmproxy/flow.py | 263 ++++++++++++++++------ libmproxy/protocol/http.py | 5 +- libmproxy/web/__init__.py | 38 +++- libmproxy/web/app.py | 44 ++-- libmproxy/web/static/js/app.js | 312 +++++++++++++++++--------- test/test_console.py | 8 +- test/test_flow.py | 40 ++-- test/test_server.py | 20 +- web/src/js/app.js | 8 +- web/src/js/components/mainview.jsx.js | 12 +- web/src/js/connection.js | 51 +++-- web/src/js/stores/flowstore.js | 241 +++++++++++++------- 13 files changed, 701 insertions(+), 349 deletions(-) diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py index e6bc9b41c..38a167518 100644 --- a/libmproxy/console/__init__.py +++ b/libmproxy/console/__init__.py @@ -277,16 +277,16 @@ class ConsoleState(flow.State): d = self.flowsettings.get(flow, {}) return d.get(key, default) - def add_request(self, f): - flow.State.add_request(self, f) + def add_flow(self, f): + super(ConsoleState, self).add_flow(f) if self.focus is None: self.set_focus(0) elif self.follow_focus: self.set_focus(len(self.view) - 1) return f - def add_response(self, resp): - f = flow.State.add_response(self, resp) + def update_flow(self, f): + super(ConsoleState, self).update_flow(f) if self.focus is None: self.set_focus(0) return f diff --git a/libmproxy/flow.py b/libmproxy/flow.py index a6bf17d8e..d3ae383e8 100644 --- a/libmproxy/flow.py +++ b/libmproxy/flow.py @@ -2,6 +2,7 @@ This module provides more sophisticated flow tracking and provides filtering and interception facilities. """ from __future__ import absolute_import +from abc import abstractmethod, ABCMeta import hashlib import Cookie import cookielib @@ -338,80 +339,216 @@ class StickyAuthState: f.request.headers["authorization"] = self.hosts[host] -class State(object): - def __init__(self): - self._flow_list = [] - self.view = [] +class FlowList(object): + __metaclass__ = ABCMeta - # These are compiled filt expressions: - self._limit = None - self.intercept = None + def __iter__(self): + return iter(self._list) - @property - def limit_txt(self): - if self._limit: - return self._limit.pattern - else: - return None + def __contains__(self, item): + return item in self._list - def flow_count(self): - return len(self._flow_list) + def __getitem__(self, item): + return self._list[item] + + def __nonzero__(self): + return bool(self._list) + + def __len__(self): + return len(self._list) def index(self, f): - return self._flow_list.index(f) + return self._list.index(f) - def active_flow_count(self): + @abstractmethod + def _add(self, f): + return + + @abstractmethod + def _update(self, f): + return + + @abstractmethod + def _remove(self, f): + return + + +class FlowView(FlowList): + def __init__(self, store, filt=None): + self._list = [] + if not filt: + filt = lambda flow: True + self._build(store, filt) + + self.store = store + self.store.views.append(self) + + def _close(self): + self.store.views.remove(self) + + def _build(self, flows, filt=None): + if filt: + self.filt = filt + self._list = list(filter(self.filt, flows)) + + def _add(self, f): + if self.filt(f): + self._list.append(f) + + def _update(self, f): + if f not in self._list: + self._add(f) + elif not self.filt(f): + self._remove(f) + + def _remove(self, f): + if f in self._list: + self._list.remove(f) + + def _recalculate(self, flows): + self._build(flows) + + +class FlowStore(FlowList): + """ + Responsible for handling flows in the state: + Keeps a list of all flows and provides views on them. + """ + def __init__(self): + self._list = [] + self._set = set() # Used for O(1) lookups + self.views = [] + self._recalculate_views() + + def __contains__(self, f): + return f in self._set + + def _add(self, f): + """ + Adds a flow to the state. + The flow to add must not be present in the state. + """ + self._list.append(f) + self._set.add(f) + for view in self.views: + view._add(f) + + def _update(self, f): + """ + Notifies the state that a flow has been updated. + The flow must be present in the state. + """ + for view in self.views: + view._update(f) + + def _remove(self, f): + """ + Deletes a flow from the state. + The flow must be present in the state. + """ + self._list.remove(f) + self._set.remove(f) + for view in self.views: + view._remove(f) + + # Expensive bulk operations + + def _extend(self, flows): + """ + Adds a list of flows to the state. + The list of flows to add must not contain flows that are already in the state. + """ + self._list.extend(flows) + self._set.update(flows) + self._recalculate_views() + + def _clear(self): + self._list = [] + self._set = set() + self._recalculate_views() + + def _recalculate_views(self): + """ + Expensive operation: Recalculate all the views after a bulk change. + """ + for view in self.views: + view._recalculate(self) + + # Utility functions. + # There are some common cases where we need to argue about all flows + # irrespective of filters on the view etc (i.e. on shutdown). + + def active_count(self): c = 0 - for i in self._flow_list: + for i in self._list: if not i.response and not i.error: c += 1 return c - def add_request(self, flow): - """ - Add a request to the state. Returns the matching flow. - """ - if flow in self._flow_list: # catch flow replay - return flow - self._flow_list.append(flow) - if flow.match(self._limit): - self.view.append(flow) - return flow + # TODO: Should accept_all operate on views or on all flows? + def accept_all(self): + for f in self._list: + f.accept_intercept() - def add_response(self, f): + def kill_all(self, master): + for f in self._list: + f.kill(master) + + +class State(object): + def __init__(self): + self.flows = FlowStore() + self.view = FlowView(self.flows, None) + + # These are compiled filt expressions: + self.intercept = None + + @property + def limit_txt(self): + return getattr(self.view.filt, "pattern", None) + + def flow_count(self): + return len(self.flows) + + # TODO: All functions regarding flows that don't cause side-effects should be moved into FlowStore. + def index(self, f): + return self.flows.index(f) + + def active_flow_count(self): + return self.flows.active_count() + + def add_flow(self, f): """ - Add a response to the state. Returns the matching flow. + Add a request to the state. """ - if not f: - return False - if f.match(self._limit) and not f in self.view: - self.view.append(f) + self.flows._add(f) return f - def add_error(self, f): + def update_flow(self, f): """ - Add an error response to the state. Returns the matching flow, or - None if there isn't one. + Add a response to the state. """ - if not f: - return None - if f.match(self._limit) and not f in self.view: - self.view.append(f) + self.flows._update(f) return f + def delete_flow(self, f): + self.flows._remove(f) + def load_flows(self, flows): - self._flow_list.extend(flows) - self.recalculate_view() + self.flows._extend(flows) def set_limit(self, txt): + if txt == self.limit_txt: + return if txt: f = filt.parse(txt) if not f: return "Invalid filter expression." - self._limit = f + self.view._close() + self.view = FlowView(self.flows, f) else: - self._limit = None - self.recalculate_view() + self.view._close() + self.view = FlowView(self.flows, None) def set_intercept(self, txt): if txt: @@ -419,37 +556,24 @@ class State(object): if not f: return "Invalid filter expression." self.intercept = f - self.intercept_txt = txt else: self.intercept = None - self.intercept_txt = None - def recalculate_view(self): - if self._limit: - self.view = [i for i in self._flow_list if i.match(self._limit)] - else: - self.view = self._flow_list[:] - - def delete_flow(self, f): - self._flow_list.remove(f) - if f in self.view: - self.view.remove(f) - return True + @property + def intercept_txt(self): + return getattr(self.intercept, "pattern", None) def clear(self): - for i in self._flow_list[:]: - self.delete_flow(i) + self.flows._clear() def accept_all(self): - for i in self._flow_list[:]: - i.accept_intercept() + self.flows.accept_all() def revert(self, f): f.revert() def killall(self, master): - for i in self._flow_list: - i.kill(master) + self.flows.kill_all(master) class FlowMaster(controller.Master): @@ -716,7 +840,7 @@ class FlowMaster(controller.Master): sc.reply() def handle_error(self, f): - self.state.add_error(f) + self.state.update_flow(f) self.run_script_hook("error", f) if self.client_playback: self.client_playback.clear(f) @@ -736,7 +860,8 @@ class FlowMaster(controller.Master): self.add_event("Error in wsgi app. %s"%err, "error") f.reply(protocol.KILL) return - self.state.add_request(f) + if f not in self.state.flows: # don't add again on replay + self.state.add_flow(f) self.replacehooks.run(f) self.setheaders.run(f) self.run_script_hook("request", f) @@ -757,7 +882,7 @@ class FlowMaster(controller.Master): return f def handle_response(self, f): - self.state.add_response(f) + self.state.update_flow(f) self.replacehooks.run(f) self.setheaders.run(f) self.run_script_hook("response", f) @@ -772,7 +897,7 @@ class FlowMaster(controller.Master): self.unload_scripts() controller.Master.shutdown(self) if self.stream: - for i in self.state._flow_list: + for i in self.state.flows: if not i.response: self.stream.add(i) self.stop_stream() diff --git a/libmproxy/protocol/http.py b/libmproxy/protocol/http.py index 26a94040c..b32a55ed7 100644 --- a/libmproxy/protocol/http.py +++ b/libmproxy/protocol/http.py @@ -117,7 +117,10 @@ class HTTPMessage(stateobject.StateObject): def get_state(self, short=False): ret = super(HTTPMessage, self).get_state(short) if short: - ret["contentLength"] = len(self.content) + if self.content: + ret["contentLength"] = len(self.content) + else: + ret["contentLength"] = 0 return ret def get_decoded_content(self): diff --git a/libmproxy/web/__init__.py b/libmproxy/web/__init__.py index 69971436d..f762466ae 100644 --- a/libmproxy/web/__init__.py +++ b/libmproxy/web/__init__.py @@ -9,9 +9,32 @@ class Stop(Exception): pass +class WebFlowView(flow.FlowView): + def __init__(self, store): + super(WebFlowView, self).__init__(store, None) + + def _add(self, f): + super(WebFlowView, self)._add(f) + app.FlowUpdates.broadcast("add", f.get_state(short=True)) + + def _update(self, f): + super(WebFlowView, self)._update(f) + app.FlowUpdates.broadcast("update", f.get_state(short=True)) + + def _remove(self, f): + super(WebFlowView, self)._remove(f) + app.FlowUpdates.broadcast("remove", f.get_state(short=True)) + + def _recalculate(self, flows): + super(WebFlowView, self)._recalculate(flows) + app.FlowUpdates.broadcast("recalculate", None) + + class WebState(flow.State): def __init__(self): - flow.State.__init__(self) + super(WebState, self).__init__() + self.view._close() + self.view = WebFlowView(self.flows) class Options(object): @@ -58,8 +81,8 @@ class Options(object): class WebMaster(flow.FlowMaster): def __init__(self, server, options): self.options = options - self.app = app.Application(self.options.wdebug) super(WebMaster, self).__init__(server, WebState()) + self.app = app.Application(self.state, self.options.wdebug) self.last_log_id = 0 @@ -83,24 +106,17 @@ class WebMaster(flow.FlowMaster): self.shutdown() def handle_request(self, f): - app.ClientConnection.broadcast("add_flow", f.get_state(True)) - flow.FlowMaster.handle_request(self, f) + super(WebMaster, self).handle_request(f) if f: f.reply() return f def handle_response(self, f): - app.ClientConnection.broadcast("update_flow", f.get_state(True)) - flow.FlowMaster.handle_response(self, f) + super(WebMaster, self).handle_response(f) if f: f.reply() return f - def handle_error(self, f): - app.ClientConnection.broadcast("update_flow", f.get_state(True)) - flow.FlowMaster.handle_error(self, f) - return f - def handle_log(self, l): self.last_log_id += 1 app.ClientConnection.broadcast( diff --git a/libmproxy/web/app.py b/libmproxy/web/app.py index e2765a6d8..4fdff7837 100644 --- a/libmproxy/web/app.py +++ b/libmproxy/web/app.py @@ -3,6 +3,7 @@ import tornado.web import tornado.websocket import logging import json +from .. import flow class IndexHandler(tornado.web.RequestHandler): @@ -10,36 +11,53 @@ class IndexHandler(tornado.web.RequestHandler): self.render("index.html") -class ClientConnection(tornado.websocket.WebSocketHandler): - connections = set() +class WebSocketEventBroadcaster(tornado.websocket.WebSocketHandler): + connections = None # raise an error if inherited class doesn't specify its own instance. def open(self): - ClientConnection.connections.add(self) + self.connections.add(self) def on_close(self): - ClientConnection.connections.remove(self) + self.connections.remove(self) @classmethod def broadcast(cls, type, data): + message = json.dumps( + { + "type": type, + "data": data + } + ) for conn in cls.connections: try: - conn.write_message( - json.dumps( - { - "type": type, - "data": data - } - ) - ) + conn.write_message(message) except: logging.error("Error sending message", exc_info=True) +class FlowsHandler(tornado.web.RequestHandler): + def get(self): + self.write(dict( + flows=[f.get_state(short=True) for f in self.application.state.flows] + )) + + +class FlowUpdates(WebSocketEventBroadcaster): + connections = set() + + +class ClientConnection(WebSocketEventBroadcaster): + connections = set() + + class Application(tornado.web.Application): - def __init__(self, debug): + def __init__(self, state, debug): + self.state = state handlers = [ (r"/", IndexHandler), (r"/updates", ClientConnection), + (r"/flows", FlowsHandler), + (r"/flows/updates", FlowUpdates), ] settings = dict( template_path=os.path.join(os.path.dirname(__file__), "templates"), diff --git a/libmproxy/web/static/js/app.js b/libmproxy/web/static/js/app.js index fe317d7f0..ddbb14f4e 100644 --- a/libmproxy/web/static/js/app.js +++ b/libmproxy/web/static/js/app.js @@ -335,132 +335,216 @@ _.extend(_EventLogStore.prototype, EventEmitter.prototype, { var EventLogStore = new _EventLogStore(); AppDispatcher.register(EventLogStore.handle.bind(EventLogStore)); -function FlowView(store, live) { - EventEmitter.call(this); - this._store = store; - this.live = live; - this.flows = []; - - this.add = this.add.bind(this); - this.update = this.update.bind(this); - - if (live) { - this._store.addListener(ActionTypes.ADD_FLOW, this.add); - this._store.addListener(ActionTypes.UPDATE_FLOW, this.update); +function FlowStore(endpoint) { + this._views = []; + this.reset(); +} +_.extend(FlowStore.prototype, { + add: function (flow) { + this._pos_map[flow.id] = this._flow_list.length; + this._flow_list.push(flow); + for (var i = 0; i < this._views.length; i++) { + this._views[i].add(flow); + } + }, + update: function (flow) { + this._flow_list[this._pos_map[flow.id]] = flow; + for (var i = 0; i < this._views.length; i++) { + this._views[i].update(flow); + } + }, + remove: function (flow_id) { + this._flow_list.splice(this._pos_map[flow_id], 1); + this._build_map(); + for (var i = 0; i < this._views.length; i++) { + this._views[i].remove(flow_id); + } + }, + reset: function (flows) { + this._flow_list = flows || []; + this._build_map(); + for (var i = 0; i < this._views.length; i++) { + this._views[i].recalculate(this._flow_list); + } + }, + _build_map: function () { + this._pos_map = {}; + for (var i = 0; i < this._flow_list.length; i++) { + var flow = this._flow_list[i]; + this._pos_map[flow.id] = i; + } + }, + open_view: function (filt, sort) { + var view = new FlowView(this._flow_list, filt, sort); + this._views.push(view); + return view; + }, + close_view: function (view) { + this._views = _.without(this._views, view); } +}); + + +function LiveFlowStore(endpoint) { + FlowStore.call(this); + this.updates_before_init = []; // (empty array is true in js) + this.endpoint = endpoint || "/flows"; + this.conn = new Connection(this.endpoint + "/updates"); + this.conn.onopen = this._onopen.bind(this); + this.conn.onmessage = function (e) { + var message = JSON.parse(e.data); + this.handle_update(message.type, message.data); + }.bind(this); +} +_.extend(LiveFlowStore.prototype, FlowStore.prototype, { + handle_update: function (type, data) { + console.log("LiveFlowStore.handle_update", type, data); + if (this.updates_before_init) { + console.log("defer update", type, data); + this.updates_before_init.push(arguments); + } else { + this[type](data); + } + }, + handle_fetch: function (data) { + console.log("Flows fetched."); + this.reset(data.flows); + var updates = this.updates_before_init; + this.updates_before_init = false; + for (var i = 0; i < updates.length; i++) { + this.handle_update.apply(this, updates[i]); + } + }, + _onopen: function () { + //Update stream openend, fetch list of flows. + console.log("Update Connection opened, fetching flows..."); + $.getJSON(this.endpoint, this.handle_fetch.bind(this)); + }, +}); + +function SortByInsertionOrder() { + this.i = 0; + this.map = {}; + this.key = this.key.bind(this); +} +SortByInsertionOrder.prototype.key = function (flow) { + if (!(flow.id in this.map)) { + this.i++; + this.map[flow.id] = this.i; + } + return this.map[flow.id]; +}; + +var default_sort = (new SortByInsertionOrder()).key; + +function FlowView(flows, filt, sort) { + EventEmitter.call(this); + filt = filt || function (flow) { + return true; + }; + sort = sort || default_sort; + this.recalculate(flows, filt, sort); } _.extend(FlowView.prototype, EventEmitter.prototype, { - close: function () { - this._store.removeListener(ActionTypes.ADD_FLOW, this.add); - this._store.removeListener(ActionTypes.UPDATE_FLOW, this.update); - }, - getAll: function () { - return this.flows; + recalculate: function (flows, filt, sort) { + if (filt) { + this.filt = filt; + } + if (sort) { + this.sort = sort; + } + this.flows = flows.filter(this.filt); + this.flows.sort(function (a, b) { + return this.sort(a) - this.sort(b); + }.bind(this)); + this.emit("recalculate"); }, add: function (flow) { - return this.update(flow); - }, - add_bulk: function (flows) { - //Treat all previously received updates as newer than the bulk update. - //If they weren't newer, we're about to receive an update for them very soon. - var updates = this.flows; - this.flows = flows; - updates.forEach(function(flow){ - this._update(flow); - }.bind(this)); - this.emit("change"); - }, - _update: function(flow){ - var idx = _.findIndex(this.flows, function(f){ - return flow.id === f.id; - }); - - if(idx < 0){ - this.flows.push(flow); - //if(this.flows.length > 100){ - // this.flows.shift(); - //} - } else { - this.flows[idx] = flow; + if (this.filt(flow)) { + var idx = _.sortedIndex(this.flows, flow, this.sort); + if (idx === this.flows.length) { //happens often, .push is way faster. + this.flows.push(flow); + } else { + this.flows.splice(idx, 0, flow); + } + this.emit("add", flow, idx); } }, - update: function(flow){ - this._update(flow); - this.emit("change"); - }, -}); - - -function _FlowStore() { - EventEmitter.call(this); -} -_.extend(_FlowStore.prototype, EventEmitter.prototype, { - getView: function (since) { - var view = new FlowView(this, !since); - - $.getJSON("/static/flows.json", function(flows){ - flows = flows.concat(_.cloneDeep(flows)).concat(_.cloneDeep(flows)); - var id = 1; - flows.forEach(function(flow){ - flow.id = "uuid-" + id++; - }); - view.add_bulk(flows); - - }); - - return view; - }, - handle: function (action) { - switch (action.type) { - case ActionTypes.ADD_FLOW: - case ActionTypes.UPDATE_FLOW: - this.emit(action.type, action.data); + update: function (flow) { + var idx; + var i = this.flows.length; + // Search from the back, we usually update the latest flows. + while (i--) { + if (this.flows[i].id === flow.id) { + idx = i; break; - default: - return; + } + } + + if (idx === -1) { //not contained in list + this.add(flow); + } else if (!this.filt(flow)) { + this.remove(flow.id); + } else { + if (this.sort(this.flows[idx]) !== this.sort(flow)) { //sortpos has changed + this.remove(this.flows[idx]); + this.add(flow); + } else { + this.flows[idx] = flow; + this.emit("update", flow, idx); + } + } + }, + remove: function (flow_id) { + var i = this.flows.length; + while (i--) { + if (this.flows[i].id === flow_id) { + this.flows.splice(i, 1); + this.emit("remove", flow_id, i); + break; + } } } }); - - -var FlowStore = new _FlowStore(); -AppDispatcher.register(FlowStore.handle.bind(FlowStore)); - -function _Connection(url) { - this.url = url; +function Connection(url) { + if(url[0] != "/"){ + this.url = url; + } else { + this.url = location.origin.replace("http", "ws") + url; + } + var ws = new WebSocket(this.url); + ws.onopen = function(){ + this.onopen.apply(this, arguments); + }.bind(this); + ws.onmessage = function(){ + this.onmessage.apply(this, arguments); + }.bind(this); + ws.onerror = function(){ + this.onerror.apply(this, arguments); + }.bind(this); + ws.onclose = function(){ + this.onclose.apply(this, arguments); + }.bind(this); + this.ws = ws; } -_Connection.prototype.init = function () { - this.openWebSocketConnection(); -}; -_Connection.prototype.openWebSocketConnection = function () { - this.ws = new WebSocket(this.url.replace("http", "ws")); - var ws = this.ws; - - ws.onopen = this.onopen.bind(this); - ws.onmessage = this.onmessage.bind(this); - ws.onerror = this.onerror.bind(this); - ws.onclose = this.onclose.bind(this); -}; -_Connection.prototype.onopen = function (open) { +Connection.prototype.onopen = function (open) { console.debug("onopen", this, arguments); }; -_Connection.prototype.onmessage = function (message) { - //AppDispatcher.dispatchServerAction(...); - var m = JSON.parse(message.data); - AppDispatcher.dispatchServerAction(m); +Connection.prototype.onmessage = function (message) { + console.warn("onmessage (not implemented)", this, message.data); }; -_Connection.prototype.onerror = function (error) { +Connection.prototype.onerror = function (error) { EventLogActions.add_event("WebSocket Connection Error."); console.debug("onerror", this, arguments); }; -_Connection.prototype.onclose = function (close) { +Connection.prototype.onclose = function (close) { EventLogActions.add_event("WebSocket Connection closed."); console.debug("onclose", this, arguments); }; - -var Connection = new _Connection(location.origin + "/updates"); - +Connection.prototype.close = function(){ + this.ws.close(); +}; /** @jsx React.DOM */ //React utils. For other utilities, see ../utils.js @@ -1214,8 +1298,14 @@ var MainView = React.createClass({displayName: 'MainView', }; }, componentDidMount: function () { - this.flowStore = FlowStore.getView(); - this.flowStore.addListener("change",this.onFlowChange); + //FIXME: The store should be global, move out of here. + window.flowstore = new LiveFlowStore(); + + this.flowStore = window.flowstore.open_view(); + this.flowStore.addListener("add",this.onFlowChange); + this.flowStore.addListener("update",this.onFlowChange); + this.flowStore.addListener("remove",this.onFlowChange); + this.flowStore.addListener("recalculate",this.onFlowChange); }, componentWillUnmount: function () { this.flowStore.removeListener("change",this.onFlowChange); @@ -1223,7 +1313,7 @@ var MainView = React.createClass({displayName: 'MainView', }, onFlowChange: function () { this.setState({ - flows: this.flowStore.getAll() + flows: this.flowStore.flows }); }, selectDetailTab: function(panel) { @@ -1518,7 +1608,11 @@ var ProxyApp = ( ) ); $(function () { - Connection.init(); - app = React.renderComponent(ProxyApp, document.body); + window.app = React.renderComponent(ProxyApp, document.body); + var UpdateConnection = new Connection("/updates"); + UpdateConnection.onmessage = function (message) { + var m = JSON.parse(message.data); + AppDispatcher.dispatchServerAction(m); + }; }); //# sourceMappingURL=app.js.map \ No newline at end of file diff --git a/test/test_console.py b/test/test_console.py index 3b6c941d3..d66bd8b0e 100644 --- a/test/test_console.py +++ b/test/test_console.py @@ -15,7 +15,7 @@ class TestConsoleState: """ c = console.ConsoleState() f = self._add_request(c) - assert f in c._flow_list + assert f in c.flows assert c.get_focus() == (f, 0) def test_focus(self): @@ -52,19 +52,19 @@ class TestConsoleState: def _add_request(self, state): f = tutils.tflow() - return state.add_request(f) + return state.add_flow(f) def _add_response(self, state): f = self._add_request(state) f.response = tutils.tresp() - state.add_response(f) + state.update_flow(f) def test_add_response(self): c = console.ConsoleState() f = self._add_request(c) f.response = tutils.tresp() c.focus = None - c.add_response(f) + c.update_flow(f) def test_focus_view(self): c = console.ConsoleState() diff --git a/test/test_flow.py b/test/test_flow.py index 22abb4d41..fdfac62fb 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -364,7 +364,7 @@ class TestState: def test_backup(self): c = flow.State() f = tutils.tflow() - c.add_request(f) + c.add_flow(f) f.backup() c.revert(f) @@ -376,42 +376,42 @@ class TestState: """ c = flow.State() f = tutils.tflow() - c.add_request(f) + c.add_flow(f) assert f assert c.flow_count() == 1 assert c.active_flow_count() == 1 newf = tutils.tflow() - assert c.add_request(newf) + assert c.add_flow(newf) assert c.active_flow_count() == 2 f.response = tutils.tresp() - assert c.add_response(f) + assert c.update_flow(f) assert c.flow_count() == 2 assert c.active_flow_count() == 1 _ = tutils.tresp() - assert not c.add_response(None) + assert not c.update_flow(None) assert c.active_flow_count() == 1 newf.response = tutils.tresp() - assert c.add_response(newf) + assert c.update_flow(newf) assert c.active_flow_count() == 0 def test_err(self): c = flow.State() f = tutils.tflow() - c.add_request(f) + c.add_flow(f) f.error = Error("message") - assert c.add_error(f) + assert c.update_flow(f) c = flow.State() f = tutils.tflow() - c.add_request(f) + c.add_flow(f) c.set_limit("~e") assert not c.view f.error = tutils.terr() - assert c.add_error(f) + assert c.update_flow(f) assert c.view def test_set_limit(self): @@ -420,20 +420,20 @@ class TestState: f = tutils.tflow() assert len(c.view) == 0 - c.add_request(f) + c.add_flow(f) assert len(c.view) == 1 c.set_limit("~s") assert c.limit_txt == "~s" assert len(c.view) == 0 f.response = tutils.tresp() - c.add_response(f) + c.update_flow(f) assert len(c.view) == 1 c.set_limit(None) assert len(c.view) == 1 f = tutils.tflow() - c.add_request(f) + c.add_flow(f) assert len(c.view) == 2 c.set_limit("~q") assert len(c.view) == 1 @@ -452,18 +452,18 @@ class TestState: def _add_request(self, state): f = tutils.tflow() - state.add_request(f) + state.add_flow(f) return f def _add_response(self, state): f = tutils.tflow() - state.add_request(f) + state.add_flow(f) f.response = tutils.tresp() - state.add_response(f) + state.update_flow(f) def _add_error(self, state): f = tutils.tflow(err=True) - state.add_request(f) + state.add_flow(f) def test_clear(self): c = flow.State() @@ -487,7 +487,7 @@ class TestState: c.clear() c.load_flows(flows) - assert isinstance(c._flow_list[0], Flow) + assert isinstance(c.flows[0], Flow) def test_accept_all(self): c = flow.State() @@ -532,7 +532,7 @@ class TestSerialize: s = flow.State() fm = flow.FlowMaster(None, s) fm.load_flows(r) - assert len(s._flow_list) == 6 + assert len(s.flows) == 6 def test_load_flows_reverse(self): r = self._treader() @@ -540,7 +540,7 @@ class TestSerialize: conf = ProxyConfig(mode="reverse", upstream_server=[True,True,"use-this-domain",80]) fm = flow.FlowMaster(DummyServer(conf), s) fm.load_flows(r) - assert s._flow_list[0].request.host == "use-this-domain" + assert s.flows[0].request.host == "use-this-domain" def test_filter(self): sio = StringIO() diff --git a/test/test_server.py b/test/test_server.py index c81eab2b9..a611d30f3 100644 --- a/test/test_server.py +++ b/test/test_server.py @@ -747,19 +747,19 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest): assert req.content == "content" assert req.status_code == 418 - assert not self.chain[1].tmaster.state._flow_list[0].response # killed - assert self.chain[1].tmaster.state._flow_list[1].response + assert not self.chain[1].tmaster.state.flows[0].response # killed + assert self.chain[1].tmaster.state.flows[1].response - assert self.proxy.tmaster.state._flow_list[0].request.form_in == "authority" - assert self.proxy.tmaster.state._flow_list[1].request.form_in == "relative" + assert self.proxy.tmaster.state.flows[0].request.form_in == "authority" + assert self.proxy.tmaster.state.flows[1].request.form_in == "relative" - assert self.chain[0].tmaster.state._flow_list[0].request.form_in == "authority" - assert self.chain[0].tmaster.state._flow_list[1].request.form_in == "relative" - assert self.chain[0].tmaster.state._flow_list[2].request.form_in == "authority" - assert self.chain[0].tmaster.state._flow_list[3].request.form_in == "relative" + assert self.chain[0].tmaster.state.flows[0].request.form_in == "authority" + assert self.chain[0].tmaster.state.flows[1].request.form_in == "relative" + assert self.chain[0].tmaster.state.flows[2].request.form_in == "authority" + assert self.chain[0].tmaster.state.flows[3].request.form_in == "relative" - assert self.chain[1].tmaster.state._flow_list[0].request.form_in == "relative" - assert self.chain[1].tmaster.state._flow_list[1].request.form_in == "relative" + assert self.chain[1].tmaster.state.flows[0].request.form_in == "relative" + assert self.chain[1].tmaster.state.flows[1].request.form_in == "relative" req = p.request("get:'/p/418:b\"content2\"'") diff --git a/web/src/js/app.js b/web/src/js/app.js index 736072dc9..4ee35d60f 100644 --- a/web/src/js/app.js +++ b/web/src/js/app.js @@ -1,4 +1,8 @@ $(function () { - Connection.init(); - app = React.renderComponent(ProxyApp, document.body); + window.app = React.renderComponent(ProxyApp, document.body); + var UpdateConnection = new Connection("/updates"); + UpdateConnection.onmessage = function (message) { + var m = JSON.parse(message.data); + AppDispatcher.dispatchServerAction(m); + }; }); \ No newline at end of file diff --git a/web/src/js/components/mainview.jsx.js b/web/src/js/components/mainview.jsx.js index 795b8136e..f0dfb59a7 100644 --- a/web/src/js/components/mainview.jsx.js +++ b/web/src/js/components/mainview.jsx.js @@ -7,8 +7,14 @@ var MainView = React.createClass({ }; }, componentDidMount: function () { - this.flowStore = FlowStore.getView(); - this.flowStore.addListener("change",this.onFlowChange); + //FIXME: The store should be global, move out of here. + window.flowstore = new LiveFlowStore(); + + this.flowStore = window.flowstore.open_view(); + this.flowStore.addListener("add",this.onFlowChange); + this.flowStore.addListener("update",this.onFlowChange); + this.flowStore.addListener("remove",this.onFlowChange); + this.flowStore.addListener("recalculate",this.onFlowChange); }, componentWillUnmount: function () { this.flowStore.removeListener("change",this.onFlowChange); @@ -16,7 +22,7 @@ var MainView = React.createClass({ }, onFlowChange: function () { this.setState({ - flows: this.flowStore.getAll() + flows: this.flowStore.flows }); }, selectDetailTab: function(panel) { diff --git a/web/src/js/connection.js b/web/src/js/connection.js index 3edbfc20d..64d550bf4 100644 --- a/web/src/js/connection.js +++ b/web/src/js/connection.js @@ -1,33 +1,38 @@ -function _Connection(url) { - this.url = url; +function Connection(url) { + if(url[0] != "/"){ + this.url = url; + } else { + this.url = location.origin.replace("http", "ws") + url; + } + var ws = new WebSocket(this.url); + ws.onopen = function(){ + this.onopen.apply(this, arguments); + }.bind(this); + ws.onmessage = function(){ + this.onmessage.apply(this, arguments); + }.bind(this); + ws.onerror = function(){ + this.onerror.apply(this, arguments); + }.bind(this); + ws.onclose = function(){ + this.onclose.apply(this, arguments); + }.bind(this); + this.ws = ws; } -_Connection.prototype.init = function () { - this.openWebSocketConnection(); -}; -_Connection.prototype.openWebSocketConnection = function () { - this.ws = new WebSocket(this.url.replace("http", "ws")); - var ws = this.ws; - - ws.onopen = this.onopen.bind(this); - ws.onmessage = this.onmessage.bind(this); - ws.onerror = this.onerror.bind(this); - ws.onclose = this.onclose.bind(this); -}; -_Connection.prototype.onopen = function (open) { +Connection.prototype.onopen = function (open) { console.debug("onopen", this, arguments); }; -_Connection.prototype.onmessage = function (message) { - //AppDispatcher.dispatchServerAction(...); - var m = JSON.parse(message.data); - AppDispatcher.dispatchServerAction(m); +Connection.prototype.onmessage = function (message) { + console.warn("onmessage (not implemented)", this, message.data); }; -_Connection.prototype.onerror = function (error) { +Connection.prototype.onerror = function (error) { EventLogActions.add_event("WebSocket Connection Error."); console.debug("onerror", this, arguments); }; -_Connection.prototype.onclose = function (close) { +Connection.prototype.onclose = function (close) { EventLogActions.add_event("WebSocket Connection closed."); console.debug("onclose", this, arguments); }; - -var Connection = new _Connection(location.origin + "/updates"); +Connection.prototype.close = function(){ + this.ws.close(); +}; \ No newline at end of file diff --git a/web/src/js/stores/flowstore.js b/web/src/js/stores/flowstore.js index 7c0bddbd4..530484410 100644 --- a/web/src/js/stores/flowstore.js +++ b/web/src/js/stores/flowstore.js @@ -1,91 +1,172 @@ -function FlowView(store, live) { - EventEmitter.call(this); - this._store = store; - this.live = live; - this.flows = []; - - this.add = this.add.bind(this); - this.update = this.update.bind(this); - - if (live) { - this._store.addListener(ActionTypes.ADD_FLOW, this.add); - this._store.addListener(ActionTypes.UPDATE_FLOW, this.update); +function FlowStore(endpoint) { + this._views = []; + this.reset(); +} +_.extend(FlowStore.prototype, { + add: function (flow) { + this._pos_map[flow.id] = this._flow_list.length; + this._flow_list.push(flow); + for (var i = 0; i < this._views.length; i++) { + this._views[i].add(flow); + } + }, + update: function (flow) { + this._flow_list[this._pos_map[flow.id]] = flow; + for (var i = 0; i < this._views.length; i++) { + this._views[i].update(flow); + } + }, + remove: function (flow_id) { + this._flow_list.splice(this._pos_map[flow_id], 1); + this._build_map(); + for (var i = 0; i < this._views.length; i++) { + this._views[i].remove(flow_id); + } + }, + reset: function (flows) { + this._flow_list = flows || []; + this._build_map(); + for (var i = 0; i < this._views.length; i++) { + this._views[i].recalculate(this._flow_list); + } + }, + _build_map: function () { + this._pos_map = {}; + for (var i = 0; i < this._flow_list.length; i++) { + var flow = this._flow_list[i]; + this._pos_map[flow.id] = i; + } + }, + open_view: function (filt, sort) { + var view = new FlowView(this._flow_list, filt, sort); + this._views.push(view); + return view; + }, + close_view: function (view) { + this._views = _.without(this._views, view); } +}); + + +function LiveFlowStore(endpoint) { + FlowStore.call(this); + this.updates_before_init = []; // (empty array is true in js) + this.endpoint = endpoint || "/flows"; + this.conn = new Connection(this.endpoint + "/updates"); + this.conn.onopen = this._onopen.bind(this); + this.conn.onmessage = function (e) { + var message = JSON.parse(e.data); + this.handle_update(message.type, message.data); + }.bind(this); +} +_.extend(LiveFlowStore.prototype, FlowStore.prototype, { + handle_update: function (type, data) { + console.log("LiveFlowStore.handle_update", type, data); + if (this.updates_before_init) { + console.log("defer update", type, data); + this.updates_before_init.push(arguments); + } else { + this[type](data); + } + }, + handle_fetch: function (data) { + console.log("Flows fetched."); + this.reset(data.flows); + var updates = this.updates_before_init; + this.updates_before_init = false; + for (var i = 0; i < updates.length; i++) { + this.handle_update.apply(this, updates[i]); + } + }, + _onopen: function () { + //Update stream openend, fetch list of flows. + console.log("Update Connection opened, fetching flows..."); + $.getJSON(this.endpoint, this.handle_fetch.bind(this)); + }, +}); + +function SortByInsertionOrder() { + this.i = 0; + this.map = {}; + this.key = this.key.bind(this); +} +SortByInsertionOrder.prototype.key = function (flow) { + if (!(flow.id in this.map)) { + this.i++; + this.map[flow.id] = this.i; + } + return this.map[flow.id]; +}; + +var default_sort = (new SortByInsertionOrder()).key; + +function FlowView(flows, filt, sort) { + EventEmitter.call(this); + filt = filt || function (flow) { + return true; + }; + sort = sort || default_sort; + this.recalculate(flows, filt, sort); } _.extend(FlowView.prototype, EventEmitter.prototype, { - close: function () { - this._store.removeListener(ActionTypes.ADD_FLOW, this.add); - this._store.removeListener(ActionTypes.UPDATE_FLOW, this.update); - }, - getAll: function () { - return this.flows; + recalculate: function (flows, filt, sort) { + if (filt) { + this.filt = filt; + } + if (sort) { + this.sort = sort; + } + this.flows = flows.filter(this.filt); + this.flows.sort(function (a, b) { + return this.sort(a) - this.sort(b); + }.bind(this)); + this.emit("recalculate"); }, add: function (flow) { - return this.update(flow); - }, - add_bulk: function (flows) { - //Treat all previously received updates as newer than the bulk update. - //If they weren't newer, we're about to receive an update for them very soon. - var updates = this.flows; - this.flows = flows; - updates.forEach(function(flow){ - this._update(flow); - }.bind(this)); - this.emit("change"); - }, - _update: function(flow){ - var idx = _.findIndex(this.flows, function(f){ - return flow.id === f.id; - }); - - if(idx < 0){ - this.flows.push(flow); - //if(this.flows.length > 100){ - // this.flows.shift(); - //} - } else { - this.flows[idx] = flow; + if (this.filt(flow)) { + var idx = _.sortedIndex(this.flows, flow, this.sort); + if (idx === this.flows.length) { //happens often, .push is way faster. + this.flows.push(flow); + } else { + this.flows.splice(idx, 0, flow); + } + this.emit("add", flow, idx); } }, - update: function(flow){ - this._update(flow); - this.emit("change"); - }, -}); - - -function _FlowStore() { - EventEmitter.call(this); -} -_.extend(_FlowStore.prototype, EventEmitter.prototype, { - getView: function (since) { - var view = new FlowView(this, !since); - - $.getJSON("/static/flows.json", function(flows){ - flows = flows.concat(_.cloneDeep(flows)).concat(_.cloneDeep(flows)); - var id = 1; - flows.forEach(function(flow){ - flow.id = "uuid-" + id++; - }); - view.add_bulk(flows); - - }); - - return view; - }, - handle: function (action) { - switch (action.type) { - case ActionTypes.ADD_FLOW: - case ActionTypes.UPDATE_FLOW: - this.emit(action.type, action.data); + update: function (flow) { + var idx; + var i = this.flows.length; + // Search from the back, we usually update the latest flows. + while (i--) { + if (this.flows[i].id === flow.id) { + idx = i; break; - default: - return; + } + } + + if (idx === -1) { //not contained in list + this.add(flow); + } else if (!this.filt(flow)) { + this.remove(flow.id); + } else { + if (this.sort(this.flows[idx]) !== this.sort(flow)) { //sortpos has changed + this.remove(this.flows[idx]); + this.add(flow); + } else { + this.flows[idx] = flow; + this.emit("update", flow, idx); + } + } + }, + remove: function (flow_id) { + var i = this.flows.length; + while (i--) { + if (this.flows[i].id === flow_id) { + this.flows.splice(i, 1); + this.emit("remove", flow_id, i); + break; + } } } -}); - - -var FlowStore = new _FlowStore(); -AppDispatcher.register(FlowStore.handle.bind(FlowStore)); +}); \ No newline at end of file