mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-22 07:08:10 +00:00
more style cleanup
This commit is contained in:
parent
5c45ca7f9a
commit
e2447406cc
@ -46,7 +46,7 @@ install:
|
|||||||
before_script:
|
before_script:
|
||||||
- "openssl version -a"
|
- "openssl version -a"
|
||||||
- "python -c \"from OpenSSL import SSL; print(SSL.SSLeay_version(SSL.SSLEAY_VERSION))\""
|
- "python -c \"from OpenSSL import SSL; print(SSL.SSLeay_version(SSL.SSLEAY_VERSION))\""
|
||||||
- "[[ $(flake8 -qq --count --exit-zero mitmproxy netlib pathod examples test) -le 71 ]]"
|
- "[[ $(flake8 -qq --count --exit-zero mitmproxy netlib pathod examples test) -le 12 ]]"
|
||||||
|
|
||||||
script:
|
script:
|
||||||
- "py.test --timeout 60 --cov netlib --cov mitmproxy --cov pathod ./test/$SCOPE"
|
- "py.test --timeout 60 --cov netlib --cov mitmproxy --cov pathod ./test/$SCOPE"
|
||||||
|
@ -174,7 +174,7 @@ def handler(f):
|
|||||||
raise exceptions.ControlException("Handler takes one argument: a message")
|
raise exceptions.ControlException("Handler takes one argument: a message")
|
||||||
|
|
||||||
if not hasattr(message, "reply"):
|
if not hasattr(message, "reply"):
|
||||||
raise exceptions.ControlException("Message %s has no reply attribute"%message)
|
raise exceptions.ControlException("Message %s has no reply attribute" % message)
|
||||||
|
|
||||||
# The following ensures that inheritance with wrapped handlers in the
|
# The following ensures that inheritance with wrapped handlers in the
|
||||||
# base class works. If we're the first handler, then responsibility for
|
# base class works. If we're the first handler, then responsibility for
|
||||||
|
@ -4,3 +4,10 @@ from netlib.version import VERSION, IVERSION
|
|||||||
|
|
||||||
NAME = "mitmproxy"
|
NAME = "mitmproxy"
|
||||||
NAMEVERSION = NAME + " " + VERSION
|
NAMEVERSION = NAME + " " + VERSION
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"NAME",
|
||||||
|
"NAMEVERSION",
|
||||||
|
"VERSION",
|
||||||
|
"IVERSION",
|
||||||
|
]
|
||||||
|
@ -112,7 +112,8 @@ class RequestHandler(BasicAuth, tornado.web.RequestHandler):
|
|||||||
class IndexHandler(RequestHandler):
|
class IndexHandler(RequestHandler):
|
||||||
|
|
||||||
def get(self):
|
def get(self):
|
||||||
_ = self.xsrf_token # https://github.com/tornadoweb/tornado/issues/645
|
token = self.xsrf_token # https://github.com/tornadoweb/tornado/issues/645
|
||||||
|
assert token
|
||||||
self.render("index.html")
|
self.render("index.html")
|
||||||
|
|
||||||
|
|
||||||
|
@ -97,7 +97,7 @@ class Headers(MultiDict):
|
|||||||
headers = {
|
headers = {
|
||||||
_always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
|
_always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
|
||||||
for name, value in six.iteritems(headers)
|
for name, value in six.iteritems(headers)
|
||||||
}
|
}
|
||||||
self.update(headers)
|
self.update(headers)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -240,7 +240,7 @@ class Request(Message):
|
|||||||
query = utils.urlencode(value)
|
query = utils.urlencode(value)
|
||||||
scheme, netloc, path, params, _, fragment = urllib.parse.urlparse(self.url)
|
scheme, netloc, path, params, _, fragment = urllib.parse.urlparse(self.url)
|
||||||
_, _, _, self.path = utils.parse_url(
|
_, _, _, self.path = utils.parse_url(
|
||||||
urllib.parse.urlunparse([scheme, netloc, path, params, query, fragment]))
|
urllib.parse.urlunparse([scheme, netloc, path, params, query, fragment]))
|
||||||
|
|
||||||
@query.setter
|
@query.setter
|
||||||
def query(self, value):
|
def query(self, value):
|
||||||
@ -288,7 +288,7 @@ class Request(Message):
|
|||||||
path = "/" + "/".join(components)
|
path = "/" + "/".join(components)
|
||||||
scheme, netloc, _, params, query, fragment = urllib.parse.urlparse(self.url)
|
scheme, netloc, _, params, query, fragment = urllib.parse.urlparse(self.url)
|
||||||
_, _, _, self.path = utils.parse_url(
|
_, _, _, self.path = utils.parse_url(
|
||||||
urllib.parse.urlunparse([scheme, netloc, path, params, query, fragment]))
|
urllib.parse.urlunparse([scheme, netloc, path, params, query, fragment]))
|
||||||
|
|
||||||
def anticache(self):
|
def anticache(self):
|
||||||
"""
|
"""
|
||||||
|
@ -1,2 +1,11 @@
|
|||||||
from .frame import *
|
from __future__ import absolute_import, print_function, division
|
||||||
from .protocol import *
|
from .frame import FrameHeader, Frame, OPCODE
|
||||||
|
from .protocol import Masker, WebsocketsProtocol
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"FrameHeader",
|
||||||
|
"Frame",
|
||||||
|
"Masker",
|
||||||
|
"WebsocketsProtocol",
|
||||||
|
"OPCODE",
|
||||||
|
]
|
||||||
|
@ -7,9 +7,13 @@ import pyparsing as pp
|
|||||||
|
|
||||||
from . import http, http2, websockets, writer, exceptions
|
from . import http, http2, websockets, writer, exceptions
|
||||||
|
|
||||||
from .exceptions import *
|
from .exceptions import RenderError, FileAccessDenied, ParseException
|
||||||
from .base import Settings
|
from .base import Settings
|
||||||
assert Settings # prevent pyflakes from messing with this
|
|
||||||
|
__all__ = [
|
||||||
|
"RenderError", "FileAccessDenied", "ParseException",
|
||||||
|
"Settings",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def expand(msg):
|
def expand(msg):
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
|
|
||||||
class RenderError(Exception):
|
class RenderError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -1 +1,7 @@
|
|||||||
from . import http, http2, websockets
|
from . import http, http2, websockets
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"http",
|
||||||
|
"http2",
|
||||||
|
"websockets",
|
||||||
|
]
|
||||||
|
@ -4,3 +4,10 @@ from netlib.version import VERSION, IVERSION
|
|||||||
|
|
||||||
NAME = "pathod"
|
NAME = "pathod"
|
||||||
NAMEVERSION = NAME + " " + VERSION
|
NAMEVERSION = NAME + " " + VERSION
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"NAME",
|
||||||
|
"NAMEVERSION",
|
||||||
|
"VERSION",
|
||||||
|
"IVERSION",
|
||||||
|
]
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
[flake8]
|
[flake8]
|
||||||
max-line-length = 130
|
max-line-length = 140
|
||||||
max-complexity = 25
|
max-complexity = 25
|
||||||
ignore = E251,C901
|
ignore = E251,C901
|
||||||
exclude = mitmproxy/contrib/*
|
exclude = mitmproxy/contrib/*,test/mitmproxy/data/*
|
||||||
|
|
||||||
[pytest]
|
[pytest]
|
||||||
testpaths = test
|
testpaths = test
|
||||||
|
0
test/mitmproxy/console/__init__.py
Normal file
0
test/mitmproxy/console/__init__.py
Normal file
@ -1,13 +1,8 @@
|
|||||||
import os
|
|
||||||
from unittest.case import SkipTest
|
|
||||||
if os.name == "nt":
|
|
||||||
raise SkipTest("Skipped on Windows.")
|
|
||||||
|
|
||||||
|
|
||||||
import mitmproxy.console.common as common
|
import mitmproxy.console.common as common
|
||||||
from . import tutils
|
from .. import tutils
|
||||||
|
|
||||||
|
|
||||||
|
@tutils.skip_appveyor
|
||||||
def test_format_flow():
|
def test_format_flow():
|
||||||
f = tutils.tflow(resp=True)
|
f = tutils.tflow(resp=True)
|
||||||
assert common.format_flow(f, True)
|
assert common.format_flow(f, True)
|
@ -4,7 +4,7 @@ import netlib.tutils
|
|||||||
from mitmproxy import console
|
from mitmproxy import console
|
||||||
from mitmproxy.console import common
|
from mitmproxy.console import common
|
||||||
|
|
||||||
from . import tutils
|
from .. import tutils
|
||||||
|
|
||||||
|
|
||||||
class TestConsoleState:
|
class TestConsoleState:
|
@ -1,11 +1,8 @@
|
|||||||
import os
|
|
||||||
from unittest.case import SkipTest
|
|
||||||
if os.name == "nt":
|
|
||||||
raise SkipTest("Skipped on Windows.")
|
|
||||||
|
|
||||||
import mitmproxy.console.help as help
|
import mitmproxy.console.help as help
|
||||||
|
from .. import tutils
|
||||||
|
|
||||||
|
|
||||||
|
@tutils.skip_appveyor
|
||||||
class TestHelp:
|
class TestHelp:
|
||||||
|
|
||||||
def test_helptext(self):
|
def test_helptext(self):
|
@ -1,10 +1,8 @@
|
|||||||
import os
|
|
||||||
from unittest.case import SkipTest
|
|
||||||
if os.name == "nt":
|
|
||||||
raise SkipTest("Skipped on Windows.")
|
|
||||||
import mitmproxy.console.palettes as palettes
|
import mitmproxy.console.palettes as palettes
|
||||||
|
from .. import tutils
|
||||||
|
|
||||||
|
|
||||||
|
@tutils.skip_appveyor
|
||||||
class TestPalette:
|
class TestPalette:
|
||||||
|
|
||||||
def test_helptext(self):
|
def test_helptext(self):
|
@ -2,7 +2,7 @@ import os
|
|||||||
from os.path import normpath
|
from os.path import normpath
|
||||||
from mitmproxy.console import pathedit
|
from mitmproxy.console import pathedit
|
||||||
|
|
||||||
from . import tutils
|
from .. import tutils
|
||||||
|
|
||||||
|
|
||||||
class TestPathCompleter:
|
class TestPathCompleter:
|
@ -1,6 +1,7 @@
|
|||||||
import time
|
import time
|
||||||
from mitmproxy.script import concurrent
|
from mitmproxy.script import concurrent
|
||||||
|
|
||||||
|
|
||||||
@concurrent
|
@concurrent
|
||||||
def request(context, flow):
|
def request(context, flow):
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
@ -11,7 +11,7 @@ class Dummy:
|
|||||||
|
|
||||||
@tutils.skip_appveyor
|
@tutils.skip_appveyor
|
||||||
def test_concurrent():
|
def test_concurrent():
|
||||||
with Script(tutils.test_data.path("scripts/concurrent_decorator.py"), None) as s:
|
with Script(tutils.test_data.path("data/scripts/concurrent_decorator.py"), None) as s:
|
||||||
def reply():
|
def reply():
|
||||||
reply.acked.set()
|
reply.acked.set()
|
||||||
reply.acked = Event()
|
reply.acked = Event()
|
||||||
@ -27,6 +27,6 @@ def test_concurrent():
|
|||||||
|
|
||||||
|
|
||||||
def test_concurrent_err():
|
def test_concurrent_err():
|
||||||
s = Script(tutils.test_data.path("scripts/concurrent_decorator_err.py"), None)
|
s = Script(tutils.test_data.path("data/scripts/concurrent_decorator_err.py"), None)
|
||||||
with tutils.raises("Concurrent decorator not supported for 'start' method"):
|
with tutils.raises("Concurrent decorator not supported for 'start' method"):
|
||||||
s.load()
|
s.load()
|
||||||
|
@ -21,9 +21,9 @@ class TestParseCommand:
|
|||||||
|
|
||||||
def test_parse_args(self):
|
def test_parse_args(self):
|
||||||
with tutils.chdir(tutils.test_data.dirname):
|
with tutils.chdir(tutils.test_data.dirname):
|
||||||
assert Script.parse_command("scripts/a.py") == ["scripts/a.py"]
|
assert Script.parse_command("data/scripts/a.py") == ["data/scripts/a.py"]
|
||||||
assert Script.parse_command("scripts/a.py foo bar") == ["scripts/a.py", "foo", "bar"]
|
assert Script.parse_command("data/scripts/a.py foo bar") == ["data/scripts/a.py", "foo", "bar"]
|
||||||
assert Script.parse_command("scripts/a.py 'foo bar'") == ["scripts/a.py", "foo bar"]
|
assert Script.parse_command("data/scripts/a.py 'foo bar'") == ["data/scripts/a.py", "foo bar"]
|
||||||
|
|
||||||
@tutils.skip_not_windows
|
@tutils.skip_not_windows
|
||||||
def test_parse_windows(self):
|
def test_parse_windows(self):
|
||||||
@ -33,7 +33,7 @@ class TestParseCommand:
|
|||||||
|
|
||||||
|
|
||||||
def test_simple():
|
def test_simple():
|
||||||
with tutils.chdir(tutils.test_data.path("scripts")):
|
with tutils.chdir(tutils.test_data.path("data/scripts")):
|
||||||
s = Script("a.py --var 42", None)
|
s = Script("a.py --var 42", None)
|
||||||
assert s.filename == "a.py"
|
assert s.filename == "a.py"
|
||||||
assert s.ns is None
|
assert s.ns is None
|
||||||
@ -55,7 +55,7 @@ def test_simple():
|
|||||||
|
|
||||||
|
|
||||||
def test_script_exception():
|
def test_script_exception():
|
||||||
with tutils.chdir(tutils.test_data.path("scripts")):
|
with tutils.chdir(tutils.test_data.path("data/scripts")):
|
||||||
s = Script("syntaxerr.py", None)
|
s = Script("syntaxerr.py", None)
|
||||||
with tutils.raises(ScriptException):
|
with tutils.raises(ScriptException):
|
||||||
s.load()
|
s.load()
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
from . import tutils, tservers
|
from . import tservers
|
||||||
|
|
||||||
|
|
||||||
class TestApp(tservers.HTTPProxyTest):
|
class TestApp(tservers.HTTPProxyTest):
|
||||||
@ -7,8 +7,7 @@ class TestApp(tservers.HTTPProxyTest):
|
|||||||
assert self.app("/").status_code == 200
|
assert self.app("/").status_code == 200
|
||||||
|
|
||||||
def test_cert(self):
|
def test_cert(self):
|
||||||
with tutils.tmpdir() as d:
|
for ext in ["pem", "p12"]:
|
||||||
for ext in ["pem", "p12"]:
|
resp = self.app("/cert/%s" % ext)
|
||||||
resp = self.app("/cert/%s" % ext)
|
assert resp.status_code == 200
|
||||||
assert resp.status_code == 200
|
assert resp.content
|
||||||
assert resp.content
|
|
||||||
|
@ -218,7 +218,7 @@ class TestDumpMaster:
|
|||||||
def test_script(self):
|
def test_script(self):
|
||||||
ret = self._dummy_cycle(
|
ret = self._dummy_cycle(
|
||||||
1, None, "",
|
1, None, "",
|
||||||
scripts=[tutils.test_data.path("scripts/all.py")], verbosity=1
|
scripts=[tutils.test_data.path("data/scripts/all.py")], verbosity=1
|
||||||
)
|
)
|
||||||
assert "XCLIENTCONNECT" in ret
|
assert "XCLIENTCONNECT" in ret
|
||||||
assert "XSERVERCONNECT" in ret
|
assert "XSERVERCONNECT" in ret
|
||||||
|
@ -761,13 +761,13 @@ class TestFlowMaster:
|
|||||||
s = flow.State()
|
s = flow.State()
|
||||||
fm = flow.FlowMaster(None, s)
|
fm = flow.FlowMaster(None, s)
|
||||||
|
|
||||||
fm.load_script(tutils.test_data.path("scripts/a.py"))
|
fm.load_script(tutils.test_data.path("data/scripts/a.py"))
|
||||||
fm.load_script(tutils.test_data.path("scripts/a.py"))
|
fm.load_script(tutils.test_data.path("data/scripts/a.py"))
|
||||||
fm.unload_scripts()
|
fm.unload_scripts()
|
||||||
with tutils.raises(ScriptException):
|
with tutils.raises(ScriptException):
|
||||||
fm.load_script("nonexistent")
|
fm.load_script("nonexistent")
|
||||||
try:
|
try:
|
||||||
fm.load_script(tutils.test_data.path("scripts/starterr.py"))
|
fm.load_script(tutils.test_data.path("data/scripts/starterr.py"))
|
||||||
except ScriptException as e:
|
except ScriptException as e:
|
||||||
assert "ValueError" in str(e)
|
assert "ValueError" in str(e)
|
||||||
assert len(fm.scripts) == 0
|
assert len(fm.scripts) == 0
|
||||||
@ -796,7 +796,7 @@ class TestFlowMaster:
|
|||||||
def test_script_reqerr(self):
|
def test_script_reqerr(self):
|
||||||
s = flow.State()
|
s = flow.State()
|
||||||
fm = flow.FlowMaster(None, s)
|
fm = flow.FlowMaster(None, s)
|
||||||
fm.load_script(tutils.test_data.path("scripts/reqerr.py"))
|
fm.load_script(tutils.test_data.path("data/scripts/reqerr.py"))
|
||||||
f = tutils.tflow()
|
f = tutils.tflow()
|
||||||
fm.clientconnect(f.client_conn)
|
fm.clientconnect(f.client_conn)
|
||||||
assert fm.request(f)
|
assert fm.request(f)
|
||||||
@ -804,7 +804,7 @@ class TestFlowMaster:
|
|||||||
def test_script(self):
|
def test_script(self):
|
||||||
s = flow.State()
|
s = flow.State()
|
||||||
fm = flow.FlowMaster(None, s)
|
fm = flow.FlowMaster(None, s)
|
||||||
fm.load_script(tutils.test_data.path("scripts/all.py"))
|
fm.load_script(tutils.test_data.path("data/scripts/all.py"))
|
||||||
f = tutils.tflow(resp=True)
|
f = tutils.tflow(resp=True)
|
||||||
|
|
||||||
fm.clientconnect(f.client_conn)
|
fm.clientconnect(f.client_conn)
|
||||||
@ -816,7 +816,7 @@ class TestFlowMaster:
|
|||||||
fm.response(f)
|
fm.response(f)
|
||||||
assert fm.scripts[0].ns["log"][-1] == "response"
|
assert fm.scripts[0].ns["log"][-1] == "response"
|
||||||
# load second script
|
# load second script
|
||||||
fm.load_script(tutils.test_data.path("scripts/all.py"))
|
fm.load_script(tutils.test_data.path("data/scripts/all.py"))
|
||||||
assert len(fm.scripts) == 2
|
assert len(fm.scripts) == 2
|
||||||
fm.clientdisconnect(f.server_conn)
|
fm.clientdisconnect(f.server_conn)
|
||||||
assert fm.scripts[0].ns["log"][-1] == "clientdisconnect"
|
assert fm.scripts[0].ns["log"][-1] == "clientdisconnect"
|
||||||
@ -825,7 +825,7 @@ class TestFlowMaster:
|
|||||||
# unload first script
|
# unload first script
|
||||||
fm.unload_scripts()
|
fm.unload_scripts()
|
||||||
assert len(fm.scripts) == 0
|
assert len(fm.scripts) == 0
|
||||||
fm.load_script(tutils.test_data.path("scripts/all.py"))
|
fm.load_script(tutils.test_data.path("data/scripts/all.py"))
|
||||||
|
|
||||||
f.error = tutils.terr()
|
f.error = tutils.terr()
|
||||||
fm.error(f)
|
fm.error(f)
|
||||||
@ -868,7 +868,7 @@ class TestFlowMaster:
|
|||||||
f.error.reply = controller.DummyReply()
|
f.error.reply = controller.DummyReply()
|
||||||
fm.error(f)
|
fm.error(f)
|
||||||
|
|
||||||
fm.load_script(tutils.test_data.path("scripts/a.py"))
|
fm.load_script(tutils.test_data.path("data/scripts/a.py"))
|
||||||
fm.shutdown()
|
fm.shutdown()
|
||||||
|
|
||||||
def test_client_playback(self):
|
def test_client_playback(self):
|
||||||
|
@ -20,11 +20,16 @@ def python_equals(testdata, text):
|
|||||||
assert clean_blanks(text).rstrip() == clean_blanks(d).rstrip()
|
assert clean_blanks(text).rstrip() == clean_blanks(d).rstrip()
|
||||||
|
|
||||||
|
|
||||||
req_get = lambda: netlib.tutils.treq(method='GET', content='', path=b"/path?a=foo&a=bar&b=baz")
|
def req_get():
|
||||||
|
return netlib.tutils.treq(method='GET', content='', path=b"/path?a=foo&a=bar&b=baz")
|
||||||
|
|
||||||
req_post = lambda: netlib.tutils.treq(method='POST', headers=())
|
|
||||||
|
|
||||||
req_patch = lambda: netlib.tutils.treq(method='PATCH', path=b"/path?query=param")
|
def req_post():
|
||||||
|
return netlib.tutils.treq(method='POST', headers=())
|
||||||
|
|
||||||
|
|
||||||
|
def req_patch():
|
||||||
|
return netlib.tutils.treq(method='PATCH', path=b"/path?query=param")
|
||||||
|
|
||||||
|
|
||||||
class TestExportCurlCommand():
|
class TestExportCurlCommand():
|
||||||
@ -47,22 +52,22 @@ class TestExportCurlCommand():
|
|||||||
class TestExportPythonCode():
|
class TestExportPythonCode():
|
||||||
def test_get(self):
|
def test_get(self):
|
||||||
flow = tutils.tflow(req=req_get())
|
flow = tutils.tflow(req=req_get())
|
||||||
python_equals("test_flow_export/python_get.py", flow_export.python_code(flow))
|
python_equals("data/test_flow_export/python_get.py", flow_export.python_code(flow))
|
||||||
|
|
||||||
def test_post(self):
|
def test_post(self):
|
||||||
flow = tutils.tflow(req=req_post())
|
flow = tutils.tflow(req=req_post())
|
||||||
python_equals("test_flow_export/python_post.py", flow_export.python_code(flow))
|
python_equals("data/test_flow_export/python_post.py", flow_export.python_code(flow))
|
||||||
|
|
||||||
def test_post_json(self):
|
def test_post_json(self):
|
||||||
p = req_post()
|
p = req_post()
|
||||||
p.content = '{"name": "example", "email": "example@example.com"}'
|
p.content = '{"name": "example", "email": "example@example.com"}'
|
||||||
p.headers = Headers(content_type="application/json")
|
p.headers = Headers(content_type="application/json")
|
||||||
flow = tutils.tflow(req=p)
|
flow = tutils.tflow(req=p)
|
||||||
python_equals("test_flow_export/python_post_json.py", flow_export.python_code(flow))
|
python_equals("data/test_flow_export/python_post_json.py", flow_export.python_code(flow))
|
||||||
|
|
||||||
def test_patch(self):
|
def test_patch(self):
|
||||||
flow = tutils.tflow(req=req_patch())
|
flow = tutils.tflow(req=req_patch())
|
||||||
python_equals("test_flow_export/python_patch.py", flow_export.python_code(flow))
|
python_equals("data/test_flow_export/python_patch.py", flow_export.python_code(flow))
|
||||||
|
|
||||||
|
|
||||||
class TestRawRequest():
|
class TestRawRequest():
|
||||||
@ -103,32 +108,32 @@ class TestRawRequest():
|
|||||||
class TestExportLocustCode():
|
class TestExportLocustCode():
|
||||||
def test_get(self):
|
def test_get(self):
|
||||||
flow = tutils.tflow(req=req_get())
|
flow = tutils.tflow(req=req_get())
|
||||||
python_equals("test_flow_export/locust_get.py", flow_export.locust_code(flow))
|
python_equals("data/test_flow_export/locust_get.py", flow_export.locust_code(flow))
|
||||||
|
|
||||||
def test_post(self):
|
def test_post(self):
|
||||||
p = req_post()
|
p = req_post()
|
||||||
p.content = '''content'''
|
p.content = '''content'''
|
||||||
p.headers = ''
|
p.headers = ''
|
||||||
flow = tutils.tflow(req=p)
|
flow = tutils.tflow(req=p)
|
||||||
python_equals("test_flow_export/locust_post.py", flow_export.locust_code(flow))
|
python_equals("data/test_flow_export/locust_post.py", flow_export.locust_code(flow))
|
||||||
|
|
||||||
def test_patch(self):
|
def test_patch(self):
|
||||||
flow = tutils.tflow(req=req_patch())
|
flow = tutils.tflow(req=req_patch())
|
||||||
python_equals("test_flow_export/locust_patch.py", flow_export.locust_code(flow))
|
python_equals("data/test_flow_export/locust_patch.py", flow_export.locust_code(flow))
|
||||||
|
|
||||||
|
|
||||||
class TestExportLocustTask():
|
class TestExportLocustTask():
|
||||||
def test_get(self):
|
def test_get(self):
|
||||||
flow = tutils.tflow(req=req_get())
|
flow = tutils.tflow(req=req_get())
|
||||||
python_equals("test_flow_export/locust_task_get.py", flow_export.locust_task(flow))
|
python_equals("data/test_flow_export/locust_task_get.py", flow_export.locust_task(flow))
|
||||||
|
|
||||||
def test_post(self):
|
def test_post(self):
|
||||||
flow = tutils.tflow(req=req_post())
|
flow = tutils.tflow(req=req_post())
|
||||||
python_equals("test_flow_export/locust_task_post.py", flow_export.locust_task(flow))
|
python_equals("data/test_flow_export/locust_task_post.py", flow_export.locust_task(flow))
|
||||||
|
|
||||||
def test_patch(self):
|
def test_patch(self):
|
||||||
flow = tutils.tflow(req=req_patch())
|
flow = tutils.tflow(req=req_patch())
|
||||||
python_equals("test_flow_export/locust_task_patch.py", flow_export.locust_task(flow))
|
python_equals("data/test_flow_export/locust_task_patch.py", flow_export.locust_task(flow))
|
||||||
|
|
||||||
|
|
||||||
class TestIsJson():
|
class TestIsJson():
|
||||||
@ -144,7 +149,7 @@ class TestIsJson():
|
|||||||
j = flow_export.is_json(headers, '{"name": "example", "email": "example@example.com"}')
|
j = flow_export.is_json(headers, '{"name": "example", "email": "example@example.com"}')
|
||||||
assert j is False
|
assert j is False
|
||||||
|
|
||||||
def test_valid(self):
|
def test_valid2(self):
|
||||||
headers = Headers(content_type="application/json")
|
headers = Headers(content_type="application/json")
|
||||||
j = flow_export.is_json(headers, '{"name": "example", "email": "example@example.com"}')
|
j = flow_export.is_json(headers, '{"name": "example", "email": "example@example.com"}')
|
||||||
assert isinstance(j, dict)
|
assert isinstance(j, dict)
|
||||||
|
@ -164,12 +164,21 @@ class TestSimple(_Http2TestBase, _Http2ServerBase):
|
|||||||
assert ('client-foo', 'client-bar-1') in event.headers
|
assert ('client-foo', 'client-bar-1') in event.headers
|
||||||
assert ('client-foo', 'client-bar-2') in event.headers
|
assert ('client-foo', 'client-bar-2') in event.headers
|
||||||
|
|
||||||
h2_conn.send_headers(event.stream_id, [
|
import warnings
|
||||||
(':status', '200'),
|
with warnings.catch_warnings():
|
||||||
('server-foo', 'server-bar'),
|
# Ignore UnicodeWarning:
|
||||||
('föo', 'bär'),
|
# h2/utilities.py:64: UnicodeWarning: Unicode equal comparison
|
||||||
('X-Stream-ID', str(event.stream_id)),
|
# failed to convert both arguments to Unicode - interpreting
|
||||||
])
|
# them as being unequal.
|
||||||
|
# elif header[0] in (b'cookie', u'cookie') and len(header[1]) < 20:
|
||||||
|
|
||||||
|
warnings.simplefilter("ignore")
|
||||||
|
h2_conn.send_headers(event.stream_id, [
|
||||||
|
(':status', '200'),
|
||||||
|
('server-foo', 'server-bar'),
|
||||||
|
('föo', 'bär'),
|
||||||
|
('X-Stream-ID', str(event.stream_id)),
|
||||||
|
])
|
||||||
h2_conn.send_data(event.stream_id, b'foobar')
|
h2_conn.send_data(event.stream_id, b'foobar')
|
||||||
h2_conn.end_stream(event.stream_id)
|
h2_conn.end_stream(event.stream_id)
|
||||||
wfile.write(h2_conn.data_to_send())
|
wfile.write(h2_conn.data_to_send())
|
||||||
@ -434,6 +443,7 @@ class TestPushPromise(_Http2TestBase, _Http2ServerBase):
|
|||||||
assert b'regular_stream' in bodies
|
assert b'regular_stream' in bodies
|
||||||
# the other two bodies might not be transmitted before the reset
|
# the other two bodies might not be transmitted before the reset
|
||||||
|
|
||||||
|
|
||||||
@requires_alpn
|
@requires_alpn
|
||||||
class TestConnectionLost(_Http2TestBase, _Http2ServerBase):
|
class TestConnectionLost(_Http2TestBase, _Http2ServerBase):
|
||||||
|
|
||||||
|
@ -113,11 +113,10 @@ class TestProcessProxyOptions:
|
|||||||
"nonexistent")
|
"nonexistent")
|
||||||
|
|
||||||
def test_certs(self):
|
def test_certs(self):
|
||||||
with tutils.tmpdir() as cadir:
|
self.assert_noerr(
|
||||||
self.assert_noerr(
|
"--cert",
|
||||||
"--cert",
|
tutils.test_data.path("data/testkey.pem"))
|
||||||
tutils.test_data.path("data/testkey.pem"))
|
self.assert_err("does not exist", "--cert", "nonexistent")
|
||||||
self.assert_err("does not exist", "--cert", "nonexistent")
|
|
||||||
|
|
||||||
def test_auth(self):
|
def test_auth(self):
|
||||||
p = self.assert_noerr("--nonanonymous")
|
p = self.assert_noerr("--nonanonymous")
|
||||||
|
@ -5,7 +5,7 @@ from . import tutils
|
|||||||
def test_duplicate_flow():
|
def test_duplicate_flow():
|
||||||
s = flow.State()
|
s = flow.State()
|
||||||
fm = flow.FlowMaster(None, s)
|
fm = flow.FlowMaster(None, s)
|
||||||
fm.load_script(tutils.test_data.path("scripts/duplicate_flow.py"))
|
fm.load_script(tutils.test_data.path("data/scripts/duplicate_flow.py"))
|
||||||
f = tutils.tflow()
|
f = tutils.tflow()
|
||||||
fm.request(f)
|
fm.request(f)
|
||||||
assert fm.state.flow_count() == 2
|
assert fm.state.flow_count() == 2
|
||||||
|
@ -286,7 +286,7 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin, AppMixin):
|
|||||||
self.master.set_stream_large_bodies(None)
|
self.master.set_stream_large_bodies(None)
|
||||||
|
|
||||||
def test_stream_modify(self):
|
def test_stream_modify(self):
|
||||||
self.master.load_script(tutils.test_data.path("scripts/stream_modify.py"))
|
self.master.load_script(tutils.test_data.path("data/scripts/stream_modify.py"))
|
||||||
d = self.pathod('200:b"foo"')
|
d = self.pathod('200:b"foo"')
|
||||||
assert d.content == "bar"
|
assert d.content == "bar"
|
||||||
self.master.unload_scripts()
|
self.master.unload_scripts()
|
||||||
@ -511,7 +511,7 @@ class TestTransparent(tservers.TransparentProxyTest, CommonMixin, TcpMixin):
|
|||||||
ssl = False
|
ssl = False
|
||||||
|
|
||||||
def test_tcp_stream_modify(self):
|
def test_tcp_stream_modify(self):
|
||||||
self.master.load_script(tutils.test_data.path("scripts/tcp_stream_modify.py"))
|
self.master.load_script(tutils.test_data.path("data/scripts/tcp_stream_modify.py"))
|
||||||
|
|
||||||
self._tcpproxy_on()
|
self._tcpproxy_on()
|
||||||
d = self.pathod('200:b"foo"')
|
d = self.pathod('200:b"foo"')
|
||||||
|
@ -156,6 +156,7 @@ def chdir(dir):
|
|||||||
yield
|
yield
|
||||||
os.chdir(orig_dir)
|
os.chdir(orig_dir)
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def tmpdir(*args, **kwargs):
|
def tmpdir(*args, **kwargs):
|
||||||
temp_workdir = tempfile.mkdtemp(*args, **kwargs)
|
temp_workdir = tempfile.mkdtemp(*args, **kwargs)
|
||||||
|
@ -64,5 +64,3 @@ do("openssl req -x509 -new -nodes -batch "
|
|||||||
"-days 1024 "
|
"-days 1024 "
|
||||||
"-out self-signed.crt".format(SUBJECT)
|
"-out self-signed.crt".format(SUBJECT)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ class TestBasicProxyAuth:
|
|||||||
assert ba.authenticate(headers)
|
assert ba.authenticate(headers)
|
||||||
|
|
||||||
ba.clean(headers)
|
ba.clean(headers)
|
||||||
assert not ba.AUTH_HEADER in headers
|
assert ba.AUTH_HEADER not in headers
|
||||||
|
|
||||||
headers[ba.AUTH_HEADER] = ""
|
headers[ba.AUTH_HEADER] = ""
|
||||||
assert not ba.authenticate(headers)
|
assert not ba.authenticate(headers)
|
||||||
|
@ -184,7 +184,7 @@ def test_parse_set_cookie_pairs():
|
|||||||
assert ret == lst
|
assert ret == lst
|
||||||
s2 = cookies._format_set_cookie_pairs(ret)
|
s2 = cookies._format_set_cookie_pairs(ret)
|
||||||
ret2 = cookies._parse_set_cookie_pairs(s2)
|
ret2 = cookies._parse_set_cookie_pairs(s2)
|
||||||
assert ret2 == lst
|
assert ret2 == lst
|
||||||
|
|
||||||
|
|
||||||
def test_parse_set_cookie_header():
|
def test_parse_set_cookie_header():
|
||||||
|
@ -59,7 +59,8 @@ class TestResponseUtils(object):
|
|||||||
|
|
||||||
def test_get_cookies_with_parameters(self):
|
def test_get_cookies_with_parameters(self):
|
||||||
resp = tresp()
|
resp = tresp()
|
||||||
resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly")
|
cookie = "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"
|
||||||
|
resp.headers = Headers(set_cookie=cookie)
|
||||||
result = resp.cookies
|
result = resp.cookies
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert "cookiename" in result
|
assert "cookiename" in result
|
||||||
|
@ -49,7 +49,7 @@ class TestMultiDict(object):
|
|||||||
assert md["foo"] == "bar"
|
assert md["foo"] == "bar"
|
||||||
|
|
||||||
with tutils.raises(KeyError):
|
with tutils.raises(KeyError):
|
||||||
_ = md["bar"]
|
md["bar"]
|
||||||
|
|
||||||
md_multi = TMultiDict(
|
md_multi = TMultiDict(
|
||||||
[("foo", "a"), ("foo", "b")]
|
[("foo", "a"), ("foo", "b")]
|
||||||
|
@ -15,6 +15,7 @@ from netlib.exceptions import InvalidCertificateException, TcpReadIncomplete, Tl
|
|||||||
|
|
||||||
from . import tservers
|
from . import tservers
|
||||||
|
|
||||||
|
|
||||||
class EchoHandler(tcp.BaseHandler):
|
class EchoHandler(tcp.BaseHandler):
|
||||||
sni = None
|
sni = None
|
||||||
|
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
# coding=utf-8
|
# coding=utf-8
|
||||||
|
|
||||||
from netlib import utils, tutils
|
from netlib import utils, tutils
|
||||||
from netlib.http import Headers
|
from netlib.http import Headers
|
||||||
|
|
||||||
|
|
||||||
def test_bidi():
|
def test_bidi():
|
||||||
b = utils.BiDi(a=1, b=2)
|
b = utils.BiDi(a=1, b=2)
|
||||||
assert b.a == 1
|
assert b.a == 1
|
||||||
@ -189,4 +191,4 @@ def test_escaped_str_to_bytes():
|
|||||||
assert utils.escaped_str_to_bytes("ü") == b'\xc3\xbc'
|
assert utils.escaped_str_to_bytes("ü") == b'\xc3\xbc'
|
||||||
assert utils.escaped_str_to_bytes(u"\\x08") == b"\b"
|
assert utils.escaped_str_to_bytes(u"\\x08") == b"\b"
|
||||||
assert utils.escaped_str_to_bytes(u"&!?=\\\\)") == br"&!?=\)"
|
assert utils.escaped_str_to_bytes(u"&!?=\\\\)") == br"&!?=\)"
|
||||||
assert utils.escaped_str_to_bytes(u"ü") == b'\xc3\xbc'
|
assert utils.escaped_str_to_bytes(u"ü") == b'\xc3\xbc'
|
||||||
|
@ -11,7 +11,7 @@ def tflow():
|
|||||||
|
|
||||||
|
|
||||||
class ExampleApp:
|
class ExampleApp:
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.called = False
|
self.called = False
|
||||||
|
|
||||||
|
@ -120,8 +120,8 @@ class TestWebSockets(tservers.ServerTestBase):
|
|||||||
default builder should always generate valid frames
|
default builder should always generate valid frames
|
||||||
"""
|
"""
|
||||||
msg = self.random_bytes()
|
msg = self.random_bytes()
|
||||||
client_frame = websockets.Frame.default(msg, from_client=True)
|
assert websockets.Frame.default(msg, from_client=True)
|
||||||
server_frame = websockets.Frame.default(msg, from_client=False)
|
assert websockets.Frame.default(msg, from_client=False)
|
||||||
|
|
||||||
def test_serialization_bijection(self):
|
def test_serialization_bijection(self):
|
||||||
"""
|
"""
|
||||||
|
@ -68,9 +68,9 @@ class TestInject:
|
|||||||
def test_spec(self):
|
def test_spec(self):
|
||||||
e = actions.InjectAt.expr()
|
e = actions.InjectAt.expr()
|
||||||
v = e.parseString("i0,'foo'")[0]
|
v = e.parseString("i0,'foo'")[0]
|
||||||
assert v.spec() == 'i0,"foo"'
|
assert v.spec() == "i0,'foo'"
|
||||||
|
|
||||||
def test_spec(self):
|
def test_spec2(self):
|
||||||
e = actions.InjectAt.expr()
|
e = actions.InjectAt.expr()
|
||||||
v = e.parseString("i0,@100")[0]
|
v = e.parseString("i0,@100")[0]
|
||||||
v2 = v.freeze({})
|
v2 = v.freeze({})
|
||||||
|
@ -178,7 +178,7 @@ class TestMisc:
|
|||||||
assert base.TokValue.parseString('"val"')[0].val == "val"
|
assert base.TokValue.parseString('"val"')[0].val == "val"
|
||||||
assert base.TokValue.parseString('"\'val\'"')[0].val == "'val'"
|
assert base.TokValue.parseString('"\'val\'"')[0].val == "'val'"
|
||||||
|
|
||||||
def test_value(self):
|
def test_value2(self):
|
||||||
class TT(base.Value):
|
class TT(base.Value):
|
||||||
preamble = "m"
|
preamble = "m"
|
||||||
e = TT.expr()
|
e = TT.expr()
|
||||||
|
@ -51,7 +51,7 @@ class TestNoApi(tutils.DaemonTests):
|
|||||||
assert self.getpath("/log").status_code == 404
|
assert self.getpath("/log").status_code == 404
|
||||||
r = self.getpath("/")
|
r = self.getpath("/")
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
assert not "Log" in r.content
|
assert "Log" not in r.content
|
||||||
|
|
||||||
|
|
||||||
class TestNotAfterConnect(tutils.DaemonTests):
|
class TestNotAfterConnect(tutils.DaemonTests):
|
||||||
@ -110,7 +110,7 @@ class TestHexdump(tutils.DaemonTests):
|
|||||||
hexdump = True
|
hexdump = True
|
||||||
|
|
||||||
def test_hexdump(self):
|
def test_hexdump(self):
|
||||||
r = self.get(r"200:b'\xf0'")
|
assert self.get(r"200:b'\xf0'")
|
||||||
|
|
||||||
|
|
||||||
class TestNocraft(tutils.DaemonTests):
|
class TestNocraft(tutils.DaemonTests):
|
||||||
@ -125,8 +125,8 @@ class TestNocraft(tutils.DaemonTests):
|
|||||||
class CommonTests(tutils.DaemonTests):
|
class CommonTests(tutils.DaemonTests):
|
||||||
|
|
||||||
def test_binarydata(self):
|
def test_binarydata(self):
|
||||||
r = self.get(r"200:b'\xf0'")
|
assert self.get(r"200:b'\xf0'")
|
||||||
l = self.d.last_log()
|
assert self.d.last_log()
|
||||||
# FIXME: Other binary data elements
|
# FIXME: Other binary data elements
|
||||||
|
|
||||||
@pytest.mark.skip(reason="race condition")
|
@pytest.mark.skip(reason="race condition")
|
||||||
@ -147,7 +147,7 @@ class CommonTests(tutils.DaemonTests):
|
|||||||
def test_logs(self):
|
def test_logs(self):
|
||||||
assert self.d.clear_log()
|
assert self.d.clear_log()
|
||||||
assert not self.d.last_log()
|
assert not self.d.last_log()
|
||||||
rsp = self.get("202:da")
|
assert self.get("202:da")
|
||||||
assert len(self.d.log()) == 1
|
assert len(self.d.log()) == 1
|
||||||
assert self.d.clear_log()
|
assert self.d.clear_log()
|
||||||
assert len(self.d.log()) == 0
|
assert len(self.d.log()) == 0
|
||||||
|
Loading…
Reference in New Issue
Block a user