pytest.raises: shim new API

This commit is contained in:
Thomas Kriechbaumer 2017-02-06 17:48:44 +01:00
parent 28c0596742
commit 7a9d40817c
22 changed files with 83 additions and 106 deletions

View File

@ -2,6 +2,7 @@ import os
import pytest import pytest
import OpenSSL import OpenSSL
import functools import functools
from contextlib import contextmanager
import mitmproxy.net.tcp import mitmproxy.net.tcp
@ -29,35 +30,19 @@ skip_appveyor = pytest.mark.skipif(
original_pytest_raises = pytest.raises original_pytest_raises = pytest.raises
# TODO: remove this wrapper when pytest 3.1.0 is released
@contextmanager
@functools.wraps(original_pytest_raises) @functools.wraps(original_pytest_raises)
def raises(exc, *args, **kwargs): def raises(exc, *args, **kwargs):
if isinstance(exc, str): with original_pytest_raises(exc, *args, **kwargs) as exc_info:
return RaisesContext(exc) yield
else: if 'match' in kwargs:
return original_pytest_raises(exc, *args, **kwargs) assert exc_info.match(kwargs['match'])
pytest.raises = raises pytest.raises = raises
class RaisesContext:
def __init__(self, expected_exception):
self.expected_exception = expected_exception
def __enter__(self):
return
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_type:
raise AssertionError("No exception raised.")
else:
if self.expected_exception.lower() not in str(exc_val).lower():
raise AssertionError(
"Expected %s, but caught %s" % (repr(self.expected_exception), repr(exc_val))
)
return True
@pytest.fixture() @pytest.fixture()
def disable_alpn(monkeypatch): def disable_alpn(monkeypatch):
monkeypatch.setattr(mitmproxy.net.tcp, 'HAS_ALPN', False) monkeypatch.setattr(mitmproxy.net.tcp, 'HAS_ALPN', False)

View File

@ -16,16 +16,16 @@ class TestReplace:
assert x == ("foo", "bar", "vo/ing/") assert x == ("foo", "bar", "vo/ing/")
x = replace.parse_hook("/bar/voing") x = replace.parse_hook("/bar/voing")
assert x == (".*", "bar", "voing") assert x == (".*", "bar", "voing")
with pytest.raises("invalid replacement"): with pytest.raises(Exception, match="Invalid replacement"):
replace.parse_hook("/") replace.parse_hook("/")
def test_configure(self): def test_configure(self):
r = replace.Replace() r = replace.Replace()
with taddons.context() as tctx: with taddons.context() as tctx:
tctx.configure(r, replacements=[("one", "two", "three")]) tctx.configure(r, replacements=[("one", "two", "three")])
with pytest.raises("invalid filter pattern"): with pytest.raises(Exception, match="Invalid filter pattern"):
tctx.configure(r, replacements=[("~b", "two", "three")]) tctx.configure(r, replacements=[("~b", "two", "three")])
with pytest.raises("invalid regular expression"): with pytest.raises(Exception, match="Invalid regular expression"):
tctx.configure(r, replacements=[("foo", "+", "three")]) tctx.configure(r, replacements=[("foo", "+", "three")])
tctx.configure(r, replacements=["/a/b/c/"]) tctx.configure(r, replacements=["/a/b/c/"])

View File

@ -64,11 +64,11 @@ class TestParseCommand:
script.parse_command(" ") script.parse_command(" ")
def test_no_script_file(self): def test_no_script_file(self):
with pytest.raises("not found"): with pytest.raises(Exception, match="not found"):
script.parse_command("notfound") script.parse_command("notfound")
with tutils.tmpdir() as dir: with tutils.tmpdir() as dir:
with pytest.raises("not a file"): with pytest.raises(Exception, match="Not a file"):
script.parse_command(dir) script.parse_command(dir)
def test_parse_args(self): def test_parse_args(self):
@ -204,7 +204,7 @@ class TestScriptLoader:
f = tflow.tflow(resp=True) f = tflow.tflow(resp=True)
with m.handlecontext(): with m.handlecontext():
with pytest.raises("file not found"): with pytest.raises(Exception, match="file not found"):
sl.run_once("nonexistent", [f]) sl.run_once("nonexistent", [f])
def test_simple(self): def test_simple(self):

View File

@ -14,13 +14,13 @@ class TestSetHeaders:
assert x == ("foo", "bar", "vo/ing/") assert x == ("foo", "bar", "vo/ing/")
x = setheaders.parse_setheader("/bar/voing") x = setheaders.parse_setheader("/bar/voing")
assert x == (".*", "bar", "voing") assert x == (".*", "bar", "voing")
with pytest.raises("invalid replacement"): with pytest.raises(Exception, match="Invalid replacement"):
setheaders.parse_setheader("/") setheaders.parse_setheader("/")
def test_configure(self): def test_configure(self):
sh = setheaders.SetHeaders() sh = setheaders.SetHeaders()
with taddons.context() as tctx: with taddons.context() as tctx:
with pytest.raises("invalid setheader filter pattern"): with pytest.raises(Exception, match="Invalid setheader filter pattern"):
tctx.configure(sh, setheaders = [("~b", "one", "two")]) tctx.configure(sh, setheaders = [("~b", "one", "two")])
tctx.configure(sh, setheaders = ["/foo/bar/voing"]) tctx.configure(sh, setheaders = ["/foo/bar/voing"])

View File

@ -16,7 +16,7 @@ class TestStickyCookie:
def test_config(self): def test_config(self):
sc = stickycookie.StickyCookie() sc = stickycookie.StickyCookie()
with taddons.context() as tctx: with taddons.context() as tctx:
with pytest.raises("invalid filter"): with pytest.raises(Exception, match="invalid filter"):
tctx.configure(sc, stickycookie="~b") tctx.configure(sc, stickycookie="~b")
tctx.configure(sc, stickycookie="foo") tctx.configure(sc, stickycookie="foo")

View File

@ -18,7 +18,7 @@ def test_configure():
p = os.path.join(tdir, "foo") p = os.path.join(tdir, "foo")
with pytest.raises(exceptions.OptionsError): with pytest.raises(exceptions.OptionsError):
tctx.configure(sa, streamfile=tdir) tctx.configure(sa, streamfile=tdir)
with pytest.raises("invalid filter"): with pytest.raises(Exception, match="Invalid filter"):
tctx.configure(sa, streamfile=p, filtstr="~~") tctx.configure(sa, streamfile=p, filtstr="~~")
tctx.configure(sa, filtstr="foo") tctx.configure(sa, filtstr="foo")
assert sa.filt assert sa.filt

View File

@ -396,11 +396,11 @@ def test_configure():
v = view.View() v = view.View()
with taddons.context(options=Options()) as tctx: with taddons.context(options=Options()) as tctx:
tctx.configure(v, filter="~q") tctx.configure(v, filter="~q")
with pytest.raises("invalid interception filter"): with pytest.raises(Exception, match="Invalid interception filter"):
tctx.configure(v, filter="~~") tctx.configure(v, filter="~~")
tctx.configure(v, console_order="method") tctx.configure(v, console_order="method")
with pytest.raises("unknown flow order"): with pytest.raises(Exception, match="Unknown flow order"):
tctx.configure(v, console_order="no") tctx.configure(v, console_order="no")
tctx.configure(v, console_order_reversed=True) tctx.configure(v, console_order_reversed=True)

View File

@ -356,11 +356,11 @@ def test_read_chunked():
assert b"".join(_read_chunked(BytesIO(data))) == b"ab" assert b"".join(_read_chunked(BytesIO(data))) == b"ab"
data = b"\r\n" data = b"\r\n"
with pytest.raises("closed prematurely"): with pytest.raises(Exception, match="closed prematurely"):
b"".join(_read_chunked(BytesIO(data))) b"".join(_read_chunked(BytesIO(data)))
data = b"1\r\nfoo" data = b"1\r\nfoo"
with pytest.raises("malformed chunked body"): with pytest.raises(Exception, match="Malformed chunked body"):
b"".join(_read_chunked(BytesIO(data))) b"".join(_read_chunked(BytesIO(data)))
data = b"foo\r\nfoo" data = b"foo\r\nfoo"
@ -368,5 +368,5 @@ def test_read_chunked():
b"".join(_read_chunked(BytesIO(data))) b"".join(_read_chunked(BytesIO(data)))
data = b"5\r\naaaaa\r\n0\r\n\r\n" data = b"5\r\naaaaa\r\n0\r\n\r\n"
with pytest.raises("too large"): with pytest.raises(Exception, match="too large"):
b"".join(_read_chunked(BytesIO(data), limit=2)) b"".join(_read_chunked(BytesIO(data), limit=2))

View File

@ -181,9 +181,8 @@ def test_message_ipv6():
def test_message_invalid_host(): def test_message_invalid_host():
raw = tutils.treader(b"\xEE\x01\x00\x03\x0bexample@com\xDE\xAD\xBE\xEF") raw = tutils.treader(b"\xEE\x01\x00\x03\x0bexample@com\xDE\xAD\xBE\xEF")
with pytest.raises(socks.SocksError) as exc_info: with pytest.raises(socks.SocksError, match="Invalid hostname: b'example@com'"):
socks.Message.from_file(raw) socks.Message.from_file(raw)
assert exc_info.match("Invalid hostname: b'example@com'")
def test_message_invalid_rsv(): def test_message_invalid_rsv():

View File

@ -430,7 +430,7 @@ class TestServerCipherListError(tservers.ServerTestBase):
def test_echo(self): def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port)) c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect(): with c.connect():
with pytest.raises("handshake error"): with pytest.raises(Exception, match="handshake error"):
c.convert_to_ssl(sni="foo.com") c.convert_to_ssl(sni="foo.com")
@ -443,7 +443,7 @@ class TestClientCipherListError(tservers.ServerTestBase):
def test_echo(self): def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port)) c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect(): with c.connect():
with pytest.raises("cipher specification"): with pytest.raises(Exception, match="cipher specification"):
c.convert_to_ssl(sni="foo.com", cipher_list="bogus") c.convert_to_ssl(sni="foo.com", cipher_list="bogus")

View File

@ -108,9 +108,9 @@ class TestFrameHeader:
assert not f2.mask assert not f2.mask
def test_violations(self): def test_violations(self):
with pytest.raises("opcode"): with pytest.raises(Exception, match="opcode"):
websockets.FrameHeader(opcode=17) websockets.FrameHeader(opcode=17)
with pytest.raises("masking key"): with pytest.raises(Exception, match="Masking key"):
websockets.FrameHeader(masking_key=b"x") websockets.FrameHeader(masking_key=b"x")
def test_automask(self): def test_automask(self):

View File

@ -251,9 +251,8 @@ class TestSerialize:
sio.write(b"bogus") sio.write(b"bogus")
sio.seek(0) sio.seek(0)
r = mitmproxy.io.FlowReader(sio) r = mitmproxy.io.FlowReader(sio)
with pytest.raises(FlowReadException) as exc_info: with pytest.raises(FlowReadException, match='Invalid data format'):
list(r.stream()) list(r.stream())
assert exc_info.match('Invalid data format')
sio = io.BytesIO() sio = io.BytesIO()
f = tflow.tdummyflow() f = tflow.tdummyflow()
@ -261,9 +260,8 @@ class TestSerialize:
w.add(f) w.add(f)
sio.seek(0) sio.seek(0)
r = mitmproxy.io.FlowReader(sio) r = mitmproxy.io.FlowReader(sio)
with pytest.raises(FlowReadException) as exc_info: with pytest.raises(FlowReadException, match='Unknown flow type'):
list(r.stream()) list(r.stream())
assert exc_info.match('Unknown flow type')
f = FlowReadException("foo") f = FlowReadException("foo")
assert str(f) == "foo" assert str(f) == "foo"
@ -277,7 +275,7 @@ class TestSerialize:
sio.seek(0) sio.seek(0)
r = mitmproxy.io.FlowReader(sio) r = mitmproxy.io.FlowReader(sio)
with pytest.raises("version"): with pytest.raises(Exception, match="version"):
list(r.stream()) list(r.stream())
@ -287,15 +285,15 @@ class TestFlowMaster:
fm = master.Master(None, DummyServer()) fm = master.Master(None, DummyServer())
f = tflow.tflow(resp=True) f = tflow.tflow(resp=True)
f.request.content = None f.request.content = None
with pytest.raises("missing"): with pytest.raises(Exception, match="missing"):
fm.replay_request(f) fm.replay_request(f)
f.intercepted = True f.intercepted = True
with pytest.raises("intercepted"): with pytest.raises(Exception, match="intercepted"):
fm.replay_request(f) fm.replay_request(f)
f.live = True f.live = True
with pytest.raises("live"): with pytest.raises(Exception, match="live"):
fm.replay_request(f) fm.replay_request(f)
def test_create_flow(self): def test_create_flow(self):

View File

@ -71,9 +71,9 @@ def test_options():
with pytest.raises(TypeError): with pytest.raises(TypeError):
TO(nonexistent = "value") TO(nonexistent = "value")
with pytest.raises("no such option"): with pytest.raises(Exception, match="No such option"):
o.nonexistent = "value" o.nonexistent = "value"
with pytest.raises("no such option"): with pytest.raises(Exception, match="No such option"):
o.update(nonexistent = "value") o.update(nonexistent = "value")
rec = [] rec = []
@ -97,7 +97,7 @@ def test_setter():
f = o.setter("two") f = o.setter("two")
f("xxx") f("xxx")
assert o.two == "xxx" assert o.two == "xxx"
with pytest.raises("no such option"): with pytest.raises(Exception, match="No such option"):
o.setter("nonexistent") o.setter("nonexistent")
@ -108,7 +108,7 @@ def test_toggler():
assert o.two is False assert o.two is False
f() f()
assert o.two is True assert o.two is True
with pytest.raises("no such option"): with pytest.raises(Exception, match="No such option"):
o.toggler("nonexistent") o.toggler("nonexistent")
@ -193,11 +193,11 @@ def test_serialize():
assert o2 == o assert o2 == o
t = "invalid: foo\ninvalid" t = "invalid: foo\ninvalid"
with pytest.raises("config error"): with pytest.raises(Exception, match="Config error"):
o2.load(t) o2.load(t)
t = "invalid" t = "invalid"
with pytest.raises("config error"): with pytest.raises(Exception, match="Config error"):
o2.load(t) o2.load(t)
t = "" t = ""

View File

@ -14,7 +14,7 @@ class TestLookup:
p = tutils.test_data.path("mitmproxy/data/pf01") p = tutils.test_data.path("mitmproxy/data/pf01")
d = open(p, "rb").read() d = open(p, "rb").read()
assert pf.lookup("192.168.1.111", 40000, d) == ("5.5.5.5", 80) assert pf.lookup("192.168.1.111", 40000, d) == ("5.5.5.5", 80)
with pytest.raises("Could not resolve original destination"): with pytest.raises(Exception, match="Could not resolve original destination"):
pf.lookup("192.168.1.112", 40000, d) pf.lookup("192.168.1.112", 40000, d)
with pytest.raises("Could not resolve original destination"): with pytest.raises(Exception, match="Could not resolve original destination"):
pf.lookup("192.168.1.111", 40001, d) pf.lookup("192.168.1.111", 40001, d)

View File

@ -96,27 +96,27 @@ class TestProcessProxyOptions:
@mock.patch("mitmproxy.platform.original_addr", None) @mock.patch("mitmproxy.platform.original_addr", None)
def test_no_transparent(self): def test_no_transparent(self):
with pytest.raises("transparent mode not supported"): with pytest.raises(Exception, match="Transparent mode not supported"):
self.p("-T") self.p("-T")
@mock.patch("mitmproxy.platform.original_addr") @mock.patch("mitmproxy.platform.original_addr")
def test_modes(self, _): def test_modes(self, _):
self.assert_noerr("-R", "http://localhost") self.assert_noerr("-R", "http://localhost")
with pytest.raises("expected one argument"): with pytest.raises(Exception, match="expected one argument"):
self.p("-R") self.p("-R")
with pytest.raises("Invalid server specification"): with pytest.raises(Exception, match="Invalid server specification"):
self.p("-R", "reverse") self.p("-R", "reverse")
self.assert_noerr("-T") self.assert_noerr("-T")
self.assert_noerr("-U", "http://localhost") self.assert_noerr("-U", "http://localhost")
with pytest.raises("Invalid server specification"): with pytest.raises(Exception, match="Invalid server specification"):
self.p("-U", "upstream") self.p("-U", "upstream")
self.assert_noerr("--upstream-auth", "test:test") self.assert_noerr("--upstream-auth", "test:test")
with pytest.raises("expected one argument"): with pytest.raises(Exception, match="expected one argument"):
self.p("--upstream-auth") self.p("--upstream-auth")
with pytest.raises("mutually exclusive"): with pytest.raises(Exception, match="mutually exclusive"):
self.p("-R", "http://localhost", "-T") self.p("-R", "http://localhost", "-T")
def test_client_certs(self): def test_client_certs(self):
@ -125,14 +125,14 @@ class TestProcessProxyOptions:
self.assert_noerr( self.assert_noerr(
"--client-certs", "--client-certs",
os.path.join(tutils.test_data.path("mitmproxy/data/clientcert"), "client.pem")) os.path.join(tutils.test_data.path("mitmproxy/data/clientcert"), "client.pem"))
with pytest.raises("path does not exist"): with pytest.raises(Exception, match="path does not exist"):
self.p("--client-certs", "nonexistent") self.p("--client-certs", "nonexistent")
def test_certs(self): def test_certs(self):
self.assert_noerr( self.assert_noerr(
"--cert", "--cert",
tutils.test_data.path("mitmproxy/data/testkey.pem")) tutils.test_data.path("mitmproxy/data/testkey.pem"))
with pytest.raises("does not exist"): with pytest.raises(Exception, match="does not exist"):
self.p("--cert", "nonexistent") self.p("--cert", "nonexistent")
def test_insecure(self): def test_insecure(self):
@ -156,12 +156,12 @@ class TestProxyServer:
def test_err(self): def test_err(self):
# binding to 0.0.0.0:1 works without special permissions on Windows # binding to 0.0.0.0:1 works without special permissions on Windows
conf = ProxyConfig(options.Options(listen_port=1)) conf = ProxyConfig(options.Options(listen_port=1))
with pytest.raises("error starting proxy server"): with pytest.raises(Exception, match="Error starting proxy server"):
ProxyServer(conf) ProxyServer(conf)
def test_err_2(self): def test_err_2(self):
conf = ProxyConfig(options.Options(listen_host="invalidhost")) conf = ProxyConfig(options.Options(listen_host="invalidhost"))
with pytest.raises("error starting proxy server"): with pytest.raises(Exception, match="Error starting proxy server"):
ProxyServer(conf) ProxyServer(conf)

View File

@ -3,7 +3,7 @@ from mitmproxy.proxy import config
def test_parse_server_spec(): def test_parse_server_spec():
with pytest.raises("Invalid server specification"): with pytest.raises(Exception, match="Invalid server specification"):
config.parse_server_spec("") config.parse_server_spec("")
assert config.parse_server_spec("http://foo.com:88") == ( assert config.parse_server_spec("http://foo.com:88") == (
"http", ("foo.com", 88) "http", ("foo.com", 88)
@ -14,7 +14,7 @@ def test_parse_server_spec():
assert config.parse_server_spec("https://foo.com") == ( assert config.parse_server_spec("https://foo.com") == (
"https", ("foo.com", 443) "https", ("foo.com", 443)
) )
with pytest.raises("Invalid server specification"): with pytest.raises(Exception, match="Invalid server specification"):
config.parse_server_spec("foo.com") config.parse_server_spec("foo.com")
with pytest.raises("Invalid server specification"): with pytest.raises(Exception, match="Invalid server specification"):
config.parse_server_spec("http://") config.parse_server_spec("http://")

View File

@ -149,11 +149,11 @@ class TestTokValueFile:
v = base.TokValue.parseString("<path2")[0] v = base.TokValue.parseString("<path2")[0]
with pytest.raises(exceptions.FileAccessDenied): with pytest.raises(exceptions.FileAccessDenied):
v.get_generator(language.Settings(staticdir=t)) v.get_generator(language.Settings(staticdir=t))
with pytest.raises("access disabled"): with pytest.raises(Exception, match="access disabled"):
v.get_generator(language.Settings()) v.get_generator(language.Settings())
v = base.TokValue.parseString("</outside")[0] v = base.TokValue.parseString("</outside")[0]
with pytest.raises("outside"): with pytest.raises(Exception, match="outside"):
v.get_generator(language.Settings(staticdir=t)) v.get_generator(language.Settings(staticdir=t))
def test_spec(self): def test_spec(self):
@ -194,32 +194,27 @@ class TestMisc:
v3 = v2.freeze({}) v3 = v2.freeze({})
assert v2.value.val == v3.value.val assert v2.value.val == v3.value.val
def test_fixedlengthvalue(self): def test_fixedlengthvalue(self, tmpdir):
class TT(base.FixedLengthValue): class TT(base.FixedLengthValue):
preamble = "m" preamble = "m"
length = 4 length = 4
e = TT.expr() e = TT.expr()
assert e.parseString("m@4") assert e.parseString("m@4")
with pytest.raises("invalid value length"): with pytest.raises(Exception, match="Invalid value length"):
e.parseString("m@100") e.parseString("m@100")
with pytest.raises("invalid value length"): with pytest.raises(Exception, match="Invalid value length"):
e.parseString("m@1") e.parseString("m@1")
with tutils.tmpdir() as t: s = base.Settings(staticdir=str(tmpdir))
p = os.path.join(t, "path") tmpdir.join("path").write_binary(b"a" * 20, ensure=True)
s = base.Settings(staticdir=t) v = e.parseString("m<path")[0]
with open(p, "wb") as f: with pytest.raises(Exception, match="Invalid value length"):
f.write(b"a" * 20) v.values(s)
v = e.parseString("m<path")[0]
with pytest.raises("invalid value length"):
v.values(s)
p = os.path.join(t, "path") tmpdir.join("path2").write_binary(b"a" * 4, ensure=True)
with open(p, "wb") as f: v = e.parseString("m<path2")[0]
f.write(b"a" * 4) assert v.values(s)
v = e.parseString("m<path")[0]
assert v.values(s)
class TKeyValue(base.KeyValue): class TKeyValue(base.KeyValue):
@ -282,7 +277,7 @@ def test_intfield():
assert v.value == 4 assert v.value == 4
assert v.spec() == "t4" assert v.spec() == "t4"
with pytest.raises("can't exceed"): with pytest.raises(Exception, match="can't exceed"):
e.parseString("t5") e.parseString("t5")
@ -324,9 +319,9 @@ def test_integer():
class BInt(base.Integer): class BInt(base.Integer):
bounds = (1, 5) bounds = (1, 5)
with pytest.raises("must be between"): with pytest.raises(Exception, match="must be between"):
BInt(0) BInt(0)
with pytest.raises("must be between"): with pytest.raises(Exception, match="must be between"):
BInt(6) BInt(6)
assert BInt(5) assert BInt(5)
assert BInt(1) assert BInt(1)

View File

@ -20,7 +20,7 @@ def test_make_error_response():
class TestRequest: class TestRequest:
def test_nonascii(self): def test_nonascii(self):
with pytest.raises("ascii"): with pytest.raises(Exception, match="ASCII"):
parse_request("get:\xf0") parse_request("get:\xf0")
def test_err(self): def test_err(self):
@ -226,7 +226,7 @@ class TestResponse:
assert str(v) assert str(v)
def test_nonascii(self): def test_nonascii(self):
with pytest.raises("ascii"): with pytest.raises(Exception, match="ASCII"):
language.parse_pathod("foo:b\xf0") language.parse_pathod("foo:b\xf0")
def test_parse_header(self): def test_parse_header(self):
@ -263,7 +263,7 @@ class TestResponse:
def test_websockets(self): def test_websockets(self):
r = next(language.parse_pathod("ws")) r = next(language.parse_pathod("ws"))
with pytest.raises("no websocket key"): with pytest.raises(Exception, match="No websocket key"):
r.resolve(language.Settings()) r.resolve(language.Settings())
res = r.resolve(language.Settings(websocket_key=b"foo")) res = r.resolve(language.Settings(websocket_key=b"foo"))
assert res.status_code.string() == b"101" assert res.status_code.string() == b"101"
@ -351,5 +351,5 @@ def test_nested_response_freeze():
def test_unique_components(): def test_unique_components():
with pytest.raises("multiple body clauses"): with pytest.raises(Exception, match="multiple body clauses"):
language.parse_pathod("400:b@1:b@1") language.parse_pathod("400:b@1:b@1")

View File

@ -39,7 +39,7 @@ class TestRequest:
assert req.values(default_settings()) == req.values(default_settings()) assert req.values(default_settings()) == req.values(default_settings())
def test_nonascii(self): def test_nonascii(self):
with pytest.raises("ascii"): with pytest.raises(Exception, match="ASCII"):
parse_request("get:\xf0") parse_request("get:\xf0")
def test_err(self): def test_err(self):
@ -168,7 +168,7 @@ class TestResponse:
assert res.values(default_settings()) == res.values(default_settings()) assert res.values(default_settings()) == res.values(default_settings())
def test_nonascii(self): def test_nonascii(self):
with pytest.raises("ascii"): with pytest.raises(Exception, match="ASCII"):
parse_response("200:\xf0") parse_response("200:\xf0")
def test_err(self): def test_err(self):

View File

@ -130,7 +130,7 @@ class TestWebsocketFrame:
assert frm.payload == b"abc" assert frm.payload == b"abc"
def test_knone(self): def test_knone(self):
with pytest.raises("expected 4 bytes"): with pytest.raises(Exception, match="Expected 4 bytes"):
self.fr("wf:b'foo':mask:knone") self.fr("wf:b'foo':mask:knone")
def test_length(self): def test_length(self):
@ -138,5 +138,5 @@ class TestWebsocketFrame:
frm = self.fr("wf:l2:b'foo'") frm = self.fr("wf:l2:b'foo'")
assert frm.header.payload_length == 2 assert frm.header.payload_length == 2
assert frm.payload == b"fo" assert frm.payload == b"fo"
with pytest.raises("expected 1024 bytes"): with pytest.raises(Exception, match="Expected 1024 bytes"):
self.fr("wf:l1024:b'foo'") self.fr("wf:l1024:b'foo'")

View File

@ -173,12 +173,12 @@ class TestDaemon(PathocTestDaemon):
to = ("foobar", 80) to = ("foobar", 80)
c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None) c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
c.rfile, c.wfile = io.BytesIO(), io.BytesIO() c.rfile, c.wfile = io.BytesIO(), io.BytesIO()
with pytest.raises("connect failed"): with pytest.raises(Exception, match="CONNECT failed"):
c.http_connect(to) c.http_connect(to)
c.rfile = io.BytesIO( c.rfile = io.BytesIO(
b"HTTP/1.1 500 OK\r\n" b"HTTP/1.1 500 OK\r\n"
) )
with pytest.raises("connect failed"): with pytest.raises(Exception, match="CONNECT failed"):
c.http_connect(to) c.http_connect(to)
c.rfile = io.BytesIO( c.rfile = io.BytesIO(
b"HTTP/1.1 200 OK\r\n" b"HTTP/1.1 200 OK\r\n"
@ -195,14 +195,14 @@ class TestDaemon(PathocTestDaemon):
c.rfile = tutils.treader( c.rfile = tutils.treader(
b"\x05\xEE" b"\x05\xEE"
) )
with pytest.raises("SOCKS without authentication"): with pytest.raises(Exception, match="SOCKS without authentication"):
c.socks_connect(("example.com", 0xDEAD)) c.socks_connect(("example.com", 0xDEAD))
c.rfile = tutils.treader( c.rfile = tutils.treader(
b"\x05\x00" + b"\x05\x00" +
b"\x05\xEE\x00\x03\x0bexample.com\xDE\xAD" b"\x05\xEE\x00\x03\x0bexample.com\xDE\xAD"
) )
with pytest.raises("SOCKS server error"): with pytest.raises(Exception, match="SOCKS server error"):
c.socks_connect(("example.com", 0xDEAD)) c.socks_connect(("example.com", 0xDEAD))
c.rfile = tutils.treader( c.rfile = tutils.treader(

View File

@ -134,7 +134,7 @@ class CommonTests(tservers.DaemonTests):
assert len(self.d.log()) == 0 assert len(self.d.log()) == 0
def test_disconnect(self): def test_disconnect(self):
with pytest.raises("unexpected eof"): with pytest.raises(Exception, match="Unexpected EOF"):
self.get("202:b@100k:d200") self.get("202:b@100k:d200")
def test_parserr(self): def test_parserr(self):