fix pep8 whitespace

This commit is contained in:
Thomas Kriechbaumer 2015-06-18 18:12:11 +02:00
parent 90aeda47ae
commit 7a3623a14e
24 changed files with 229 additions and 171 deletions

View File

@ -3,6 +3,7 @@ from libpathod import test
class Test:
"""
Testing the requests module with
a pathod instance started for

View File

@ -3,6 +3,7 @@ from libpathod import test
class Test:
"""
Testing the requests module with
a single pathod instance started

View File

@ -12,6 +12,7 @@ EXAMPLE_WEBSOCKET_KEY = "examplekey"
# pylint: disable=unused-variable
def make_app(noapi, debug):
app = Flask(__name__)
app.debug = debug
@ -20,13 +21,13 @@ def make_app(noapi, debug):
@app.route('/api/info')
def api_info():
return jsonify(
version = version.IVERSION
version=version.IVERSION
)
@app.route('/api/log')
def api_log():
return jsonify(
log = app.config["pathod"].get_log()
log=app.config["pathod"].get_log()
)
@app.route('/api/clear_log')
@ -125,10 +126,10 @@ def make_app(noapi, debug):
spec = request.args["spec"]
args = dict(
spec = spec,
section = "main",
syntaxerror = None,
error = None,
spec=spec,
section="main",
syntaxerror=None,
error=None,
)
if not spec.strip():
args["error"] = "Can't parse an empty spec."

View File

@ -75,7 +75,7 @@ def parse_websocket_frame(s):
websockets.WebsocketFrame.expr()
).parseString(
s,
parseAll = True
parseAll=True
)
except pp.ParseException as v:
raise exceptions.ParseException(v.msg, v.line, v.col)
@ -105,9 +105,9 @@ def serve(msg, fp, settings):
disconnect = writer.write_values(fp, vals, actions[:])
duration = time.time() - started
ret = dict(
disconnect = disconnect,
started = started,
duration = duration,
disconnect=disconnect,
started=started,
duration=duration,
)
ret.update(msg.log(settings))
return ret

View File

@ -8,6 +8,7 @@ from . import base
class _Action(base.Token):
"""
An action that operates on the raw data stream of the message. All
actions have one thing in common: an offset that specifies where the
@ -76,6 +77,7 @@ class PauseAt(_Action):
class DisconnectAt(_Action):
def __init__(self, offset):
_Action.__init__(self, offset)

View File

@ -7,14 +7,15 @@ from .. import utils
from . import generators, exceptions
class Settings(object):
def __init__(
self,
is_client = False,
staticdir = None,
unconstrained_file_access = False,
request_host = None,
websocket_key = None,
protocol = None,
is_client=False,
staticdir=None,
unconstrained_file_access=False,
request_host=None,
websocket_key=None,
protocol=None,
):
self.is_client = is_client
self.staticdir = staticdir
@ -56,6 +57,7 @@ v_naked_literal = pp.MatchFirst(
class Token(object):
"""
A token in the specification language. Tokens are immutable. The token
classes have no meaning in and of themselves, and are combined into
@ -101,6 +103,7 @@ class Token(object):
class _TokValueLiteral(Token):
def __init__(self, val):
self.val = val.decode("string_escape")
@ -112,6 +115,7 @@ class _TokValueLiteral(Token):
class TokValueLiteral(_TokValueLiteral):
"""
A literal with Python-style string escaping
"""
@ -132,6 +136,7 @@ class TokValueLiteral(_TokValueLiteral):
class TokValueNakedLiteral(_TokValueLiteral):
@classmethod
def expr(cls):
e = v_naked_literal.copy()
@ -142,6 +147,7 @@ class TokValueNakedLiteral(_TokValueLiteral):
class TokValueGenerate(Token):
def __init__(self, usize, unit, datatype):
if not unit:
unit = "b"
@ -185,6 +191,7 @@ class TokValueGenerate(Token):
class TokValueFile(Token):
def __init__(self, path):
self.path = str(path)
@ -246,6 +253,7 @@ TokOffset = pp.MatchFirst(
class _Component(Token):
"""
A value component of the primary specification of an message.
Components produce byte values desribe the bytes of the message.
@ -265,6 +273,7 @@ class _Component(Token):
class KeyValue(_Component):
"""
A key/value pair.
cls.preamble: leader
@ -291,6 +300,7 @@ class KeyValue(_Component):
class CaselessLiteral(_Component):
"""
A caseless token that can take only one value.
"""
@ -315,6 +325,7 @@ class CaselessLiteral(_Component):
class OptionsOrValue(_Component):
"""
Can be any of a specified set of options, or a value specifier.
"""
@ -395,6 +406,7 @@ class Integer(_Component):
class Value(_Component):
"""
A value component lead by an optional preamble.
"""
@ -421,6 +433,7 @@ class Value(_Component):
class FixedLengthValue(Value):
"""
A value component lead by an optional preamble.
"""
@ -461,6 +474,7 @@ class FixedLengthValue(Value):
class Boolean(_Component):
"""
A boolean flag.
name = true
@ -489,6 +503,7 @@ class Boolean(_Component):
class IntField(_Component):
"""
An integer field, where values can optionally specified by name.
"""
@ -522,6 +537,7 @@ class IntField(_Component):
class NestedMessage(Token):
"""
A nested message, as an escaped string with a preamble.
"""

View File

@ -8,6 +8,7 @@ class FileAccessDenied(RenderError):
class ParseException(Exception):
def __init__(self, msg, s, col):
Exception.__init__(self)
self.msg = msg

View File

@ -3,20 +3,21 @@ import random
import mmap
DATATYPES = dict(
ascii_letters = string.ascii_letters,
ascii_lowercase = string.ascii_lowercase,
ascii_uppercase = string.ascii_uppercase,
digits = string.digits,
hexdigits = string.hexdigits,
octdigits = string.octdigits,
punctuation = string.punctuation,
whitespace = string.whitespace,
ascii = string.printable,
bytes = "".join(chr(i) for i in range(256))
ascii_letters=string.ascii_letters,
ascii_lowercase=string.ascii_lowercase,
ascii_uppercase=string.ascii_uppercase,
digits=string.digits,
hexdigits=string.hexdigits,
octdigits=string.octdigits,
punctuation=string.punctuation,
whitespace=string.whitespace,
ascii=string.printable,
bytes="".join(chr(i) for i in range(256))
)
class TransformGenerator(object):
"""
Perform a byte-by-byte transform another generator - that is, for each
input byte, the transformation must produce one output byte.
@ -45,6 +46,7 @@ class TransformGenerator(object):
class RandomGenerator(object):
def __init__(self, dtype, length):
self.dtype = dtype
self.length = length
@ -65,6 +67,7 @@ class RandomGenerator(object):
class FileGenerator(object):
def __init__(self, path):
self.path = path
self.fp = file(path, "rb")

View File

@ -5,6 +5,7 @@ from . import base, generators, actions, message
NESTED_LEADER = "pathod!"
class WF(base.CaselessLiteral):
TOK = "wf"
@ -197,7 +198,7 @@ class WebsocketFrame(message.Message):
if self.toklength:
length = int(self.toklength.value)
frameparts = dict(
payload_length = length
payload_length=length
)
if self.mask and self.mask.value:
frameparts["mask"] = True

View File

@ -26,6 +26,7 @@ class PathocError(Exception):
class SSLInfo(object):
def __init__(self, certchain, cipher, alp):
self.certchain, self.cipher, self.alp = certchain, cipher, alp
@ -66,6 +67,7 @@ class SSLInfo(object):
class Response(object):
def __init__(
self,
httpversion,
@ -85,6 +87,7 @@ class Response(object):
class WebsocketFrameReader(threading.Thread):
def __init__(
self,
rfile,
@ -143,6 +146,7 @@ class WebsocketFrameReader(threading.Thread):
class Pathoc(tcp.TCPClient):
def __init__(
self,
address,
@ -157,23 +161,23 @@ class Pathoc(tcp.TCPClient):
# HTTP/2
use_http2=False,
http2_skip_connection_preface=False,
http2_framedump = False,
http2_framedump=False,
# Websockets
ws_read_limit = None,
ws_read_limit=None,
# Network
timeout = None,
timeout=None,
# Output control
showreq = False,
showresp = False,
explain = False,
hexdump = False,
ignorecodes = (),
ignoretimeout = False,
showsummary = False,
fp = sys.stdout
showreq=False,
showresp=False,
explain=False,
hexdump=False,
ignorecodes=(),
ignoretimeout=False,
showsummary=False,
fp=sys.stdout
):
"""
spec: A request specification
@ -222,11 +226,11 @@ class Pathoc(tcp.TCPClient):
self.protocol = None
self.settings = language.Settings(
is_client = True,
staticdir = os.getcwd(),
unconstrained_file_access = True,
request_host = self.address.host,
protocol = self.protocol,
is_client=True,
staticdir=os.getcwd(),
unconstrained_file_access=True,
request_host=self.address.host,
protocol=self.protocol,
)
def log(self):
@ -323,8 +327,8 @@ class Pathoc(tcp.TCPClient):
while True:
try:
frm = self.ws_framereader.frames_queue.get(
timeout = timeout,
block = True if timeout != 0 else False
timeout=timeout,
block=True if timeout != 0 else False
)
except Queue.Empty:
if finish:
@ -394,7 +398,7 @@ class Pathoc(tcp.TCPClient):
)
resp.append(self.sslinfo)
resp = Response(*resp)
except http.HttpError, v:
except http.HttpError as v:
log("Invalid server response: %s" % v)
raise
except tcp.NetLibTimeout:
@ -455,22 +459,22 @@ def main(args): # pragma: nocover
playlist = random.choice(args.requests)
p = Pathoc(
(args.host, args.port),
ssl = args.ssl,
sni = args.sni,
sslversion = args.sslversion,
clientcert = args.clientcert,
ciphers = args.ciphers,
use_http2 = args.use_http2,
http2_skip_connection_preface = args.http2_skip_connection_preface,
http2_framedump = args.http2_framedump,
showreq = args.showreq,
showresp = args.showresp,
explain = args.explain,
hexdump = args.hexdump,
ignorecodes = args.ignorecodes,
timeout = args.timeout,
ignoretimeout = args.ignoretimeout,
showsummary = True
ssl=args.ssl,
sni=args.sni,
sslversion=args.sslversion,
clientcert=args.clientcert,
ciphers=args.ciphers,
use_http2=args.use_http2,
http2_skip_connection_preface=args.http2_skip_connection_preface,
http2_framedump=args.http2_framedump,
showreq=args.showreq,
showresp=args.showresp,
explain=args.explain,
hexdump=args.hexdump,
ignorecodes=args.ignorecodes,
timeout=args.timeout,
ignoretimeout=args.ignoretimeout,
showsummary=True
)
trycount = 0
try:

View File

@ -30,7 +30,7 @@ def args_pathoc(argv, stdout=sys.stdout, stderr=sys.stderr):
)
parser.add_argument(
"-c", dest="connect_to", type=str, default=False,
metavar = "HOST:PORT",
metavar="HOST:PORT",
help="Issue an HTTP CONNECT to connect to the specified host."
)
parser.add_argument(
@ -77,7 +77,7 @@ def args_pathoc(argv, stdout=sys.stdout, stderr=sys.stderr):
parser.add_argument(
'host', type=str,
metavar = "host[:port]",
metavar="host[:port]",
help='Host and port to connect to'
)
parser.add_argument(

View File

@ -29,6 +29,7 @@ class PathodError(Exception):
class SSLOptions(object):
def __init__(
self,
confdir=CONFDIR,
@ -92,7 +93,7 @@ class PathodHandler(tcp.BaseHandler):
language.serve(err, self.wfile, self.settings)
return None, dict(
type="error",
msg = error
msg=error
)
if self.server.explain and not hasattr(crafted, 'is_error_response'):
@ -116,17 +117,17 @@ class PathodHandler(tcp.BaseHandler):
started = time.time()
try:
frm = websockets.Frame.from_file(self.rfile)
except tcp.NetLibIncomplete, e:
lg("Error reading websocket frame: %s"%e)
except tcp.NetLibIncomplete as e:
lg("Error reading websocket frame: %s" % e)
break
ended = time.time()
lg(frm.human_readable())
retlog = dict(
type = "inbound",
protocol = "websockets",
started = started,
duration = ended - started,
frame = dict(
type="inbound",
protocol="websockets",
started=started,
duration=ended - started,
frame=dict(
),
cipher=None,
)
@ -138,7 +139,7 @@ class PathodHandler(tcp.BaseHandler):
nest = frm.payload[len(ld):]
try:
wf_gen = language.parse_websocket_frame(nest)
except language.exceptions.ParseException, v:
except language.exceptions.ParseException as v:
log.write(
self.logfp,
"Parse error in reflected frame specifcation:"
@ -509,7 +510,7 @@ class Pathod(tcp.TCPServer):
self.anchors = anchors
self.settings = language.Settings(
staticdir = self.staticdir
staticdir=self.staticdir
)
def check_policy(self, req, settings):
@ -587,13 +588,13 @@ class Pathod(tcp.TCPServer):
def main(args): # pragma: nocover
ssloptions = SSLOptions(
cn = args.cn,
confdir = args.confdir,
not_after_connect = args.ssl_not_after_connect,
ciphers = args.ciphers,
sslversion = utils.SSLVERSIONS[args.sslversion],
certs = args.ssl_certs,
sans = args.sans,
cn=args.cn,
confdir=args.confdir,
not_after_connect=args.ssl_not_after_connect,
ciphers=args.ciphers,
sslversion=utils.SSLVERSIONS[args.sslversion],
certs=args.ssl_certs,
sans=args.sans,
)
root = logging.getLogger()
@ -619,23 +620,23 @@ def main(args): # pragma: nocover
try:
pd = Pathod(
(args.address, args.port),
craftanchor = args.craftanchor,
ssl = args.ssl,
ssloptions = ssloptions,
staticdir = args.staticdir,
anchors = args.anchors,
sizelimit = args.sizelimit,
noweb = args.noweb,
nocraft = args.nocraft,
noapi = args.noapi,
nohang = args.nohang,
timeout = args.timeout,
logreq = args.logreq,
logresp = args.logresp,
hexdump = args.hexdump,
http2_framedump = args.http2_framedump,
explain = args.explain,
webdebug = args.webdebug
craftanchor=args.craftanchor,
ssl=args.ssl,
ssloptions=ssloptions,
staticdir=args.staticdir,
anchors=args.anchors,
sizelimit=args.sizelimit,
noweb=args.noweb,
nocraft=args.nocraft,
noapi=args.noapi,
nohang=args.nohang,
timeout=args.timeout,
logreq=args.logreq,
logresp=args.logresp,
hexdump=args.hexdump,
http2_framedump=args.http2_framedump,
explain=args.explain,
webdebug=args.webdebug
)
except PathodError as v:
print >> sys.stderr, "Error: %s" % v

View File

@ -82,6 +82,7 @@ class Daemon:
class _PaThread(threading.Thread):
def __init__(self, iface, q, ssl, daemonargs):
threading.Thread.__init__(self)
self.name = "PathodThread"
@ -91,7 +92,7 @@ class _PaThread(threading.Thread):
def run(self):
self.server = pathod.Pathod(
(self.iface, 0),
ssl = self.ssl,
ssl=self.ssl,
**self.daemonargs
)
self.name = "PathodThread (%s:%s)" % (

View File

@ -12,15 +12,16 @@ SSLVERSIONS = {
}
SIZE_UNITS = dict(
b = 1024 ** 0,
k = 1024 ** 1,
m = 1024 ** 2,
g = 1024 ** 3,
t = 1024 ** 4,
b=1024 ** 0,
k=1024 ** 1,
m=1024 ** 2,
g=1024 ** 3,
t=1024 ** 4,
)
class MemBool(object):
"""
Truth-checking with a memory, for use in chained if statements.
"""
@ -84,6 +85,7 @@ def escape_unprintables(s):
class Data(object):
def __init__(self, name):
m = __import__(name)
dirname, _ = os.path.split(m.__file__)

View File

@ -14,6 +14,7 @@ def test_unique_name():
class TestDisconnects:
def test_parse_pathod(self):
a = language.parse_pathod("400:d0").next().actions[0]
assert a.spec() == "d0"
@ -39,6 +40,7 @@ class TestDisconnects:
class TestInject:
def test_parse_pathod(self):
a = language.parse_pathod("400:ir,@100").next().actions[0]
assert a.offset == "r"
@ -77,6 +79,7 @@ class TestInject:
class TestPauses:
def test_parse_pathod(self):
e = actions.PauseAt.expr()
v = e.parseString("p10,10")[0]
@ -107,6 +110,7 @@ class TestPauses:
class Test_Action:
def test_cmp(self):
a = actions.DisconnectAt(0)
b = actions.DisconnectAt(1)

View File

@ -24,6 +24,7 @@ def test_caseless_literal():
class TestTokValueNakedLiteral:
def test_expr(self):
v = base.TokValueNakedLiteral("foo")
assert v.expr()
@ -37,6 +38,7 @@ class TestTokValueNakedLiteral:
class TestTokValueLiteral:
def test_espr(self):
v = base.TokValueLiteral("foo")
assert v.expr()
@ -75,6 +77,7 @@ class TestTokValueLiteral:
class TestTokValueGenerate:
def test_basic(self):
v = base.TokValue.parseString("@10b")[0]
assert v.usize == 10
@ -116,6 +119,7 @@ class TestTokValueGenerate:
class TestTokValueFile:
def test_file_value(self):
v = base.TokValue.parseString("<'one two'")[0]
assert str(v)
@ -164,6 +168,7 @@ class TestTokValueFile:
class TestMisc:
def test_generators(self):
v = base.TokValue.parseString("'val'")[0]
g = v.get_generator({})
@ -227,6 +232,7 @@ class TKeyValue(base.KeyValue):
class TestKeyValue:
def test_simple(self):
e = TKeyValue.expr()
v = e.parseString("h'foo'='bar'")[0]

View File

@ -16,6 +16,7 @@ def test_make_error_response():
class TestRequest:
def test_nonascii(self):
tutils.raises("ascii", parse_request, "get:\xf0")
@ -80,7 +81,7 @@ class TestRequest:
assert language.serve(
r,
s,
language.Settings(request_host = "foo.com")
language.Settings(request_host="foo.com")
)
def test_multiline(self):
@ -142,6 +143,7 @@ class TestRequest:
class TestResponse:
def dummy_response(self):
return language.parse_pathod("400'msg'").next()

View File

@ -10,13 +10,15 @@ import tutils
def parse_request(s):
return language.parse_pathoc(s, True).next()
def parse_response(s):
return language.parse_pathod(s, True).next()
def default_settings():
return language.Settings(
request_host = "foo.com",
protocol = netlib.http2.HTTP2Protocol(tcp.TCPClient(('localhost', 1234)))
request_host="foo.com",
protocol=netlib.http2.HTTP2Protocol(tcp.TCPClient(('localhost', 1234)))
)
@ -27,6 +29,7 @@ def test_make_error_response():
class TestRequest:
def test_cached_values(self):
req = parse_request("get:/")
req_id = id(req)
@ -113,6 +116,7 @@ class TestRequest:
class TestResponse:
def test_cached_values(self):
res = parse_response("200")
res_id = id(res)
@ -134,7 +138,7 @@ class TestResponse:
assert r.code.string() == "200"
assert len(r.headers) == 1
assert r.headers[0].values(default_settings()) == ("foo", "bar")
assert r.body == None
assert r.body is None
r = parse_response('200:h"foo"="bar":bfoobar:h"bla"="fasel"')
assert r.code.string() == "200"

View File

@ -10,6 +10,7 @@ def parse_request(s):
class TestWebsocketFrame:
def _test_messages(self, specs, message_klass):
for i in specs:
wf = parse_request(i)

View File

@ -4,6 +4,7 @@ import netlib.tcp
class DummyIO(StringIO.StringIO):
def start_log(self, *args, **kwargs):
pass

View File

@ -20,10 +20,10 @@ class _TestDaemon:
@classmethod
def setUpAll(self):
self.d = test.Daemon(
ssl = self.ssl,
ssloptions = self.ssloptions,
staticdir = tutils.test_data.path("data"),
anchors = [
ssl=self.ssl,
ssloptions=self.ssloptions,
staticdir=tutils.test_data.path("data"),
anchors=[
(re.compile("/anchor/.*"), "202")
]
)
@ -38,8 +38,8 @@ class _TestDaemon:
def test_info(self):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
ssl = self.ssl,
fp = None
ssl=self.ssl,
fp=None
)
c.connect()
resp = c.request("get:/api/info")
@ -61,15 +61,15 @@ class _TestDaemon:
s = cStringIO.StringIO()
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
ssl = self.ssl,
showreq = showreq,
showresp = showresp,
explain = explain,
hexdump = hexdump,
ignorecodes = ignorecodes,
ignoretimeout = ignoretimeout,
showsummary = showsummary,
fp = s
ssl=self.ssl,
showreq=showreq,
showresp=showresp,
explain=explain,
hexdump=hexdump,
ignorecodes=ignorecodes,
ignoretimeout=ignoretimeout,
showsummary=showsummary,
fp=s
)
c.connect(showssl=showssl, fp=s)
if timeout:
@ -88,17 +88,17 @@ class _TestDaemon:
class TestDaemonSSL(_TestDaemon):
ssl = True
ssloptions = pathod.SSLOptions(
request_client_cert = True,
sans = ["test1.com", "test2.com"],
alpn_select = http2.HTTP2Protocol.ALPN_PROTO_H2,
request_client_cert=True,
sans=["test1.com", "test2.com"],
alpn_select=http2.HTTP2Protocol.ALPN_PROTO_H2,
)
def test_sni(self):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
ssl = True,
sni = "foobar.com",
fp = None
ssl=True,
sni="foobar.com",
fp=None
)
c.connect()
c.request("get:/p/200")
@ -112,9 +112,9 @@ class TestDaemonSSL(_TestDaemon):
def test_clientcert(self):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
ssl = True,
clientcert = tutils.test_data.path("data/clientcert/client.pem"),
fp = None
ssl=True,
clientcert=tutils.test_data.path("data/clientcert/client.pem"),
fp=None
)
c.connect()
c.request("get:/p/200")
@ -125,8 +125,8 @@ class TestDaemonSSL(_TestDaemon):
def test_http2_without_ssl(self):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
use_http2 = True,
ssl = False,
use_http2=True,
ssl=False,
)
tutils.raises(NotImplementedError, c.connect)
@ -135,7 +135,7 @@ class TestDaemon(_TestDaemon):
ssl = False
def test_ssl_error(self):
c = pathoc.Pathoc(("127.0.0.1", self.d.port), ssl = True, fp=None)
c = pathoc.Pathoc(("127.0.0.1", self.d.port), ssl=True, fp=None)
tutils.raises("ssl handshake", c.connect)
def test_showssl(self):
@ -206,7 +206,7 @@ class TestDaemon(_TestDaemon):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
fp=None,
ws_read_limit = 1
ws_read_limit=1
)
c.connect()
c.request("ws:/")
@ -237,22 +237,23 @@ class TestDaemonHTTP2(_TestDaemon):
def test_http2(self):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
use_http2 = True,
ssl = True,
use_http2=True,
ssl=True,
)
assert isinstance(c.protocol, http2.HTTP2Protocol)
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
)
assert c.protocol == None # TODO: change if other protocols get implemented
# TODO: change if other protocols get implemented
assert c.protocol is None
def test_http2_alpn(self):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
ssl = True,
use_http2 = True,
http2_skip_connection_preface = True,
ssl=True,
use_http2=True,
http2_skip_connection_preface=True,
)
tmp_convert_to_ssl = c.convert_to_ssl
@ -266,8 +267,8 @@ class TestDaemonHTTP2(_TestDaemon):
def test_request(self):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
ssl = True,
use_http2 = True,
ssl=True,
use_http2=True,
)
c.connect()
resp = c.request("get:/p/200")

View File

@ -8,6 +8,7 @@ import tutils
class TestPathod(object):
def test_logging(self):
s = cStringIO.StringIO()
p = pathod.Pathod(("127.0.0.1", 0), logfp=s)
@ -56,7 +57,7 @@ class TestNoApi(tutils.DaemonTests):
class TestNotAfterConnect(tutils.DaemonTests):
ssl = False
ssloptions = dict(
not_after_connect = True
not_after_connect=True
)
def test_connect(self):
@ -70,7 +71,7 @@ class TestNotAfterConnect(tutils.DaemonTests):
class TestCustomCert(tutils.DaemonTests):
ssl = True
ssloptions = dict(
certs = [("*", tutils.test_data.path("data/testkey.pem"))],
certs=[("*", tutils.test_data.path("data/testkey.pem"))],
)
def test_connect(self):
@ -84,7 +85,7 @@ class TestCustomCert(tutils.DaemonTests):
class TestSSLCN(tutils.DaemonTests):
ssl = True
ssloptions = dict(
cn = "foo.com"
cn="foo.com"
)
def test_connect(self):
@ -122,6 +123,7 @@ class TestNocraft(tutils.DaemonTests):
class CommonTests(tutils.DaemonTests):
def test_binarydata(self):
r = self.get(r"200:b'\xf0'")
l = self.d.last_log()
@ -222,8 +224,8 @@ class CommonTests(tutils.DaemonTests):
def test_websocket_frame_reflect_error(self):
r, _ = self.pathoc(
["ws:/p/", "wf:-mask:knone:f'wf:b@10':i13,'a'"],
ws_read_limit = 1,
timeout = 1
ws_read_limit=1,
timeout=1
)
assert "Parse error" in self.d.text_log()
@ -271,6 +273,7 @@ class TestDaemonSSL(CommonTests):
assert r[0].status_code == 202
assert self.d.last_log()["cipher"][1] > 0
class TestHTTP2(tutils.DaemonTests):
ssl = True
noweb = True

View File

@ -6,6 +6,7 @@ logging.disable(logging.CRITICAL)
class TestDaemonManual:
def test_simple(self):
with test.Daemon() as d:
rsp = requests.get("http://localhost:%s/p/202:da" % d.port)
@ -34,9 +35,9 @@ class TestDaemonManual:
def test_startstop_ssl_explicit(self):
ssloptions = dict(
certfile = tutils.test_data.path("data/testkey.pem"),
cacert = tutils.test_data.path("data/testkey.pem"),
ssl_after_connect = False
certfile=tutils.test_data.path("data/testkey.pem"),
cacert=tutils.test_data.path("data/testkey.pem"),
ssl_after_connect=False
)
d = test.Daemon(ssl=ssloptions)
rsp = requests.get(

View File

@ -29,18 +29,18 @@ class DaemonTests(object):
anchors=[
(re.compile("/anchor/.*"), "202:da")
],
ssl = klass.ssl,
ssloptions = so,
sizelimit = 1 * 1024 * 1024,
noweb = klass.noweb,
noapi = klass.noapi,
nohang = klass.nohang,
timeout = klass.timeout,
hexdump = klass.hexdump,
nocraft = klass.nocraft,
logreq = True,
logresp = True,
explain = True
ssl=klass.ssl,
ssloptions=so,
sizelimit=1 * 1024 * 1024,
noweb=klass.noweb,
noapi=klass.noapi,
nohang=klass.nohang,
timeout=klass.timeout,
hexdump=klass.hexdump,
nocraft=klass.nocraft,
logreq=True,
logresp=True,
explain=True
)
@classmethod
@ -86,9 +86,9 @@ class DaemonTests(object):
("localhost", self.d.port),
ssl=ssl,
ws_read_limit=ws_read_limit,
timeout = timeout,
fp = logfp,
use_http2 = use_http2,
timeout=timeout,
fp=logfp,
use_http2=use_http2,
)
c.connect(connect_to)
ret = []
@ -100,6 +100,7 @@ class DaemonTests(object):
ret.append(frm)
return ret, logfp.getvalue()
@contextmanager
def tmpdir(*args, **kwargs):
orig_workdir = os.getcwd()