mitmproxy/test/test_console.py

284 lines
7.1 KiB
Python
Raw Normal View History

from libmproxy import console, proxy, utils, filt, flow
2010-02-16 04:09:07 +00:00
import libpry
def treq(conn=None):
if not conn:
conn = proxy.BrowserConnection("address", 22)
headers = utils.Headers()
headers["header"] = ["qvalue"]
return proxy.Request(conn, "host", 80, "http", "GET", "/path", headers, "content")
def tresp(req=None):
if not req:
req = treq()
headers = utils.Headers()
headers["header_response"] = ["svalue"]
return proxy.Response(req, 200, "HTTP/1.1", "message", headers, "content_response")
def tflow():
bc = proxy.BrowserConnection("address", 22)
return console.ConsoleFlow(bc)
2010-02-16 04:09:07 +00:00
class uState(libpry.AutoTree):
def test_backup(self):
bc = proxy.BrowserConnection("address", 22)
c = console.ConsoleState()
f = console.ConsoleFlow(bc)
2010-02-16 04:09:07 +00:00
c.add_browserconnect(f)
f.backup()
c.revert(f)
def test_flow(self):
"""
normal flow:
connect -> request -> response
"""
bc = proxy.BrowserConnection("address", 22)
c = console.ConsoleState()
f = console.ConsoleFlow(bc)
2010-02-16 04:09:07 +00:00
c.add_browserconnect(f)
assert c.lookup(bc)
assert c.get_focus() == (f, 0)
req = treq(bc)
assert c.add_request(req)
assert len(c.flow_list) == 1
assert c.lookup(req)
newreq = treq()
assert not c.add_request(newreq)
assert not c.lookup(newreq)
resp = tresp(req)
assert c.add_response(resp)
assert len(c.flow_list) == 1
assert f.waiting == False
assert c.lookup(resp)
newresp = tresp()
assert not c.add_response(newresp)
assert not c.lookup(newresp)
def test_err(self):
bc = proxy.BrowserConnection("address", 22)
c = console.ConsoleState()
f = console.ConsoleFlow(bc)
2010-02-16 04:09:07 +00:00
c.add_browserconnect(f)
e = proxy.Error(bc, "message")
assert c.add_error(e)
e = proxy.Error(proxy.BrowserConnection("address", 22), "message")
assert not c.add_error(e)
def test_view(self):
c = console.ConsoleState()
2010-02-16 04:09:07 +00:00
f = tflow()
c.add_browserconnect(f)
assert len(c.view) == 1
c.set_limit(filt.parse("~q"))
assert len(c.view) == 0
c.set_limit(None)
f = tflow()
req = treq(f.connection)
c.add_browserconnect(f)
c.add_request(req)
assert len(c.view) == 2
c.set_limit(filt.parse("~q"))
assert len(c.view) == 1
c.set_limit(filt.parse("~s"))
assert len(c.view) == 0
def test_focus(self):
"""
normal flow:
connect -> request -> response
"""
c = console.ConsoleState()
2010-02-16 04:09:07 +00:00
bc = proxy.BrowserConnection("address", 22)
f = console.ConsoleFlow(bc)
2010-02-16 04:09:07 +00:00
c.add_browserconnect(f)
assert c.get_focus() == (f, 0)
assert c.get_from_pos(0) == (f, 0)
assert c.get_from_pos(1) == (None, None)
assert c.get_next(0) == (None, None)
bc2 = proxy.BrowserConnection("address", 22)
f2 = console.ConsoleFlow(bc2)
2010-02-16 04:09:07 +00:00
c.add_browserconnect(f2)
assert c.get_focus() == (f, 1)
assert c.get_next(0) == (f, 1)
assert c.get_prev(1) == (f2, 0)
assert c.get_next(1) == (None, None)
c.set_focus(0)
assert c.get_focus() == (f2, 0)
c.set_focus(-1)
assert c.get_focus() == (f2, 0)
c.delete_flow(f2)
assert c.get_focus() == (f, 0)
c.delete_flow(f)
assert c.get_focus() == (None, None)
def _add_request(self, state):
f = tflow()
state.add_browserconnect(f)
q = treq(f.connection)
state.add_request(q)
return f
def _add_response(self, state):
f = self._add_request(state)
r = tresp(f.request)
state.add_response(r)
def test_focus_view(self):
c = console.ConsoleState()
2010-02-16 04:09:07 +00:00
self._add_request(c)
self._add_response(c)
self._add_request(c)
self._add_response(c)
self._add_request(c)
self._add_response(c)
c.set_limit(filt.parse("~q"))
assert len(c.view) == 3
assert c.focus == 2
def test_delete_last(self):
c = console.ConsoleState()
2010-02-16 04:09:07 +00:00
f1 = tflow()
f2 = tflow()
c.add_browserconnect(f1)
c.add_browserconnect(f2)
c.set_focus(1)
c.delete_flow(f1)
assert c.focus == 0
def test_kill_flow(self):
c = console.ConsoleState()
2010-02-16 04:09:07 +00:00
f = tflow()
c.add_browserconnect(f)
c.kill_flow(f)
assert not c.flow_list
def test_clear(self):
c = console.ConsoleState()
2010-02-16 04:09:07 +00:00
f = tflow()
c.add_browserconnect(f)
f.intercepting = True
c.clear()
assert len(c.flow_list) == 1
f.intercepting = False
c.clear()
assert len(c.flow_list) == 0
class uFlow(libpry.AutoTree):
def test_match(self):
f = tflow()
f.response = tresp()
f.request = f.response.request
assert not f.match(filt.parse("~b test"))
def test_backup(self):
f = tflow()
f.backup()
f.revert()
def test_getset_state(self):
f = tflow()
state = f.get_state()
assert f == console.ConsoleFlow.from_state(state)
f.response = tresp()
f.request = f.response.request
state = f.get_state()
assert f == console.ConsoleFlow.from_state(state)
2010-02-16 04:09:07 +00:00
def test_simple(self):
f = tflow()
assert f.get_text()
f.request = treq()
assert f.get_text()
f.response = tresp()
f.response.headers["content-type"] = ["text/html"]
assert f.get_text()
f.response.code = 404
assert f.get_text()
f.focus = True
assert f.get_text()
f.connection = flow.ReplayConnection()
2010-02-16 04:09:07 +00:00
assert f.get_text()
f.response = None
assert f.get_text()
f.error = proxy.Error(200, "test")
assert f.get_text()
def test_kill(self):
f = tflow()
f.request = treq()
f.intercept()
assert not f.request.acked
f.kill()
assert f.request.acked
f.intercept()
f.response = tresp()
f.request = f.response.request
f.request.ack()
assert not f.response.acked
f.kill()
assert f.response.acked
def test_accept_intercept(self):
f = tflow()
f.request = treq()
f.intercept()
assert not f.request.acked
f.accept_intercept()
assert f.request.acked
f.response = tresp()
f.request = f.response.request
f.intercept()
f.request.ack()
assert not f.response.acked
f.accept_intercept()
assert f.response.acked
def test_serialization(self):
f = console.ConsoleFlow(None)
f.request = treq()
2010-02-16 04:09:07 +00:00
class uformat_keyvals(libpry.AutoTree):
def test_simple(self):
assert console.format_keyvals(
[
("aa", "bb"),
("cc", "dd"),
]
)
tests = [
uFlow(),
uformat_keyvals(),
uState()
]