[web] add msgQueue

This commit is contained in:
Jason 2016-06-25 00:36:39 +08:00
parent 5a1677c387
commit 571c817f21
9 changed files with 281 additions and 236 deletions

File diff suppressed because one or more lines are too long

View File

@ -1,12 +1,12 @@
import { fetchApi } from '../utils'
import reduceList, * as listActions from './utils/list' import reduceList, * as listActions from './utils/list'
import reduceView, * as viewActions from './utils/view' import reduceView, * as viewActions from './utils/view'
import * as websocketActions from './websocket' import * as websocketActions from './websocket'
import * as msgQueueActions from './msgQueue'
export const WS_MSG_TYPE = 'UPDATE_EVENTLOG' export const MSG_TYPE = 'UPDATE_EVENTLOG'
export const DATA_URL = '/events'
export const ADD = 'EVENTLOG_ADD' export const ADD = 'EVENTLOG_ADD'
export const REQUEST = 'EVENTLOG_REQUEST'
export const RECEIVE = 'EVENTLOG_RECEIVE' export const RECEIVE = 'EVENTLOG_RECEIVE'
export const TOGGLE_VISIBILITY = 'EVENTLOG_TOGGLE_VISIBILITY' export const TOGGLE_VISIBILITY = 'EVENTLOG_TOGGLE_VISIBILITY'
export const TOGGLE_FILTER = 'EVENTLOG_TOGGLE_FILTER' export const TOGGLE_FILTER = 'EVENTLOG_TOGGLE_FILTER'
@ -51,12 +51,6 @@ export default function reduce(state = defaultState, action) {
view: reduceView(state.view, viewActions.add(item, log => state.filters[log.level])), view: reduceView(state.view, viewActions.add(item, log => state.filters[log.level])),
} }
case REQUEST:
return {
...state,
list: reduceList(state.list, listActions.request()),
}
case RECEIVE: case RECEIVE:
const list = reduceList(state.list, listActions.receive(action.list)) const list = reduceList(state.list, listActions.receive(action.list))
return { return {
@ -120,33 +114,12 @@ export function handleWsMsg(msg) {
* @public websocket * @public websocket
*/ */
export function fetchData() { export function fetchData() {
return dispatch => { return msgQueueActions.fetchData(MSG_TYPE)
dispatch(request())
return fetchApi('/events')
.then(res => res.json())
.then(json => dispatch(receive(json.data)))
.catch(error => dispatch(fetchError(error)))
}
} }
/** /**
* @private * @public msgQueue
*/ */
export function request() { export function receiveData(list) {
return { type: REQUEST }
}
/**
* @private
*/
export function receive(list) {
return { type: RECEIVE, list } return { type: RECEIVE, list }
} }
/**
* @private
*/
export function fetchError(error) {
return { type: FETCH_ERROR, error }
}

View File

@ -1,14 +1,15 @@
import { fetchApi } from '../utils' import { fetchApi } from '../utils'
import reduceList, * as listActions from './utils/list' import reduceList, * as listActions from './utils/list'
import reduceViews, * as viewsActions from './views' import reduceViews, * as viewsActions from './views'
import * as msgQueueActions from './msgQueue'
import * as websocketActions from './websocket' import * as websocketActions from './websocket'
export const WS_MSG_TYPE = 'UPDATE_FLOWS' export const MSG_TYPE = 'UPDATE_FLOWS'
export const DATA_URL = '/flows'
export const ADD = 'FLOWS_ADD' export const ADD = 'FLOWS_ADD'
export const UPDATE = 'FLOWS_UPDATE' export const UPDATE = 'FLOWS_UPDATE'
export const REMOVE = 'FLOWS_REMOVE' export const REMOVE = 'FLOWS_REMOVE'
export const REQUEST = 'FLOWS_REQUEST'
export const RECEIVE = 'FLOWS_RECEIVE' export const RECEIVE = 'FLOWS_RECEIVE'
export const REQUEST_ACTION = 'FLOWS_REQUEST_ACTION' export const REQUEST_ACTION = 'FLOWS_REQUEST_ACTION'
export const UNKNOWN_CMD = 'FLOWS_UNKNOWN_CMD' export const UNKNOWN_CMD = 'FLOWS_UNKNOWN_CMD'
@ -43,12 +44,6 @@ export default function reduce(state = defaultState, action) {
views: reduceViews(state.views, viewsActions.remove(action.item.id)), views: reduceViews(state.views, viewsActions.remove(action.item.id)),
} }
case REQUEST:
return {
...state,
list: reduceList(state.list, listActions.request()),
}
case RECEIVE: case RECEIVE:
const list = reduceList(state.list, listActions.receive(action.list)) const list = reduceList(state.list, listActions.receive(action.list))
return { return {
@ -177,14 +172,14 @@ export function handleWsMsg(msg) {
* @public websocket * @public websocket
*/ */
export function fetchData() { export function fetchData() {
return dispatch => { return msgQueueActions.fetchData(MSG_TYPE)
dispatch(request())
return fetchApi('/flows')
.then(res => res.json())
.then(json => dispatch(receive(json.data)))
.catch(error => dispatch(fetchError(error)))
} }
/**
* @public msgQueue
*/
export function receiveData(list) {
return { type: RECEIVE, list }
} }
/** /**
@ -207,24 +202,3 @@ export function update(id, item) {
export function remove(id) { export function remove(id) {
return { type: REMOVE, id } return { type: REMOVE, id }
} }
/**
* @private
*/
export function request() {
return { type: REQUEST }
}
/**
* @private
*/
export function receive(list) {
return { type: RECEIVE, list }
}
/**
* @private
*/
export function fetchError(error) {
return { type: FETCH_ERROR, error }
}

View File

@ -4,13 +4,13 @@ import websocket from './websocket'
import flows from './flows' import flows from './flows'
import settings from './settings' import settings from './settings'
import ui from './ui' import ui from './ui'
import msgQueue from './msgQueue'
const rootReducer = combineReducers({ export default combineReducers({
eventLog, eventLog,
websocket, websocket,
flows, flows,
settings, settings,
ui ui,
msgQueue,
}) })
export default rootReducer

View File

@ -0,0 +1,113 @@
import { fetchApi } from '../utils'
import * as websocketActions from './websocket'
import * as eventLogActions from './eventLog'
import * as flowsActions from './flows'
import * as settingsActions from './settings'
export const INIT = 'MSG_QUEUE_INIT'
export const ENQUEUE = 'MSG_QUEUE_ENQUEUE'
export const CLEAR = 'MSG_QUEUE_CLEAR'
export const FETCH_ERROR = 'MSG_QUEUE_FETCH_ERROR'
const handlers = {
[eventLogActions.MSG_TYPE] : eventLogActions,
[flowsActions.MSG_TYPE] : flowsActions,
[settingsActions.MSG_TYPE] : settingsActions,
}
const defaultState = {}
export default function reduce(state = defaultState, action) {
switch (action.type) {
case INIT:
return {
...state,
[action.queue]: [],
}
case ENQUEUE:
return {
...state,
[action.queue]: [...state[action.queue], action.msg],
}
case CLEAR:
return {
...state,
[action.queue]: null,
}
default:
return state
}
}
/**
* @public websocket
*/
export function handleWsMsg(msg) {
return (dispatch, getState) => {
const handler = handlers[msg.type]
if (msg.cmd === websocketActions.CMD_RESET) {
return dispatch(fetchData(handler.MSG_TYPE))
}
if (getState().msgQueue[handler.MSG_TYPE]) {
return dispatch({ type: ENQUEUE, queue: handler.MSG_TYPE, msg })
}
return dispatch(handler.handleWsMsg(msg))
}
}
/**
* @public
*/
export function fetchData(type) {
return dispatch => {
const handler = handlers[type]
dispatch(init(handler.MSG_TYPE))
fetchApi(handler.DATA_URL)
.then(res => res.json())
.then(json => dispatch(receive(type, json)))
.catch(error => dispatch(fetchError(type, error)))
}
}
/**
* @private
*/
export function receive(type, res) {
return (dispatch, getState) => {
const handler = handlers[type]
const queue = getState().msgQueue[handler.MSG_TYPE] || []
dispatch(clear(handler.MSG_TYPE))
dispatch(handler.receiveData(res.data))
for (const msg of queue) {
dispatch(handler.handleWsMsg(msg))
}
}
}
/**
* @private
*/
export function init(queue) {
return { type: INIT, queue }
}
/**
* @private
*/
export function clear(queue) {
return { type: CLEAR, queue }
}
/**
* @private
*/
export function fetchError(type, error) {
return { type: FETCH_ERROR, type, error }
}

View File

@ -1,45 +1,31 @@
import {fetchApi} from '../utils'; import { fetchApi } from '../utils'
import * as msgQueueActions from './msgQueue'
export const REQUEST_SETTINGS = 'REQUEST_SETTINGS' export const MSG_TYPE = 'UPDATE_SETTINGS'
export const RECEIVE_SETTINGS = 'RECEIVE_SETTINGS' export const DATA_URL = '/settings'
export const UPDATE_SETTINGS = 'UPDATE_SETTINGS'
export const RECEIVE = 'RECEIVE'
export const UPDATE = 'UPDATE'
export const REQUEST_UPDATE = 'REQUEST_UPDATE'
export const UNKNOWN_CMD = 'SETTINGS_UNKNOWN_CMD'
const defaultState = { const defaultState = {
settings: {}, settings: {},
isFetching: false,
actionsDuringFetch: [],
} }
export default function reducer(state = defaultState, action) { export default function reducer(state = defaultState, action) {
switch (action.type) { switch (action.type) {
case REQUEST_SETTINGS: case RECEIVE:
return { return {
...state, ...state,
isFetching: true
}
case RECEIVE_SETTINGS:
let s = {
settings: action.settings, settings: action.settings,
isFetching: false,
actionsDuringFetch: [],
} }
for (action of state.actionsDuringFetch) {
s = reducer(s, action)
}
return s
case UPDATE_SETTINGS: case UPDATE:
if (state.isFetching) {
return { return {
...state, ...state,
actionsDuringFetch: [...state.actionsDuringFetch, action] settings: { ...state.settings, ...action.settings },
}
}
return {
...state,
settings: {...state.settings, ...action.settings}
} }
default: default:
@ -47,31 +33,39 @@ export default function reducer(state = defaultState, action) {
} }
} }
export function handleWsMsg(event) { /**
/* This action creator takes all WebSocket events */ * @public msgQueue
if (event.cmd === 'update') { */
return { export function handleWsMsg(msg) {
type: UPDATE_SETTINGS, switch (msg.cmd) {
settings: event.data
}
}
console.error('unknown settings update', event)
}
export function fetchSettings() { case websocketActions.CMD_UPDATE:
return dispatch => { return { type: UPDATE, settings: msg.data }
dispatch({type: REQUEST_SETTINGS})
return fetchApi('/settings') default:
.then(response => response.json()) console.error('unknown settings update', msg)
.then(json => return { type: UNKNOWN_CMD, msg }
dispatch({type: RECEIVE_SETTINGS, settings: json.data})
)
// TODO: Error handling
} }
} }
/**
* @public
*/
export function updateSettings(settings) { export function updateSettings(settings) {
fetchApi.put('/settings', settings) fetchApi.put('/settings', settings)
return { type: SET_INTERCEPT } return { type: REQUEST_UPDATE }
}
/**
* @public websocket
*/
export function fetchData() {
return msgQueueActions.fetchData(MSG_TYPE)
}
/**
* @public msgQueue
*/
export function receiveData(settings) {
return { type: RECEIVE, settings }
} }

View File

@ -1,54 +1,74 @@
import _ from 'lodash' import _ from 'lodash'
export const SET = 'LIST_SET' export const ADD = 'LIST_ADD'
export const CLEAR = 'LIST_CLEAR' export const UPDATE = 'LIST_UPDATE'
export const REQUEST = 'LIST_REQUEST' export const REMOVE = 'LIST_REMOVE'
export const RECEIVE = 'LIST_RECEIVE' export const RECEIVE = 'LIST_RECEIVE'
const defaultState = { const defaultState = {
data: {}, data: [],
pendingActions: null, byId: {},
indexOf: {},
} }
export default function reduce(state = defaultState, action) { export default function reduce(state = defaultState, action) {
switch (action.type) { switch (action.type) {
case SET: case ADD:
if (state.pendingActions) {
return { return {
...state, ...state,
pendingActions: [...state.pendingActions, action] data: [...state.data, action.item],
} byId: { ...state.byId, [action.item.id]: action.item },
} indexOf: { ...state.indexOf, [action.item.id]: state.data.length },
return {
...state,
data: { ...state.data, [action.id]: null, [action.item.id]: action.item }
} }
case CLEAR: case UPDATE: {
if (state.pendingActions) { const data = [...state.data]
return { const index = state.indexOf[action.id]
...state,
pendingActions: [...state.pendingActions, action] if (index == null) {
} throw new Error('Item not found')
}
return {
...state,
data: { ...state.data, [action.id]: null }
} }
case REQUEST: data[index] = action.item
return { return {
...state, ...state,
pendingActions: [] data,
byId: { ...state.byId, [action.id]: null, [action.item.id]: action.item },
indexOf: { ...state.indexOf, [action.id]: null, [action.item.id]: index },
}
}
case REMOVE: {
const data = [...state.data]
const indexOf = { ...state.indexOf }
const index = indexOf[action.id]
if (index == null) {
throw new Error('Item not found')
}
data.splice(index, 1)
for (let i = data.length - 1; i >= index; i--) {
indexOf[data[i].id] = i
}
return {
...state,
data,
indexOf,
byId: { ...state.byId, [action.id]: null },
}
} }
case RECEIVE: case RECEIVE:
return state.pendingActions.reduce(reduce, { return {
...state, ...state,
pendingActions: null, data: action.list,
data: _.fromPairs(action.list.map(item => [item.id, item])), byId: _.fromPairs(action.list.map(item => [item.id, item])),
}) indexOf: _.fromPairs(action.list.map((item, index) => [item.id, index])),
}
default: default:
return state return state
@ -59,28 +79,21 @@ export default function reduce(state = defaultState, action) {
* @public * @public
*/ */
export function add(item) { export function add(item) {
return { type: SET, id: item.id, item } return { type: ADD, item }
} }
/** /**
* @public * @public
*/ */
export function update(id, item) { export function update(id, item) {
return { type: SET, id, item } return { type: UPDATE, id, item }
} }
/** /**
* @public * @public
*/ */
export function remove(id) { export function remove(id) {
return { type: CLEAR, id } return { type: REMOVE, id }
}
/**
* @public
*/
export function request() {
return { type: REQUEST }
} }
/** /**

View File

@ -16,7 +16,7 @@ export default function reduce(state = defaultState, action) {
switch (action.type) { switch (action.type) {
case UPDATE_FILTER: { case UPDATE_FILTER: {
const data = _.values(action.list.data).filter(action.filter).sort(action.sorter) const data = action.list.data.filter(action.filter).sort(action.sorter)
return { return {
...state, ...state,
data, data,
@ -69,7 +69,7 @@ export default function reduce(state = defaultState, action) {
} }
case RECEIVE: { case RECEIVE: {
const data = _.values(action.list.data).filter(action.filter).sort(action.sorter) const data = action.list.data.filter(action.filter).sort(action.sorter)
return { return {
...state, ...state,
data, data,
@ -138,7 +138,7 @@ function sortedIndex(list, item, sorter) {
while (low < high) { while (low < high) {
const middle = (low + high) >>> 1 const middle = (low + high) >>> 1
if (sorter(item, list[middle]) > 0) { if (sorter(item, list[middle]) >= 0) {
low = middle + 1 low = middle + 1
} else { } else {
high = middle high = middle

View File

@ -1,5 +1,7 @@
import { ConnectionActions } from '../actions.js' import { ConnectionActions } from '../actions.js'
import { AppDispatcher } from '../dispatcher.js' import { AppDispatcher } from '../dispatcher.js'
import * as msgQueueActions from './msgQueue'
import * as eventLogActions from './eventLog' import * as eventLogActions from './eventLog'
import * as flowsActions from './flows' import * as flowsActions from './flows'
import * as settingsActions from './settings' import * as settingsActions from './settings'
@ -45,17 +47,12 @@ export function connect() {
return dispatch => { return dispatch => {
const socket = new WebSocket(location.origin.replace('http', 'ws') + '/updates') const socket = new WebSocket(location.origin.replace('http', 'ws') + '/updates')
// @todo remove this
window.ws = socket
socket.addEventListener('open', () => dispatch(onConnect())) socket.addEventListener('open', () => dispatch(onConnect()))
socket.addEventListener('close', () => dispatch(onDisconnect())) socket.addEventListener('close', () => dispatch(onDisconnect()))
socket.addEventListener('message', msg => dispatch(onMessage(msg))) socket.addEventListener('message', msg => dispatch(onMessage(JSON.parse(msg.data))))
socket.addEventListener('error', error => dispatch(onError(error))) socket.addEventListener('error', error => dispatch(onError(error)))
dispatch({ type: CONNECT, socket }) dispatch({ type: CONNECT, socket })
return socket
} }
} }
@ -70,39 +67,18 @@ export function onConnect() {
// workaround to make sure that our state is already available. // workaround to make sure that our state is already available.
return dispatch => { return dispatch => {
dispatch({ type: CONNECTED }) dispatch({ type: CONNECTED })
dispatch(settingsActions.fetchSettings()) dispatch(settingsActions.fetchData())
dispatch(flowsActions.fetchFlows()).then(() => ConnectionActions.open()) dispatch(flowsActions.fetchData())
dispatch(eventLogActions.fetchData())
} }
} }
export function onMessage(msg) { export function onMessage(msg) {
return dispatch => { return msgQueueActions.handleWsMsg(msg)
const data = JSON.parse(msg.data)
AppDispatcher.dispatchServerAction(data)
switch (data.type) {
case eventLogActions.WS_MSG_TYPE:
return dispatch(eventLogActions.handleWsMsg(data))
case flowsActions.WS_MSG_TYPE:
return dispatch(flowsActions.handleWsMsg(data))
case settingsActions.UPDATE_SETTINGS:
return dispatch(settingsActions.handleWsMsg(data))
default:
console.warn('unknown message', data)
}
dispatch({ type: MESSAGE, msg })
}
} }
export function onDisconnect() { export function onDisconnect() {
return dispatch => { return dispatch => {
ConnectionActions.close()
dispatch(eventLogActions.addLogEntry('WebSocket connection closed.')) dispatch(eventLogActions.addLogEntry('WebSocket connection closed.'))
dispatch({ type: DISCONNECTED }) dispatch({ type: DISCONNECTED })
} }
@ -111,7 +87,6 @@ export function onDisconnect() {
export function onError(error) { export function onError(error) {
// @todo let event log subscribe WebSocketActions.ERROR // @todo let event log subscribe WebSocketActions.ERROR
return dispatch => { return dispatch => {
ConnectionActions.error()
dispatch(eventLogActions.addLogEntry('WebSocket connection error.')) dispatch(eventLogActions.addLogEntry('WebSocket connection error.'))
dispatch({ type: ERROR, error }) dispatch({ type: ERROR, error })
} }