introduce revised views, port over changes from multiple_views branch

This commit is contained in:
Maximilian Hils 2014-11-26 04:18:21 +01:00
parent 47a78e3c72
commit ed8249023f
13 changed files with 701 additions and 349 deletions

View File

@ -277,16 +277,16 @@ class ConsoleState(flow.State):
d = self.flowsettings.get(flow, {}) d = self.flowsettings.get(flow, {})
return d.get(key, default) return d.get(key, default)
def add_request(self, f): def add_flow(self, f):
flow.State.add_request(self, f) super(ConsoleState, self).add_flow(f)
if self.focus is None: if self.focus is None:
self.set_focus(0) self.set_focus(0)
elif self.follow_focus: elif self.follow_focus:
self.set_focus(len(self.view) - 1) self.set_focus(len(self.view) - 1)
return f return f
def add_response(self, resp): def update_flow(self, f):
f = flow.State.add_response(self, resp) super(ConsoleState, self).update_flow(f)
if self.focus is None: if self.focus is None:
self.set_focus(0) self.set_focus(0)
return f return f

View File

@ -2,6 +2,7 @@
This module provides more sophisticated flow tracking and provides filtering and interception facilities. This module provides more sophisticated flow tracking and provides filtering and interception facilities.
""" """
from __future__ import absolute_import from __future__ import absolute_import
from abc import abstractmethod, ABCMeta
import hashlib import hashlib
import Cookie import Cookie
import cookielib import cookielib
@ -338,80 +339,216 @@ class StickyAuthState:
f.request.headers["authorization"] = self.hosts[host] f.request.headers["authorization"] = self.hosts[host]
class State(object): class FlowList(object):
def __init__(self): __metaclass__ = ABCMeta
self._flow_list = []
self.view = []
# These are compiled filt expressions: def __iter__(self):
self._limit = None return iter(self._list)
self.intercept = None
@property def __contains__(self, item):
def limit_txt(self): return item in self._list
if self._limit:
return self._limit.pattern
else:
return None
def flow_count(self): def __getitem__(self, item):
return len(self._flow_list) return self._list[item]
def __nonzero__(self):
return bool(self._list)
def __len__(self):
return len(self._list)
def index(self, f): def index(self, f):
return self._flow_list.index(f) return self._list.index(f)
def active_flow_count(self): @abstractmethod
def _add(self, f):
return
@abstractmethod
def _update(self, f):
return
@abstractmethod
def _remove(self, f):
return
class FlowView(FlowList):
def __init__(self, store, filt=None):
self._list = []
if not filt:
filt = lambda flow: True
self._build(store, filt)
self.store = store
self.store.views.append(self)
def _close(self):
self.store.views.remove(self)
def _build(self, flows, filt=None):
if filt:
self.filt = filt
self._list = list(filter(self.filt, flows))
def _add(self, f):
if self.filt(f):
self._list.append(f)
def _update(self, f):
if f not in self._list:
self._add(f)
elif not self.filt(f):
self._remove(f)
def _remove(self, f):
if f in self._list:
self._list.remove(f)
def _recalculate(self, flows):
self._build(flows)
class FlowStore(FlowList):
"""
Responsible for handling flows in the state:
Keeps a list of all flows and provides views on them.
"""
def __init__(self):
self._list = []
self._set = set() # Used for O(1) lookups
self.views = []
self._recalculate_views()
def __contains__(self, f):
return f in self._set
def _add(self, f):
"""
Adds a flow to the state.
The flow to add must not be present in the state.
"""
self._list.append(f)
self._set.add(f)
for view in self.views:
view._add(f)
def _update(self, f):
"""
Notifies the state that a flow has been updated.
The flow must be present in the state.
"""
for view in self.views:
view._update(f)
def _remove(self, f):
"""
Deletes a flow from the state.
The flow must be present in the state.
"""
self._list.remove(f)
self._set.remove(f)
for view in self.views:
view._remove(f)
# Expensive bulk operations
def _extend(self, flows):
"""
Adds a list of flows to the state.
The list of flows to add must not contain flows that are already in the state.
"""
self._list.extend(flows)
self._set.update(flows)
self._recalculate_views()
def _clear(self):
self._list = []
self._set = set()
self._recalculate_views()
def _recalculate_views(self):
"""
Expensive operation: Recalculate all the views after a bulk change.
"""
for view in self.views:
view._recalculate(self)
# Utility functions.
# There are some common cases where we need to argue about all flows
# irrespective of filters on the view etc (i.e. on shutdown).
def active_count(self):
c = 0 c = 0
for i in self._flow_list: for i in self._list:
if not i.response and not i.error: if not i.response and not i.error:
c += 1 c += 1
return c return c
def add_request(self, flow): # TODO: Should accept_all operate on views or on all flows?
""" def accept_all(self):
Add a request to the state. Returns the matching flow. for f in self._list:
""" f.accept_intercept()
if flow in self._flow_list: # catch flow replay
return flow
self._flow_list.append(flow)
if flow.match(self._limit):
self.view.append(flow)
return flow
def add_response(self, f): def kill_all(self, master):
for f in self._list:
f.kill(master)
class State(object):
def __init__(self):
self.flows = FlowStore()
self.view = FlowView(self.flows, None)
# These are compiled filt expressions:
self.intercept = None
@property
def limit_txt(self):
return getattr(self.view.filt, "pattern", None)
def flow_count(self):
return len(self.flows)
# TODO: All functions regarding flows that don't cause side-effects should be moved into FlowStore.
def index(self, f):
return self.flows.index(f)
def active_flow_count(self):
return self.flows.active_count()
def add_flow(self, f):
""" """
Add a response to the state. Returns the matching flow. Add a request to the state.
""" """
if not f: self.flows._add(f)
return False
if f.match(self._limit) and not f in self.view:
self.view.append(f)
return f return f
def add_error(self, f): def update_flow(self, f):
""" """
Add an error response to the state. Returns the matching flow, or Add a response to the state.
None if there isn't one.
""" """
if not f: self.flows._update(f)
return None
if f.match(self._limit) and not f in self.view:
self.view.append(f)
return f return f
def delete_flow(self, f):
self.flows._remove(f)
def load_flows(self, flows): def load_flows(self, flows):
self._flow_list.extend(flows) self.flows._extend(flows)
self.recalculate_view()
def set_limit(self, txt): def set_limit(self, txt):
if txt == self.limit_txt:
return
if txt: if txt:
f = filt.parse(txt) f = filt.parse(txt)
if not f: if not f:
return "Invalid filter expression." return "Invalid filter expression."
self._limit = f self.view._close()
self.view = FlowView(self.flows, f)
else: else:
self._limit = None self.view._close()
self.recalculate_view() self.view = FlowView(self.flows, None)
def set_intercept(self, txt): def set_intercept(self, txt):
if txt: if txt:
@ -419,37 +556,24 @@ class State(object):
if not f: if not f:
return "Invalid filter expression." return "Invalid filter expression."
self.intercept = f self.intercept = f
self.intercept_txt = txt
else: else:
self.intercept = None self.intercept = None
self.intercept_txt = None
def recalculate_view(self): @property
if self._limit: def intercept_txt(self):
self.view = [i for i in self._flow_list if i.match(self._limit)] return getattr(self.intercept, "pattern", None)
else:
self.view = self._flow_list[:]
def delete_flow(self, f):
self._flow_list.remove(f)
if f in self.view:
self.view.remove(f)
return True
def clear(self): def clear(self):
for i in self._flow_list[:]: self.flows._clear()
self.delete_flow(i)
def accept_all(self): def accept_all(self):
for i in self._flow_list[:]: self.flows.accept_all()
i.accept_intercept()
def revert(self, f): def revert(self, f):
f.revert() f.revert()
def killall(self, master): def killall(self, master):
for i in self._flow_list: self.flows.kill_all(master)
i.kill(master)
class FlowMaster(controller.Master): class FlowMaster(controller.Master):
@ -716,7 +840,7 @@ class FlowMaster(controller.Master):
sc.reply() sc.reply()
def handle_error(self, f): def handle_error(self, f):
self.state.add_error(f) self.state.update_flow(f)
self.run_script_hook("error", f) self.run_script_hook("error", f)
if self.client_playback: if self.client_playback:
self.client_playback.clear(f) self.client_playback.clear(f)
@ -736,7 +860,8 @@ class FlowMaster(controller.Master):
self.add_event("Error in wsgi app. %s"%err, "error") self.add_event("Error in wsgi app. %s"%err, "error")
f.reply(protocol.KILL) f.reply(protocol.KILL)
return return
self.state.add_request(f) if f not in self.state.flows: # don't add again on replay
self.state.add_flow(f)
self.replacehooks.run(f) self.replacehooks.run(f)
self.setheaders.run(f) self.setheaders.run(f)
self.run_script_hook("request", f) self.run_script_hook("request", f)
@ -757,7 +882,7 @@ class FlowMaster(controller.Master):
return f return f
def handle_response(self, f): def handle_response(self, f):
self.state.add_response(f) self.state.update_flow(f)
self.replacehooks.run(f) self.replacehooks.run(f)
self.setheaders.run(f) self.setheaders.run(f)
self.run_script_hook("response", f) self.run_script_hook("response", f)
@ -772,7 +897,7 @@ class FlowMaster(controller.Master):
self.unload_scripts() self.unload_scripts()
controller.Master.shutdown(self) controller.Master.shutdown(self)
if self.stream: if self.stream:
for i in self.state._flow_list: for i in self.state.flows:
if not i.response: if not i.response:
self.stream.add(i) self.stream.add(i)
self.stop_stream() self.stop_stream()

View File

@ -117,7 +117,10 @@ class HTTPMessage(stateobject.StateObject):
def get_state(self, short=False): def get_state(self, short=False):
ret = super(HTTPMessage, self).get_state(short) ret = super(HTTPMessage, self).get_state(short)
if short: if short:
ret["contentLength"] = len(self.content) if self.content:
ret["contentLength"] = len(self.content)
else:
ret["contentLength"] = 0
return ret return ret
def get_decoded_content(self): def get_decoded_content(self):

View File

@ -9,9 +9,32 @@ class Stop(Exception):
pass pass
class WebFlowView(flow.FlowView):
def __init__(self, store):
super(WebFlowView, self).__init__(store, None)
def _add(self, f):
super(WebFlowView, self)._add(f)
app.FlowUpdates.broadcast("add", f.get_state(short=True))
def _update(self, f):
super(WebFlowView, self)._update(f)
app.FlowUpdates.broadcast("update", f.get_state(short=True))
def _remove(self, f):
super(WebFlowView, self)._remove(f)
app.FlowUpdates.broadcast("remove", f.get_state(short=True))
def _recalculate(self, flows):
super(WebFlowView, self)._recalculate(flows)
app.FlowUpdates.broadcast("recalculate", None)
class WebState(flow.State): class WebState(flow.State):
def __init__(self): def __init__(self):
flow.State.__init__(self) super(WebState, self).__init__()
self.view._close()
self.view = WebFlowView(self.flows)
class Options(object): class Options(object):
@ -58,8 +81,8 @@ class Options(object):
class WebMaster(flow.FlowMaster): class WebMaster(flow.FlowMaster):
def __init__(self, server, options): def __init__(self, server, options):
self.options = options self.options = options
self.app = app.Application(self.options.wdebug)
super(WebMaster, self).__init__(server, WebState()) super(WebMaster, self).__init__(server, WebState())
self.app = app.Application(self.state, self.options.wdebug)
self.last_log_id = 0 self.last_log_id = 0
@ -83,24 +106,17 @@ class WebMaster(flow.FlowMaster):
self.shutdown() self.shutdown()
def handle_request(self, f): def handle_request(self, f):
app.ClientConnection.broadcast("add_flow", f.get_state(True)) super(WebMaster, self).handle_request(f)
flow.FlowMaster.handle_request(self, f)
if f: if f:
f.reply() f.reply()
return f return f
def handle_response(self, f): def handle_response(self, f):
app.ClientConnection.broadcast("update_flow", f.get_state(True)) super(WebMaster, self).handle_response(f)
flow.FlowMaster.handle_response(self, f)
if f: if f:
f.reply() f.reply()
return f return f
def handle_error(self, f):
app.ClientConnection.broadcast("update_flow", f.get_state(True))
flow.FlowMaster.handle_error(self, f)
return f
def handle_log(self, l): def handle_log(self, l):
self.last_log_id += 1 self.last_log_id += 1
app.ClientConnection.broadcast( app.ClientConnection.broadcast(

View File

@ -3,6 +3,7 @@ import tornado.web
import tornado.websocket import tornado.websocket
import logging import logging
import json import json
from .. import flow
class IndexHandler(tornado.web.RequestHandler): class IndexHandler(tornado.web.RequestHandler):
@ -10,36 +11,53 @@ class IndexHandler(tornado.web.RequestHandler):
self.render("index.html") self.render("index.html")
class ClientConnection(tornado.websocket.WebSocketHandler): class WebSocketEventBroadcaster(tornado.websocket.WebSocketHandler):
connections = set() connections = None # raise an error if inherited class doesn't specify its own instance.
def open(self): def open(self):
ClientConnection.connections.add(self) self.connections.add(self)
def on_close(self): def on_close(self):
ClientConnection.connections.remove(self) self.connections.remove(self)
@classmethod @classmethod
def broadcast(cls, type, data): def broadcast(cls, type, data):
message = json.dumps(
{
"type": type,
"data": data
}
)
for conn in cls.connections: for conn in cls.connections:
try: try:
conn.write_message( conn.write_message(message)
json.dumps(
{
"type": type,
"data": data
}
)
)
except: except:
logging.error("Error sending message", exc_info=True) logging.error("Error sending message", exc_info=True)
class FlowsHandler(tornado.web.RequestHandler):
def get(self):
self.write(dict(
flows=[f.get_state(short=True) for f in self.application.state.flows]
))
class FlowUpdates(WebSocketEventBroadcaster):
connections = set()
class ClientConnection(WebSocketEventBroadcaster):
connections = set()
class Application(tornado.web.Application): class Application(tornado.web.Application):
def __init__(self, debug): def __init__(self, state, debug):
self.state = state
handlers = [ handlers = [
(r"/", IndexHandler), (r"/", IndexHandler),
(r"/updates", ClientConnection), (r"/updates", ClientConnection),
(r"/flows", FlowsHandler),
(r"/flows/updates", FlowUpdates),
] ]
settings = dict( settings = dict(
template_path=os.path.join(os.path.dirname(__file__), "templates"), template_path=os.path.join(os.path.dirname(__file__), "templates"),

View File

@ -335,132 +335,216 @@ _.extend(_EventLogStore.prototype, EventEmitter.prototype, {
var EventLogStore = new _EventLogStore(); var EventLogStore = new _EventLogStore();
AppDispatcher.register(EventLogStore.handle.bind(EventLogStore)); AppDispatcher.register(EventLogStore.handle.bind(EventLogStore));
function FlowView(store, live) { function FlowStore(endpoint) {
EventEmitter.call(this); this._views = [];
this._store = store; this.reset();
this.live = live; }
this.flows = []; _.extend(FlowStore.prototype, {
add: function (flow) {
this.add = this.add.bind(this); this._pos_map[flow.id] = this._flow_list.length;
this.update = this.update.bind(this); this._flow_list.push(flow);
for (var i = 0; i < this._views.length; i++) {
if (live) { this._views[i].add(flow);
this._store.addListener(ActionTypes.ADD_FLOW, this.add); }
this._store.addListener(ActionTypes.UPDATE_FLOW, this.update); },
update: function (flow) {
this._flow_list[this._pos_map[flow.id]] = flow;
for (var i = 0; i < this._views.length; i++) {
this._views[i].update(flow);
}
},
remove: function (flow_id) {
this._flow_list.splice(this._pos_map[flow_id], 1);
this._build_map();
for (var i = 0; i < this._views.length; i++) {
this._views[i].remove(flow_id);
}
},
reset: function (flows) {
this._flow_list = flows || [];
this._build_map();
for (var i = 0; i < this._views.length; i++) {
this._views[i].recalculate(this._flow_list);
}
},
_build_map: function () {
this._pos_map = {};
for (var i = 0; i < this._flow_list.length; i++) {
var flow = this._flow_list[i];
this._pos_map[flow.id] = i;
}
},
open_view: function (filt, sort) {
var view = new FlowView(this._flow_list, filt, sort);
this._views.push(view);
return view;
},
close_view: function (view) {
this._views = _.without(this._views, view);
} }
});
function LiveFlowStore(endpoint) {
FlowStore.call(this);
this.updates_before_init = []; // (empty array is true in js)
this.endpoint = endpoint || "/flows";
this.conn = new Connection(this.endpoint + "/updates");
this.conn.onopen = this._onopen.bind(this);
this.conn.onmessage = function (e) {
var message = JSON.parse(e.data);
this.handle_update(message.type, message.data);
}.bind(this);
}
_.extend(LiveFlowStore.prototype, FlowStore.prototype, {
handle_update: function (type, data) {
console.log("LiveFlowStore.handle_update", type, data);
if (this.updates_before_init) {
console.log("defer update", type, data);
this.updates_before_init.push(arguments);
} else {
this[type](data);
}
},
handle_fetch: function (data) {
console.log("Flows fetched.");
this.reset(data.flows);
var updates = this.updates_before_init;
this.updates_before_init = false;
for (var i = 0; i < updates.length; i++) {
this.handle_update.apply(this, updates[i]);
}
},
_onopen: function () {
//Update stream openend, fetch list of flows.
console.log("Update Connection opened, fetching flows...");
$.getJSON(this.endpoint, this.handle_fetch.bind(this));
},
});
function SortByInsertionOrder() {
this.i = 0;
this.map = {};
this.key = this.key.bind(this);
}
SortByInsertionOrder.prototype.key = function (flow) {
if (!(flow.id in this.map)) {
this.i++;
this.map[flow.id] = this.i;
}
return this.map[flow.id];
};
var default_sort = (new SortByInsertionOrder()).key;
function FlowView(flows, filt, sort) {
EventEmitter.call(this);
filt = filt || function (flow) {
return true;
};
sort = sort || default_sort;
this.recalculate(flows, filt, sort);
} }
_.extend(FlowView.prototype, EventEmitter.prototype, { _.extend(FlowView.prototype, EventEmitter.prototype, {
close: function () { recalculate: function (flows, filt, sort) {
this._store.removeListener(ActionTypes.ADD_FLOW, this.add); if (filt) {
this._store.removeListener(ActionTypes.UPDATE_FLOW, this.update); this.filt = filt;
}, }
getAll: function () { if (sort) {
return this.flows; this.sort = sort;
}
this.flows = flows.filter(this.filt);
this.flows.sort(function (a, b) {
return this.sort(a) - this.sort(b);
}.bind(this));
this.emit("recalculate");
}, },
add: function (flow) { add: function (flow) {
return this.update(flow); if (this.filt(flow)) {
}, var idx = _.sortedIndex(this.flows, flow, this.sort);
add_bulk: function (flows) { if (idx === this.flows.length) { //happens often, .push is way faster.
//Treat all previously received updates as newer than the bulk update. this.flows.push(flow);
//If they weren't newer, we're about to receive an update for them very soon. } else {
var updates = this.flows; this.flows.splice(idx, 0, flow);
this.flows = flows; }
updates.forEach(function(flow){ this.emit("add", flow, idx);
this._update(flow);
}.bind(this));
this.emit("change");
},
_update: function(flow){
var idx = _.findIndex(this.flows, function(f){
return flow.id === f.id;
});
if(idx < 0){
this.flows.push(flow);
//if(this.flows.length > 100){
// this.flows.shift();
//}
} else {
this.flows[idx] = flow;
} }
}, },
update: function(flow){ update: function (flow) {
this._update(flow); var idx;
this.emit("change"); var i = this.flows.length;
}, // Search from the back, we usually update the latest flows.
}); while (i--) {
if (this.flows[i].id === flow.id) {
idx = i;
function _FlowStore() {
EventEmitter.call(this);
}
_.extend(_FlowStore.prototype, EventEmitter.prototype, {
getView: function (since) {
var view = new FlowView(this, !since);
$.getJSON("/static/flows.json", function(flows){
flows = flows.concat(_.cloneDeep(flows)).concat(_.cloneDeep(flows));
var id = 1;
flows.forEach(function(flow){
flow.id = "uuid-" + id++;
});
view.add_bulk(flows);
});
return view;
},
handle: function (action) {
switch (action.type) {
case ActionTypes.ADD_FLOW:
case ActionTypes.UPDATE_FLOW:
this.emit(action.type, action.data);
break; break;
default: }
return; }
if (idx === -1) { //not contained in list
this.add(flow);
} else if (!this.filt(flow)) {
this.remove(flow.id);
} else {
if (this.sort(this.flows[idx]) !== this.sort(flow)) { //sortpos has changed
this.remove(this.flows[idx]);
this.add(flow);
} else {
this.flows[idx] = flow;
this.emit("update", flow, idx);
}
}
},
remove: function (flow_id) {
var i = this.flows.length;
while (i--) {
if (this.flows[i].id === flow_id) {
this.flows.splice(i, 1);
this.emit("remove", flow_id, i);
break;
}
} }
} }
}); });
function Connection(url) {
if(url[0] != "/"){
var FlowStore = new _FlowStore(); this.url = url;
AppDispatcher.register(FlowStore.handle.bind(FlowStore)); } else {
this.url = location.origin.replace("http", "ws") + url;
function _Connection(url) { }
this.url = url; var ws = new WebSocket(this.url);
ws.onopen = function(){
this.onopen.apply(this, arguments);
}.bind(this);
ws.onmessage = function(){
this.onmessage.apply(this, arguments);
}.bind(this);
ws.onerror = function(){
this.onerror.apply(this, arguments);
}.bind(this);
ws.onclose = function(){
this.onclose.apply(this, arguments);
}.bind(this);
this.ws = ws;
} }
_Connection.prototype.init = function () { Connection.prototype.onopen = function (open) {
this.openWebSocketConnection();
};
_Connection.prototype.openWebSocketConnection = function () {
this.ws = new WebSocket(this.url.replace("http", "ws"));
var ws = this.ws;
ws.onopen = this.onopen.bind(this);
ws.onmessage = this.onmessage.bind(this);
ws.onerror = this.onerror.bind(this);
ws.onclose = this.onclose.bind(this);
};
_Connection.prototype.onopen = function (open) {
console.debug("onopen", this, arguments); console.debug("onopen", this, arguments);
}; };
_Connection.prototype.onmessage = function (message) { Connection.prototype.onmessage = function (message) {
//AppDispatcher.dispatchServerAction(...); console.warn("onmessage (not implemented)", this, message.data);
var m = JSON.parse(message.data);
AppDispatcher.dispatchServerAction(m);
}; };
_Connection.prototype.onerror = function (error) { Connection.prototype.onerror = function (error) {
EventLogActions.add_event("WebSocket Connection Error."); EventLogActions.add_event("WebSocket Connection Error.");
console.debug("onerror", this, arguments); console.debug("onerror", this, arguments);
}; };
_Connection.prototype.onclose = function (close) { Connection.prototype.onclose = function (close) {
EventLogActions.add_event("WebSocket Connection closed."); EventLogActions.add_event("WebSocket Connection closed.");
console.debug("onclose", this, arguments); console.debug("onclose", this, arguments);
}; };
Connection.prototype.close = function(){
var Connection = new _Connection(location.origin + "/updates"); this.ws.close();
};
/** @jsx React.DOM */ /** @jsx React.DOM */
//React utils. For other utilities, see ../utils.js //React utils. For other utilities, see ../utils.js
@ -1214,8 +1298,14 @@ var MainView = React.createClass({displayName: 'MainView',
}; };
}, },
componentDidMount: function () { componentDidMount: function () {
this.flowStore = FlowStore.getView(); //FIXME: The store should be global, move out of here.
this.flowStore.addListener("change",this.onFlowChange); window.flowstore = new LiveFlowStore();
this.flowStore = window.flowstore.open_view();
this.flowStore.addListener("add",this.onFlowChange);
this.flowStore.addListener("update",this.onFlowChange);
this.flowStore.addListener("remove",this.onFlowChange);
this.flowStore.addListener("recalculate",this.onFlowChange);
}, },
componentWillUnmount: function () { componentWillUnmount: function () {
this.flowStore.removeListener("change",this.onFlowChange); this.flowStore.removeListener("change",this.onFlowChange);
@ -1223,7 +1313,7 @@ var MainView = React.createClass({displayName: 'MainView',
}, },
onFlowChange: function () { onFlowChange: function () {
this.setState({ this.setState({
flows: this.flowStore.getAll() flows: this.flowStore.flows
}); });
}, },
selectDetailTab: function(panel) { selectDetailTab: function(panel) {
@ -1518,7 +1608,11 @@ var ProxyApp = (
) )
); );
$(function () { $(function () {
Connection.init(); window.app = React.renderComponent(ProxyApp, document.body);
app = React.renderComponent(ProxyApp, document.body); var UpdateConnection = new Connection("/updates");
UpdateConnection.onmessage = function (message) {
var m = JSON.parse(message.data);
AppDispatcher.dispatchServerAction(m);
};
}); });
//# sourceMappingURL=app.js.map //# sourceMappingURL=app.js.map

View File

@ -15,7 +15,7 @@ class TestConsoleState:
""" """
c = console.ConsoleState() c = console.ConsoleState()
f = self._add_request(c) f = self._add_request(c)
assert f in c._flow_list assert f in c.flows
assert c.get_focus() == (f, 0) assert c.get_focus() == (f, 0)
def test_focus(self): def test_focus(self):
@ -52,19 +52,19 @@ class TestConsoleState:
def _add_request(self, state): def _add_request(self, state):
f = tutils.tflow() f = tutils.tflow()
return state.add_request(f) return state.add_flow(f)
def _add_response(self, state): def _add_response(self, state):
f = self._add_request(state) f = self._add_request(state)
f.response = tutils.tresp() f.response = tutils.tresp()
state.add_response(f) state.update_flow(f)
def test_add_response(self): def test_add_response(self):
c = console.ConsoleState() c = console.ConsoleState()
f = self._add_request(c) f = self._add_request(c)
f.response = tutils.tresp() f.response = tutils.tresp()
c.focus = None c.focus = None
c.add_response(f) c.update_flow(f)
def test_focus_view(self): def test_focus_view(self):
c = console.ConsoleState() c = console.ConsoleState()

View File

@ -364,7 +364,7 @@ class TestState:
def test_backup(self): def test_backup(self):
c = flow.State() c = flow.State()
f = tutils.tflow() f = tutils.tflow()
c.add_request(f) c.add_flow(f)
f.backup() f.backup()
c.revert(f) c.revert(f)
@ -376,42 +376,42 @@ class TestState:
""" """
c = flow.State() c = flow.State()
f = tutils.tflow() f = tutils.tflow()
c.add_request(f) c.add_flow(f)
assert f assert f
assert c.flow_count() == 1 assert c.flow_count() == 1
assert c.active_flow_count() == 1 assert c.active_flow_count() == 1
newf = tutils.tflow() newf = tutils.tflow()
assert c.add_request(newf) assert c.add_flow(newf)
assert c.active_flow_count() == 2 assert c.active_flow_count() == 2
f.response = tutils.tresp() f.response = tutils.tresp()
assert c.add_response(f) assert c.update_flow(f)
assert c.flow_count() == 2 assert c.flow_count() == 2
assert c.active_flow_count() == 1 assert c.active_flow_count() == 1
_ = tutils.tresp() _ = tutils.tresp()
assert not c.add_response(None) assert not c.update_flow(None)
assert c.active_flow_count() == 1 assert c.active_flow_count() == 1
newf.response = tutils.tresp() newf.response = tutils.tresp()
assert c.add_response(newf) assert c.update_flow(newf)
assert c.active_flow_count() == 0 assert c.active_flow_count() == 0
def test_err(self): def test_err(self):
c = flow.State() c = flow.State()
f = tutils.tflow() f = tutils.tflow()
c.add_request(f) c.add_flow(f)
f.error = Error("message") f.error = Error("message")
assert c.add_error(f) assert c.update_flow(f)
c = flow.State() c = flow.State()
f = tutils.tflow() f = tutils.tflow()
c.add_request(f) c.add_flow(f)
c.set_limit("~e") c.set_limit("~e")
assert not c.view assert not c.view
f.error = tutils.terr() f.error = tutils.terr()
assert c.add_error(f) assert c.update_flow(f)
assert c.view assert c.view
def test_set_limit(self): def test_set_limit(self):
@ -420,20 +420,20 @@ class TestState:
f = tutils.tflow() f = tutils.tflow()
assert len(c.view) == 0 assert len(c.view) == 0
c.add_request(f) c.add_flow(f)
assert len(c.view) == 1 assert len(c.view) == 1
c.set_limit("~s") c.set_limit("~s")
assert c.limit_txt == "~s" assert c.limit_txt == "~s"
assert len(c.view) == 0 assert len(c.view) == 0
f.response = tutils.tresp() f.response = tutils.tresp()
c.add_response(f) c.update_flow(f)
assert len(c.view) == 1 assert len(c.view) == 1
c.set_limit(None) c.set_limit(None)
assert len(c.view) == 1 assert len(c.view) == 1
f = tutils.tflow() f = tutils.tflow()
c.add_request(f) c.add_flow(f)
assert len(c.view) == 2 assert len(c.view) == 2
c.set_limit("~q") c.set_limit("~q")
assert len(c.view) == 1 assert len(c.view) == 1
@ -452,18 +452,18 @@ class TestState:
def _add_request(self, state): def _add_request(self, state):
f = tutils.tflow() f = tutils.tflow()
state.add_request(f) state.add_flow(f)
return f return f
def _add_response(self, state): def _add_response(self, state):
f = tutils.tflow() f = tutils.tflow()
state.add_request(f) state.add_flow(f)
f.response = tutils.tresp() f.response = tutils.tresp()
state.add_response(f) state.update_flow(f)
def _add_error(self, state): def _add_error(self, state):
f = tutils.tflow(err=True) f = tutils.tflow(err=True)
state.add_request(f) state.add_flow(f)
def test_clear(self): def test_clear(self):
c = flow.State() c = flow.State()
@ -487,7 +487,7 @@ class TestState:
c.clear() c.clear()
c.load_flows(flows) c.load_flows(flows)
assert isinstance(c._flow_list[0], Flow) assert isinstance(c.flows[0], Flow)
def test_accept_all(self): def test_accept_all(self):
c = flow.State() c = flow.State()
@ -532,7 +532,7 @@ class TestSerialize:
s = flow.State() s = flow.State()
fm = flow.FlowMaster(None, s) fm = flow.FlowMaster(None, s)
fm.load_flows(r) fm.load_flows(r)
assert len(s._flow_list) == 6 assert len(s.flows) == 6
def test_load_flows_reverse(self): def test_load_flows_reverse(self):
r = self._treader() r = self._treader()
@ -540,7 +540,7 @@ class TestSerialize:
conf = ProxyConfig(mode="reverse", upstream_server=[True,True,"use-this-domain",80]) conf = ProxyConfig(mode="reverse", upstream_server=[True,True,"use-this-domain",80])
fm = flow.FlowMaster(DummyServer(conf), s) fm = flow.FlowMaster(DummyServer(conf), s)
fm.load_flows(r) fm.load_flows(r)
assert s._flow_list[0].request.host == "use-this-domain" assert s.flows[0].request.host == "use-this-domain"
def test_filter(self): def test_filter(self):
sio = StringIO() sio = StringIO()

View File

@ -747,19 +747,19 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
assert req.content == "content" assert req.content == "content"
assert req.status_code == 418 assert req.status_code == 418
assert not self.chain[1].tmaster.state._flow_list[0].response # killed assert not self.chain[1].tmaster.state.flows[0].response # killed
assert self.chain[1].tmaster.state._flow_list[1].response assert self.chain[1].tmaster.state.flows[1].response
assert self.proxy.tmaster.state._flow_list[0].request.form_in == "authority" assert self.proxy.tmaster.state.flows[0].request.form_in == "authority"
assert self.proxy.tmaster.state._flow_list[1].request.form_in == "relative" assert self.proxy.tmaster.state.flows[1].request.form_in == "relative"
assert self.chain[0].tmaster.state._flow_list[0].request.form_in == "authority" assert self.chain[0].tmaster.state.flows[0].request.form_in == "authority"
assert self.chain[0].tmaster.state._flow_list[1].request.form_in == "relative" assert self.chain[0].tmaster.state.flows[1].request.form_in == "relative"
assert self.chain[0].tmaster.state._flow_list[2].request.form_in == "authority" assert self.chain[0].tmaster.state.flows[2].request.form_in == "authority"
assert self.chain[0].tmaster.state._flow_list[3].request.form_in == "relative" assert self.chain[0].tmaster.state.flows[3].request.form_in == "relative"
assert self.chain[1].tmaster.state._flow_list[0].request.form_in == "relative" assert self.chain[1].tmaster.state.flows[0].request.form_in == "relative"
assert self.chain[1].tmaster.state._flow_list[1].request.form_in == "relative" assert self.chain[1].tmaster.state.flows[1].request.form_in == "relative"
req = p.request("get:'/p/418:b\"content2\"'") req = p.request("get:'/p/418:b\"content2\"'")

View File

@ -1,4 +1,8 @@
$(function () { $(function () {
Connection.init(); window.app = React.renderComponent(ProxyApp, document.body);
app = React.renderComponent(ProxyApp, document.body); var UpdateConnection = new Connection("/updates");
UpdateConnection.onmessage = function (message) {
var m = JSON.parse(message.data);
AppDispatcher.dispatchServerAction(m);
};
}); });

View File

@ -7,8 +7,14 @@ var MainView = React.createClass({
}; };
}, },
componentDidMount: function () { componentDidMount: function () {
this.flowStore = FlowStore.getView(); //FIXME: The store should be global, move out of here.
this.flowStore.addListener("change",this.onFlowChange); window.flowstore = new LiveFlowStore();
this.flowStore = window.flowstore.open_view();
this.flowStore.addListener("add",this.onFlowChange);
this.flowStore.addListener("update",this.onFlowChange);
this.flowStore.addListener("remove",this.onFlowChange);
this.flowStore.addListener("recalculate",this.onFlowChange);
}, },
componentWillUnmount: function () { componentWillUnmount: function () {
this.flowStore.removeListener("change",this.onFlowChange); this.flowStore.removeListener("change",this.onFlowChange);
@ -16,7 +22,7 @@ var MainView = React.createClass({
}, },
onFlowChange: function () { onFlowChange: function () {
this.setState({ this.setState({
flows: this.flowStore.getAll() flows: this.flowStore.flows
}); });
}, },
selectDetailTab: function(panel) { selectDetailTab: function(panel) {

View File

@ -1,33 +1,38 @@
function _Connection(url) { function Connection(url) {
this.url = url; if(url[0] != "/"){
this.url = url;
} else {
this.url = location.origin.replace("http", "ws") + url;
}
var ws = new WebSocket(this.url);
ws.onopen = function(){
this.onopen.apply(this, arguments);
}.bind(this);
ws.onmessage = function(){
this.onmessage.apply(this, arguments);
}.bind(this);
ws.onerror = function(){
this.onerror.apply(this, arguments);
}.bind(this);
ws.onclose = function(){
this.onclose.apply(this, arguments);
}.bind(this);
this.ws = ws;
} }
_Connection.prototype.init = function () { Connection.prototype.onopen = function (open) {
this.openWebSocketConnection();
};
_Connection.prototype.openWebSocketConnection = function () {
this.ws = new WebSocket(this.url.replace("http", "ws"));
var ws = this.ws;
ws.onopen = this.onopen.bind(this);
ws.onmessage = this.onmessage.bind(this);
ws.onerror = this.onerror.bind(this);
ws.onclose = this.onclose.bind(this);
};
_Connection.prototype.onopen = function (open) {
console.debug("onopen", this, arguments); console.debug("onopen", this, arguments);
}; };
_Connection.prototype.onmessage = function (message) { Connection.prototype.onmessage = function (message) {
//AppDispatcher.dispatchServerAction(...); console.warn("onmessage (not implemented)", this, message.data);
var m = JSON.parse(message.data);
AppDispatcher.dispatchServerAction(m);
}; };
_Connection.prototype.onerror = function (error) { Connection.prototype.onerror = function (error) {
EventLogActions.add_event("WebSocket Connection Error."); EventLogActions.add_event("WebSocket Connection Error.");
console.debug("onerror", this, arguments); console.debug("onerror", this, arguments);
}; };
_Connection.prototype.onclose = function (close) { Connection.prototype.onclose = function (close) {
EventLogActions.add_event("WebSocket Connection closed."); EventLogActions.add_event("WebSocket Connection closed.");
console.debug("onclose", this, arguments); console.debug("onclose", this, arguments);
}; };
Connection.prototype.close = function(){
var Connection = new _Connection(location.origin + "/updates"); this.ws.close();
};

View File

@ -1,91 +1,172 @@
function FlowView(store, live) { function FlowStore(endpoint) {
EventEmitter.call(this); this._views = [];
this._store = store; this.reset();
this.live = live; }
this.flows = []; _.extend(FlowStore.prototype, {
add: function (flow) {
this.add = this.add.bind(this); this._pos_map[flow.id] = this._flow_list.length;
this.update = this.update.bind(this); this._flow_list.push(flow);
for (var i = 0; i < this._views.length; i++) {
if (live) { this._views[i].add(flow);
this._store.addListener(ActionTypes.ADD_FLOW, this.add); }
this._store.addListener(ActionTypes.UPDATE_FLOW, this.update); },
update: function (flow) {
this._flow_list[this._pos_map[flow.id]] = flow;
for (var i = 0; i < this._views.length; i++) {
this._views[i].update(flow);
}
},
remove: function (flow_id) {
this._flow_list.splice(this._pos_map[flow_id], 1);
this._build_map();
for (var i = 0; i < this._views.length; i++) {
this._views[i].remove(flow_id);
}
},
reset: function (flows) {
this._flow_list = flows || [];
this._build_map();
for (var i = 0; i < this._views.length; i++) {
this._views[i].recalculate(this._flow_list);
}
},
_build_map: function () {
this._pos_map = {};
for (var i = 0; i < this._flow_list.length; i++) {
var flow = this._flow_list[i];
this._pos_map[flow.id] = i;
}
},
open_view: function (filt, sort) {
var view = new FlowView(this._flow_list, filt, sort);
this._views.push(view);
return view;
},
close_view: function (view) {
this._views = _.without(this._views, view);
} }
});
function LiveFlowStore(endpoint) {
FlowStore.call(this);
this.updates_before_init = []; // (empty array is true in js)
this.endpoint = endpoint || "/flows";
this.conn = new Connection(this.endpoint + "/updates");
this.conn.onopen = this._onopen.bind(this);
this.conn.onmessage = function (e) {
var message = JSON.parse(e.data);
this.handle_update(message.type, message.data);
}.bind(this);
}
_.extend(LiveFlowStore.prototype, FlowStore.prototype, {
handle_update: function (type, data) {
console.log("LiveFlowStore.handle_update", type, data);
if (this.updates_before_init) {
console.log("defer update", type, data);
this.updates_before_init.push(arguments);
} else {
this[type](data);
}
},
handle_fetch: function (data) {
console.log("Flows fetched.");
this.reset(data.flows);
var updates = this.updates_before_init;
this.updates_before_init = false;
for (var i = 0; i < updates.length; i++) {
this.handle_update.apply(this, updates[i]);
}
},
_onopen: function () {
//Update stream openend, fetch list of flows.
console.log("Update Connection opened, fetching flows...");
$.getJSON(this.endpoint, this.handle_fetch.bind(this));
},
});
function SortByInsertionOrder() {
this.i = 0;
this.map = {};
this.key = this.key.bind(this);
}
SortByInsertionOrder.prototype.key = function (flow) {
if (!(flow.id in this.map)) {
this.i++;
this.map[flow.id] = this.i;
}
return this.map[flow.id];
};
var default_sort = (new SortByInsertionOrder()).key;
function FlowView(flows, filt, sort) {
EventEmitter.call(this);
filt = filt || function (flow) {
return true;
};
sort = sort || default_sort;
this.recalculate(flows, filt, sort);
} }
_.extend(FlowView.prototype, EventEmitter.prototype, { _.extend(FlowView.prototype, EventEmitter.prototype, {
close: function () { recalculate: function (flows, filt, sort) {
this._store.removeListener(ActionTypes.ADD_FLOW, this.add); if (filt) {
this._store.removeListener(ActionTypes.UPDATE_FLOW, this.update); this.filt = filt;
}, }
getAll: function () { if (sort) {
return this.flows; this.sort = sort;
}
this.flows = flows.filter(this.filt);
this.flows.sort(function (a, b) {
return this.sort(a) - this.sort(b);
}.bind(this));
this.emit("recalculate");
}, },
add: function (flow) { add: function (flow) {
return this.update(flow); if (this.filt(flow)) {
}, var idx = _.sortedIndex(this.flows, flow, this.sort);
add_bulk: function (flows) { if (idx === this.flows.length) { //happens often, .push is way faster.
//Treat all previously received updates as newer than the bulk update. this.flows.push(flow);
//If they weren't newer, we're about to receive an update for them very soon. } else {
var updates = this.flows; this.flows.splice(idx, 0, flow);
this.flows = flows; }
updates.forEach(function(flow){ this.emit("add", flow, idx);
this._update(flow);
}.bind(this));
this.emit("change");
},
_update: function(flow){
var idx = _.findIndex(this.flows, function(f){
return flow.id === f.id;
});
if(idx < 0){
this.flows.push(flow);
//if(this.flows.length > 100){
// this.flows.shift();
//}
} else {
this.flows[idx] = flow;
} }
}, },
update: function(flow){ update: function (flow) {
this._update(flow); var idx;
this.emit("change"); var i = this.flows.length;
}, // Search from the back, we usually update the latest flows.
}); while (i--) {
if (this.flows[i].id === flow.id) {
idx = i;
function _FlowStore() {
EventEmitter.call(this);
}
_.extend(_FlowStore.prototype, EventEmitter.prototype, {
getView: function (since) {
var view = new FlowView(this, !since);
$.getJSON("/static/flows.json", function(flows){
flows = flows.concat(_.cloneDeep(flows)).concat(_.cloneDeep(flows));
var id = 1;
flows.forEach(function(flow){
flow.id = "uuid-" + id++;
});
view.add_bulk(flows);
});
return view;
},
handle: function (action) {
switch (action.type) {
case ActionTypes.ADD_FLOW:
case ActionTypes.UPDATE_FLOW:
this.emit(action.type, action.data);
break; break;
default: }
return; }
if (idx === -1) { //not contained in list
this.add(flow);
} else if (!this.filt(flow)) {
this.remove(flow.id);
} else {
if (this.sort(this.flows[idx]) !== this.sort(flow)) { //sortpos has changed
this.remove(this.flows[idx]);
this.add(flow);
} else {
this.flows[idx] = flow;
this.emit("update", flow, idx);
}
}
},
remove: function (flow_id) {
var i = this.flows.length;
while (i--) {
if (this.flows[i].id === flow_id) {
this.flows.splice(i, 1);
this.emit("remove", flow_id, i);
break;
}
} }
} }
}); });
var FlowStore = new _FlowStore();
AppDispatcher.register(FlowStore.handle.bind(FlowStore));