mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
Rebase on master
This commit is contained in:
parent
1b1ea98f08
commit
f7f9cab5dc
@ -120,7 +120,7 @@ def mitmweb(args=None): # pragma: no cover
|
||||
options.verbose = 0
|
||||
|
||||
proxy_config = config.process_proxy_options(parser, options)
|
||||
web_options = web.Options(**cmdline.get_common_options(options))
|
||||
web_options = web.master.Options(**cmdline.get_common_options(options))
|
||||
web_options.intercept = options.intercept
|
||||
web_options.wdebug = options.wdebug
|
||||
web_options.wiface = options.wiface
|
||||
@ -131,7 +131,7 @@ def mitmweb(args=None): # pragma: no cover
|
||||
|
||||
server = get_server(web_options.no_server, proxy_config)
|
||||
|
||||
m = web.WebMaster(server, web_options)
|
||||
m = web.master.WebMaster(server, web_options)
|
||||
try:
|
||||
m.run()
|
||||
except (KeyboardInterrupt, _thread.error):
|
||||
|
@ -1,218 +1,3 @@
|
||||
from __future__ import absolute_import, print_function, division
|
||||
import master
|
||||
|
||||
import collections
|
||||
import sys
|
||||
|
||||
import tornado.httpserver
|
||||
import tornado.ioloop
|
||||
|
||||
from mitmproxy import controller
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
from mitmproxy.web import app
|
||||
from netlib.http import authentication
|
||||
|
||||
|
||||
class Stop(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class WebFlowView(flow.FlowView):
|
||||
|
||||
def __init__(self, store):
|
||||
super(WebFlowView, self).__init__(store, None)
|
||||
|
||||
def _add(self, f):
|
||||
super(WebFlowView, self)._add(f)
|
||||
app.ClientConnection.broadcast(
|
||||
type="UPDATE_FLOWS",
|
||||
cmd="add",
|
||||
data=app._strip_content(f.get_state())
|
||||
)
|
||||
|
||||
def _update(self, f):
|
||||
super(WebFlowView, self)._update(f)
|
||||
app.ClientConnection.broadcast(
|
||||
type="UPDATE_FLOWS",
|
||||
cmd="update",
|
||||
data=app._strip_content(f.get_state())
|
||||
)
|
||||
|
||||
def _remove(self, f):
|
||||
super(WebFlowView, self)._remove(f)
|
||||
app.ClientConnection.broadcast(
|
||||
type="UPDATE_FLOWS",
|
||||
cmd="remove",
|
||||
data=f.id
|
||||
)
|
||||
|
||||
def _recalculate(self, flows):
|
||||
super(WebFlowView, self)._recalculate(flows)
|
||||
app.ClientConnection.broadcast(
|
||||
type="UPDATE_FLOWS",
|
||||
cmd="reset"
|
||||
)
|
||||
|
||||
|
||||
class WebState(flow.State):
|
||||
|
||||
def __init__(self):
|
||||
super(WebState, self).__init__()
|
||||
self.view._close()
|
||||
self.view = WebFlowView(self.flows)
|
||||
|
||||
self._last_event_id = 0
|
||||
self.events = collections.deque(maxlen=1000)
|
||||
|
||||
def add_event(self, e, level):
|
||||
self._last_event_id += 1
|
||||
entry = {
|
||||
"id": self._last_event_id,
|
||||
"message": e,
|
||||
"level": level
|
||||
}
|
||||
self.events.append(entry)
|
||||
app.ClientConnection.broadcast(
|
||||
type="UPDATE_EVENTLOG",
|
||||
cmd="add",
|
||||
data=entry
|
||||
)
|
||||
|
||||
def clear(self):
|
||||
super(WebState, self).clear()
|
||||
self.events.clear()
|
||||
app.ClientConnection.broadcast(
|
||||
type="events",
|
||||
cmd="reset",
|
||||
data=[]
|
||||
)
|
||||
|
||||
|
||||
class Options(object):
|
||||
attributes = [
|
||||
"app",
|
||||
"app_domain",
|
||||
"app_ip",
|
||||
"anticache",
|
||||
"anticomp",
|
||||
"client_replay",
|
||||
"eventlog",
|
||||
"keepserving",
|
||||
"kill",
|
||||
"intercept",
|
||||
"no_server",
|
||||
"refresh_server_playback",
|
||||
"rfile",
|
||||
"scripts",
|
||||
"showhost",
|
||||
"replacements",
|
||||
"rheaders",
|
||||
"setheaders",
|
||||
"server_replay",
|
||||
"stickycookie",
|
||||
"stickyauth",
|
||||
"stream_large_bodies",
|
||||
"verbosity",
|
||||
"wfile",
|
||||
"nopop",
|
||||
|
||||
"wdebug",
|
||||
"wport",
|
||||
"wiface",
|
||||
"wauthenticator",
|
||||
"wsingleuser",
|
||||
"whtpasswd",
|
||||
]
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
for i in self.attributes:
|
||||
if not hasattr(self, i):
|
||||
setattr(self, i, None)
|
||||
|
||||
def process_web_options(self, parser):
|
||||
if self.wsingleuser or self.whtpasswd:
|
||||
if self.wsingleuser:
|
||||
if len(self.wsingleuser.split(':')) != 2:
|
||||
return parser.error(
|
||||
"Invalid single-user specification. Please use the format username:password"
|
||||
)
|
||||
username, password = self.wsingleuser.split(':')
|
||||
self.wauthenticator = authentication.PassManSingleUser(username, password)
|
||||
elif self.whtpasswd:
|
||||
try:
|
||||
self.wauthenticator = authentication.PassManHtpasswd(self.whtpasswd)
|
||||
except ValueError as v:
|
||||
return parser.error(v.message)
|
||||
else:
|
||||
self.wauthenticator = None
|
||||
|
||||
|
||||
class WebMaster(flow.FlowMaster):
|
||||
|
||||
def __init__(self, server, options):
|
||||
self.options = options
|
||||
super(WebMaster, self).__init__(server, WebState())
|
||||
self.app = app.Application(self, self.options.wdebug, self.options.wauthenticator)
|
||||
if options.rfile:
|
||||
try:
|
||||
self.load_flows_file(options.rfile)
|
||||
except exceptions.FlowReadException as v:
|
||||
self.add_event(
|
||||
"Could not read flow file: %s" % v,
|
||||
"error"
|
||||
)
|
||||
|
||||
if options.outfile:
|
||||
err = self.start_stream_to_path(
|
||||
options.outfile[0],
|
||||
options.outfile[1]
|
||||
)
|
||||
if err:
|
||||
print("Stream file error: {}".format(err), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if self.options.app:
|
||||
self.start_app(self.options.app_host, self.options.app_port)
|
||||
|
||||
def run(self): # pragma: no cover
|
||||
|
||||
iol = tornado.ioloop.IOLoop.instance()
|
||||
|
||||
http_server = tornado.httpserver.HTTPServer(self.app)
|
||||
http_server.listen(self.options.wport)
|
||||
|
||||
iol.add_callback(self.start)
|
||||
tornado.ioloop.PeriodicCallback(lambda: self.tick(timeout=0), 5).start()
|
||||
try:
|
||||
print("Server listening at http://{}:{}".format(
|
||||
self.options.wiface, self.options.wport), file=sys.stderr)
|
||||
iol.start()
|
||||
except (Stop, KeyboardInterrupt):
|
||||
self.shutdown()
|
||||
|
||||
def _process_flow(self, f):
|
||||
if self.state.intercept and self.state.intercept(
|
||||
f) and not f.request.is_replay:
|
||||
f.intercept(self)
|
||||
f.reply.take()
|
||||
|
||||
@controller.handler
|
||||
def request(self, f):
|
||||
super(WebMaster, self).request(f)
|
||||
self._process_flow(f)
|
||||
|
||||
@controller.handler
|
||||
def response(self, f):
|
||||
super(WebMaster, self).response(f)
|
||||
self._process_flow(f)
|
||||
|
||||
@controller.handler
|
||||
def error(self, f):
|
||||
super(WebMaster, self).error(f)
|
||||
self._process_flow(f)
|
||||
|
||||
def add_event(self, e, level="info"):
|
||||
super(WebMaster, self).add_event(e, level)
|
||||
self.state.add_event(e, level)
|
||||
__all__ = ["master"]
|
||||
|
220
mitmproxy/web/master.py
Normal file
220
mitmproxy/web/master.py
Normal file
@ -0,0 +1,220 @@
|
||||
from __future__ import absolute_import, print_function, division
|
||||
|
||||
import sys
|
||||
import collections
|
||||
|
||||
import tornado.httpserver
|
||||
import tornado.ioloop
|
||||
|
||||
from mitmproxy import controller
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
from mitmproxy.web import app
|
||||
from netlib.http import authentication
|
||||
|
||||
|
||||
class Stop(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class WebFlowView(flow.FlowView):
|
||||
|
||||
def __init__(self, store):
|
||||
super(WebFlowView, self).__init__(store, None)
|
||||
|
||||
def _add(self, f):
|
||||
super(WebFlowView, self)._add(f)
|
||||
app.ClientConnection.broadcast(
|
||||
type="UPDATE_FLOWS",
|
||||
cmd="add",
|
||||
data=app._strip_content(f.get_state())
|
||||
)
|
||||
|
||||
def _update(self, f):
|
||||
super(WebFlowView, self)._update(f)
|
||||
app.ClientConnection.broadcast(
|
||||
type="UPDATE_FLOWS",
|
||||
cmd="update",
|
||||
data=app._strip_content(f.get_state())
|
||||
)
|
||||
|
||||
def _remove(self, f):
|
||||
super(WebFlowView, self)._remove(f)
|
||||
app.ClientConnection.broadcast(
|
||||
type="UPDATE_FLOWS",
|
||||
cmd="remove",
|
||||
data=f.id
|
||||
)
|
||||
|
||||
def _recalculate(self, flows):
|
||||
super(WebFlowView, self)._recalculate(flows)
|
||||
app.ClientConnection.broadcast(
|
||||
type="UPDATE_FLOWS",
|
||||
cmd="reset"
|
||||
)
|
||||
|
||||
|
||||
class WebState(flow.State):
|
||||
|
||||
def __init__(self):
|
||||
super(WebState, self).__init__()
|
||||
self.view._close()
|
||||
self.view = WebFlowView(self.flows)
|
||||
|
||||
self._last_event_id = 0
|
||||
self.events = collections.deque(maxlen=1000)
|
||||
|
||||
def add_event(self, e, level):
|
||||
self._last_event_id += 1
|
||||
entry = {
|
||||
"id": self._last_event_id,
|
||||
"message": e,
|
||||
"level": level
|
||||
}
|
||||
self.events.append(entry)
|
||||
app.ClientConnection.broadcast(
|
||||
type="UPDATE_EVENTLOG",
|
||||
cmd="add",
|
||||
data=entry
|
||||
)
|
||||
|
||||
def clear(self):
|
||||
super(WebState, self).clear()
|
||||
self.events.clear()
|
||||
app.ClientConnection.broadcast(
|
||||
type="events",
|
||||
cmd="reset",
|
||||
data=[]
|
||||
)
|
||||
|
||||
|
||||
class Options(object):
|
||||
attributes = [
|
||||
"app",
|
||||
"app_domain",
|
||||
"app_ip",
|
||||
"anticache",
|
||||
"anticomp",
|
||||
"client_replay",
|
||||
"eventlog",
|
||||
"keepserving",
|
||||
"kill",
|
||||
"intercept",
|
||||
"no_server",
|
||||
"outfile",
|
||||
"refresh_server_playback",
|
||||
"rfile",
|
||||
"scripts",
|
||||
"showhost",
|
||||
"replacements",
|
||||
"rheaders",
|
||||
"setheaders",
|
||||
"server_replay",
|
||||
"stickycookie",
|
||||
"stickyauth",
|
||||
"stream_large_bodies",
|
||||
"verbosity",
|
||||
"wfile",
|
||||
"nopop",
|
||||
|
||||
"wdebug",
|
||||
"wport",
|
||||
"wiface",
|
||||
"wauthenticator",
|
||||
"wsingleuser",
|
||||
"whtpasswd",
|
||||
]
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
for i in self.attributes:
|
||||
if not hasattr(self, i):
|
||||
setattr(self, i, None)
|
||||
|
||||
def process_web_options(self, parser):
|
||||
if self.wsingleuser or self.whtpasswd:
|
||||
if self.wsingleuser:
|
||||
if len(self.wsingleuser.split(':')) != 2:
|
||||
return parser.error(
|
||||
"Invalid single-user specification. Please use the format username:password"
|
||||
)
|
||||
username, password = self.wsingleuser.split(':')
|
||||
self.wauthenticator = authentication.PassManSingleUser(username, password)
|
||||
elif self.whtpasswd:
|
||||
try:
|
||||
self.wauthenticator = authentication.PassManHtpasswd(self.whtpasswd)
|
||||
except ValueError as v:
|
||||
return parser.error(v.message)
|
||||
else:
|
||||
self.wauthenticator = None
|
||||
|
||||
|
||||
class WebMaster(flow.FlowMaster):
|
||||
|
||||
def __init__(self, server, options):
|
||||
self.options = options
|
||||
super(WebMaster, self).__init__(server, WebState())
|
||||
self.app = app.Application(self, self.options.wdebug, self.options.wauthenticator)
|
||||
if options.rfile:
|
||||
try:
|
||||
self.load_flows_file(options.rfile)
|
||||
except exceptions.FlowReadException as v:
|
||||
self.add_event(
|
||||
"Could not read flow file: %s" % v,
|
||||
"error"
|
||||
)
|
||||
|
||||
if options.outfile:
|
||||
err = self.start_stream_to_path(
|
||||
options.outfile[0],
|
||||
options.outfile[1]
|
||||
)
|
||||
if err:
|
||||
print("Stream file error: {}".format(err), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if self.options.app:
|
||||
self.start_app(self.options.app_host, self.options.app_port)
|
||||
|
||||
def run(self): # pragma: no cover
|
||||
|
||||
iol = tornado.ioloop.IOLoop.instance()
|
||||
|
||||
http_server = tornado.httpserver.HTTPServer(self.app)
|
||||
http_server.listen(self.options.wport)
|
||||
|
||||
iol.add_callback(self.start)
|
||||
tornado.ioloop.PeriodicCallback(lambda: self.tick(timeout=0), 5).start()
|
||||
try:
|
||||
print("Server listening at http://{}:{}".format(
|
||||
self.options.wiface, self.options.wport), file=sys.stderr)
|
||||
iol.start()
|
||||
except (Stop, KeyboardInterrupt):
|
||||
self.shutdown()
|
||||
|
||||
def _process_flow(self, f):
|
||||
if self.state.intercept and self.state.intercept(
|
||||
f) and not f.request.is_replay:
|
||||
f.intercept(self)
|
||||
f.reply.take()
|
||||
return f
|
||||
|
||||
@controller.handler
|
||||
def request(self, f):
|
||||
super(WebMaster, self).request(f)
|
||||
return self._process_flow(f)
|
||||
|
||||
@controller.handler
|
||||
def response(self, f):
|
||||
super(WebMaster, self).response(f)
|
||||
return self._process_flow(f)
|
||||
|
||||
@controller.handler
|
||||
def error(self, f):
|
||||
super(WebMaster, self).error(f)
|
||||
return self._process_flow(f)
|
||||
|
||||
def add_event(self, e, level="info"):
|
||||
super(WebMaster, self).add_event(e, level)
|
||||
return self.state.add_event(e, level)
|
33
test/mitmproxy/mastertest.py
Normal file
33
test/mitmproxy/mastertest.py
Normal file
@ -0,0 +1,33 @@
|
||||
import tutils
|
||||
import netlib.tutils
|
||||
import mock
|
||||
|
||||
from mitmproxy import flow, proxy, models
|
||||
|
||||
|
||||
class MasterTest:
|
||||
def cycle(self, master, content):
|
||||
f = tutils.tflow(req=netlib.tutils.treq(content=content))
|
||||
l = proxy.Log("connect")
|
||||
l.reply = mock.MagicMock()
|
||||
master.log(l)
|
||||
master.clientconnect(f.client_conn)
|
||||
master.serverconnect(f.server_conn)
|
||||
master.request(f)
|
||||
if not f.error:
|
||||
f.response = models.HTTPResponse.wrap(netlib.tutils.tresp(content=content))
|
||||
f = master.response(f)
|
||||
master.clientdisconnect(f.client_conn)
|
||||
return f
|
||||
|
||||
def dummy_cycle(self, master, n, content):
|
||||
for i in range(n):
|
||||
self.cycle(master, content)
|
||||
master.shutdown()
|
||||
|
||||
def flowfile(self, path):
|
||||
f = open(path, "wb")
|
||||
fw = flow.FlowWriter(f)
|
||||
t = tutils.tflow(resp=True)
|
||||
fw.add(t)
|
||||
f.close()
|
@ -1,13 +1,11 @@
|
||||
import os
|
||||
from six.moves import cStringIO as StringIO
|
||||
from mitmproxy.exceptions import ContentViewException
|
||||
from mitmproxy.models import HTTPResponse
|
||||
|
||||
import netlib.tutils
|
||||
|
||||
from mitmproxy import dump, flow
|
||||
from mitmproxy.proxy import Log
|
||||
from . import tutils
|
||||
from mitmproxy import dump, flow, models
|
||||
from . import tutils, mastertest
|
||||
import mock
|
||||
|
||||
|
||||
@ -58,37 +56,28 @@ def test_contentview(get_content_view):
|
||||
assert "Content viewer failed" in m.outfile.getvalue()
|
||||
|
||||
|
||||
class TestDumpMaster:
|
||||
class TestDumpMaster(mastertest.MasterTest):
|
||||
def dummy_cycle(self, master, n, content):
|
||||
mastertest.MasterTest.dummy_cycle(self, master, n, content)
|
||||
return master.outfile.getvalue()
|
||||
|
||||
def _cycle(self, m, content):
|
||||
f = tutils.tflow(req=netlib.tutils.treq(content=content))
|
||||
l = Log("connect")
|
||||
l.reply = mock.MagicMock()
|
||||
m.log(l)
|
||||
m.clientconnect(f.client_conn)
|
||||
m.serverconnect(f.server_conn)
|
||||
m.request(f)
|
||||
if not f.error:
|
||||
f.response = HTTPResponse.wrap(netlib.tutils.tresp(content=content))
|
||||
f = m.response(f)
|
||||
m.clientdisconnect(f.client_conn)
|
||||
return f
|
||||
|
||||
def _dummy_cycle(self, n, filt, content, **options):
|
||||
def mkmaster(self, filt, **options):
|
||||
cs = StringIO()
|
||||
o = dump.Options(filtstr=filt, **options)
|
||||
m = dump.DumpMaster(None, o, outfile=cs)
|
||||
for i in range(n):
|
||||
self._cycle(m, content)
|
||||
m.shutdown()
|
||||
return cs.getvalue()
|
||||
return dump.DumpMaster(None, o, outfile=cs)
|
||||
|
||||
def _flowfile(self, path):
|
||||
f = open(path, "wb")
|
||||
fw = flow.FlowWriter(f)
|
||||
t = tutils.tflow(resp=True)
|
||||
fw.add(t)
|
||||
f.close()
|
||||
def test_basic(self):
|
||||
for i in (1, 2, 3):
|
||||
assert "GET" in self.dummy_cycle(self.mkmaster("~s", flow_detail=i), 1, "")
|
||||
assert "GET" in self.dummy_cycle(
|
||||
self.mkmaster("~s", flow_detail=i),
|
||||
1,
|
||||
"\x00\x00\x00"
|
||||
)
|
||||
assert "GET" in self.dummy_cycle(
|
||||
self.mkmaster("~s", flow_detail=i),
|
||||
1, "ascii"
|
||||
)
|
||||
|
||||
def test_error(self):
|
||||
cs = StringIO()
|
||||
@ -106,7 +95,7 @@ class TestDumpMaster:
|
||||
f = tutils.tflow()
|
||||
f.request.content = None
|
||||
m.request(f)
|
||||
f.response = HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
f.response = models.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
f.response.content = None
|
||||
m.response(f)
|
||||
assert "content missing" in cs.getvalue()
|
||||
@ -119,17 +108,17 @@ class TestDumpMaster:
|
||||
|
||||
with tutils.tmpdir() as t:
|
||||
p = os.path.join(t, "rep")
|
||||
self._flowfile(p)
|
||||
self.flowfile(p)
|
||||
|
||||
o = dump.Options(server_replay=[p], kill=True)
|
||||
m = dump.DumpMaster(None, o, outfile=cs)
|
||||
|
||||
self._cycle(m, "content")
|
||||
self._cycle(m, "content")
|
||||
self.cycle(m, "content")
|
||||
self.cycle(m, "content")
|
||||
|
||||
o = dump.Options(server_replay=[p], kill=False)
|
||||
m = dump.DumpMaster(None, o, outfile=cs)
|
||||
self._cycle(m, "nonexistent")
|
||||
self.cycle(m, "nonexistent")
|
||||
|
||||
o = dump.Options(client_replay=[p], kill=False)
|
||||
m = dump.DumpMaster(None, o, outfile=cs)
|
||||
@ -137,22 +126,19 @@ class TestDumpMaster:
|
||||
def test_read(self):
|
||||
with tutils.tmpdir() as t:
|
||||
p = os.path.join(t, "read")
|
||||
self._flowfile(p)
|
||||
assert "GET" in self._dummy_cycle(
|
||||
0,
|
||||
None,
|
||||
"",
|
||||
flow_detail=1,
|
||||
rfile=p
|
||||
self.flowfile(p)
|
||||
assert "GET" in self.dummy_cycle(
|
||||
self.mkmaster(None, flow_detail=1, rfile=p),
|
||||
0, "",
|
||||
)
|
||||
|
||||
tutils.raises(
|
||||
dump.DumpError, self._dummy_cycle,
|
||||
0, None, "", verbosity=1, rfile="/nonexistent"
|
||||
dump.DumpError,
|
||||
self.mkmaster, None, verbosity=1, rfile="/nonexistent"
|
||||
)
|
||||
tutils.raises(
|
||||
dump.DumpError, self._dummy_cycle,
|
||||
0, None, "", verbosity=1, rfile="test_dump.py"
|
||||
dump.DumpError,
|
||||
self.mkmaster, None, verbosity=1, rfile="test_dump.py"
|
||||
)
|
||||
|
||||
def test_options(self):
|
||||
@ -160,7 +146,9 @@ class TestDumpMaster:
|
||||
assert o.verbosity == 2
|
||||
|
||||
def test_filter(self):
|
||||
assert "GET" not in self._dummy_cycle(1, "~u foo", "", verbosity=1)
|
||||
assert "GET" not in self.dummy_cycle(
|
||||
self.mkmaster("~u foo", verbosity=1), 1, ""
|
||||
)
|
||||
|
||||
def test_app(self):
|
||||
o = dump.Options(app=True)
|
||||
@ -172,53 +160,50 @@ class TestDumpMaster:
|
||||
cs = StringIO()
|
||||
o = dump.Options(replacements=[(".*", "content", "foo")])
|
||||
m = dump.DumpMaster(None, o, outfile=cs)
|
||||
f = self._cycle(m, "content")
|
||||
f = self.cycle(m, "content")
|
||||
assert f.request.content == "foo"
|
||||
|
||||
def test_setheader(self):
|
||||
cs = StringIO()
|
||||
o = dump.Options(setheaders=[(".*", "one", "two")])
|
||||
m = dump.DumpMaster(None, o, outfile=cs)
|
||||
f = self._cycle(m, "content")
|
||||
f = self.cycle(m, "content")
|
||||
assert f.request.headers["one"] == "two"
|
||||
|
||||
def test_basic(self):
|
||||
for i in (1, 2, 3):
|
||||
assert "GET" in self._dummy_cycle(1, "~s", "", flow_detail=i)
|
||||
assert "GET" in self._dummy_cycle(
|
||||
1,
|
||||
"~s",
|
||||
"\x00\x00\x00",
|
||||
flow_detail=i)
|
||||
assert "GET" in self._dummy_cycle(1, "~s", "ascii", flow_detail=i)
|
||||
|
||||
def test_write(self):
|
||||
with tutils.tmpdir() as d:
|
||||
p = os.path.join(d, "a")
|
||||
self._dummy_cycle(1, None, "", outfile=(p, "wb"), verbosity=0)
|
||||
self.dummy_cycle(
|
||||
self.mkmaster(None, outfile=(p, "wb"), verbosity=0), 1, ""
|
||||
)
|
||||
assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 1
|
||||
|
||||
def test_write_append(self):
|
||||
with tutils.tmpdir() as d:
|
||||
p = os.path.join(d, "a.append")
|
||||
self._dummy_cycle(1, None, "", outfile=(p, "wb"), verbosity=0)
|
||||
self._dummy_cycle(1, None, "", outfile=(p, "ab"), verbosity=0)
|
||||
self.dummy_cycle(
|
||||
self.mkmaster(None, outfile=(p, "wb"), verbosity=0),
|
||||
1, ""
|
||||
)
|
||||
self.dummy_cycle(
|
||||
self.mkmaster(None, outfile=(p, "ab"), verbosity=0),
|
||||
1, ""
|
||||
)
|
||||
assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 2
|
||||
|
||||
def test_write_err(self):
|
||||
tutils.raises(
|
||||
dump.DumpError,
|
||||
self._dummy_cycle,
|
||||
1,
|
||||
None,
|
||||
"",
|
||||
outfile = ("nonexistentdir/foo", "wb")
|
||||
self.mkmaster, None, outfile = ("nonexistentdir/foo", "wb")
|
||||
)
|
||||
|
||||
def test_script(self):
|
||||
ret = self._dummy_cycle(
|
||||
1, None, "",
|
||||
ret = self.dummy_cycle(
|
||||
self.mkmaster(
|
||||
None,
|
||||
scripts=[tutils.test_data.path("data/scripts/all.py")], verbosity=1
|
||||
),
|
||||
1, "",
|
||||
)
|
||||
assert "XCLIENTCONNECT" in ret
|
||||
assert "XSERVERCONNECT" in ret
|
||||
@ -227,15 +212,23 @@ class TestDumpMaster:
|
||||
assert "XCLIENTDISCONNECT" in ret
|
||||
tutils.raises(
|
||||
dump.DumpError,
|
||||
self._dummy_cycle, 1, None, "", scripts=["nonexistent"]
|
||||
self.mkmaster,
|
||||
None, scripts=["nonexistent"]
|
||||
)
|
||||
tutils.raises(
|
||||
dump.DumpError,
|
||||
self._dummy_cycle, 1, None, "", scripts=["starterr.py"]
|
||||
self.mkmaster,
|
||||
None, scripts=["starterr.py"]
|
||||
)
|
||||
|
||||
def test_stickycookie(self):
|
||||
self._dummy_cycle(1, None, "", stickycookie = ".*")
|
||||
self.dummy_cycle(
|
||||
self.mkmaster(None, stickycookie = ".*"),
|
||||
1, ""
|
||||
)
|
||||
|
||||
def test_stickyauth(self):
|
||||
self._dummy_cycle(1, None, "", stickyauth = ".*")
|
||||
self.dummy_cycle(
|
||||
self.mkmaster(None, stickyauth = ".*"),
|
||||
1, ""
|
||||
)
|
||||
|
0
test/mitmproxy/test_web_app.py
Normal file
0
test/mitmproxy/test_web_app.py
Normal file
17
test/mitmproxy/test_web_master.py
Normal file
17
test/mitmproxy/test_web_master.py
Normal file
@ -0,0 +1,17 @@
|
||||
from mitmproxy.web import master
|
||||
from . import mastertest
|
||||
|
||||
|
||||
class TestWebMaster(mastertest.MasterTest):
|
||||
def mkmaster(self, filt, **options):
|
||||
o = master.Options(
|
||||
filtstr=filt,
|
||||
**options
|
||||
)
|
||||
return master.WebMaster(None, o)
|
||||
|
||||
def test_basic(self):
|
||||
m = self.mkmaster(None)
|
||||
for i in (1, 2, 3):
|
||||
self.dummy_cycle(m, 1, "")
|
||||
assert len(m.state.flows) == i
|
Loading…
Reference in New Issue
Block a user