Websocket frame read limit.

This commit is contained in:
Aldo Cortesi 2015-04-30 08:03:26 +12:00
parent 7e69fab331
commit f927701e74
5 changed files with 82 additions and 15 deletions

View File

@ -1172,9 +1172,16 @@ class WebsocketFrame(_Message):
return resp return resp
def values(self, settings): def values(self, settings):
vals = [ vals = []
websockets.FrameHeader().to_bytes() if self.body:
] length = len(self.body.value.get_generator(settings))
else:
length = 0
frame = websockets.FrameHeader(
mask = True,
payload_length = length
)
vals = [frame.to_bytes()]
if self.body: if self.body:
vals.append(self.body.value.get_generator(settings)) vals.append(self.body.value.get_generator(settings))
return vals return vals

View File

@ -1,7 +1,9 @@
import sys import sys
import os import os
import hashlib import hashlib
import Queue
import random import random
import select
import time import time
import threading import threading
@ -77,14 +79,28 @@ class Response:
class WebsocketFrameReader(threading.Thread): class WebsocketFrameReader(threading.Thread):
def __init__(self, rfile, callback): def __init__(self, rfile, callback, ws_read_limit):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.ws_read_limit = ws_read_limit
self.rfile, self.callback = rfile, callback self.rfile, self.callback = rfile, callback
self.daemon = True self.terminate = Queue.Queue()
self.is_done = Queue.Queue()
def run(self): def run(self):
while 1: while 1:
print websockets.Frame.from_file(self.rfile) if self.ws_read_limit == 0:
break
r, _, _ = select.select([self.rfile], [], [], 0.05)
try:
self.terminate.get_nowait()
break
except Queue.Empty:
pass
for rfile in r:
print websockets.Frame.from_file(self.rfile).human_readable()
if self.ws_read_limit is not None:
self.ws_read_limit -= 1
self.is_done.put(None)
class Pathoc(tcp.TCPClient): class Pathoc(tcp.TCPClient):
@ -99,6 +115,9 @@ class Pathoc(tcp.TCPClient):
clientcert=None, clientcert=None,
ciphers=None, ciphers=None,
# Websockets
ws_read_limit = None,
# Output control # Output control
showreq = False, showreq = False,
showresp = False, showresp = False,
@ -131,6 +150,8 @@ class Pathoc(tcp.TCPClient):
self.ciphers = ciphers self.ciphers = ciphers
self.sslinfo = None self.sslinfo = None
self.ws_read_limit = ws_read_limit
self.showreq = showreq self.showreq = showreq
self.showresp = showresp self.showresp = showresp
self.explain = explain self.explain = explain
@ -140,6 +161,8 @@ class Pathoc(tcp.TCPClient):
self.showsummary = showsummary self.showsummary = showsummary
self.fp = fp self.fp = fp
self.ws_framereader = None
def http_connect(self, connect_to): def http_connect(self, connect_to):
self.wfile.write( self.wfile.write(
'CONNECT %s:%s HTTP/1.1\r\n'%tuple(connect_to) + 'CONNECT %s:%s HTTP/1.1\r\n'%tuple(connect_to) +
@ -196,6 +219,19 @@ class Pathoc(tcp.TCPClient):
print >> fp, "%s (unprintables escaped):"%header print >> fp, "%s (unprintables escaped):"%header
print >> fp, netlib.utils.cleanBin(data) print >> fp, netlib.utils.cleanBin(data)
def stop(self):
self.ws_framereader.terminate.put(None)
def wait(self):
if self.ws_framereader:
while 1:
try:
self.ws_framereader.is_done.get(timeout=0.05)
self.ws_framereader.join()
return
except Queue.Empty:
pass
def websocket_get_frame(self, frame): def websocket_get_frame(self, frame):
""" """
Called when a frame is received from the server. Called when a frame is received from the server.
@ -230,21 +266,30 @@ class Pathoc(tcp.TCPClient):
print >> self.fp, ">> Spec:", r.spec() print >> self.fp, ">> Spec:", r.spec()
if self.showreq: if self.showreq:
self._show( self._show(
self.fp, ">> Request", self.fp, ">> Websocket Frame",
self.wfile.get_log(), self.wfile.get_log(),
self.hexdump self.hexdump
) )
def websocket_start(self, r, callback=None): def websocket_start(self, r, callback=None, limit=None):
""" """
Performs an HTTP request, and attempts to drop into websocket Performs an HTTP request, and attempts to drop into websocket
connection. connection.
callback: A callback called within the websocket thread for every
server frame.
limit: Disconnect after receiving N server frames.
""" """
resp = self.http(r) resp = self.http(r)
if resp.status_code == 101: if resp.status_code == 101:
if self.showsummary: if self.showsummary:
print >> self.fp, "Websocket connection established..." print >> self.fp, "<< websocket connection established..."
WebsocketFrameReader(self.rfile, self.websocket_get_frame).start() self.ws_framereader = WebsocketFrameReader(
self.rfile,
self.websocket_get_frame,
self.ws_read_limit
)
self.ws_framereader.start()
return resp return resp
def http(self, r): def http(self, r):
@ -340,6 +385,7 @@ class Pathoc(tcp.TCPClient):
def main(args): # pragma: nocover def main(args): # pragma: nocover
memo = set([]) memo = set([])
trycount = 0 trycount = 0
p = None
try: try:
cnt = 0 cnt = 0
while 1: while 1:
@ -406,5 +452,9 @@ def main(args): # pragma: nocover
return return
except (http.HttpError, tcp.NetLibError), v: except (http.HttpError, tcp.NetLibError), v:
pass pass
p.wait()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
if p:
p.stop()
p.wait()

View File

@ -638,7 +638,6 @@ class TestRequest:
class TestWebsocketFrame: class TestWebsocketFrame:
def test_spec(self): def test_spec(self):
e = language.WebsocketFrame.expr() e = language.WebsocketFrame.expr()
wf = e.parseString("wf:b'foo'") wf = e.parseString("wf:b'foo'")

View File

@ -185,10 +185,10 @@ class CommonTests(tutils.DaemonTests):
assert r.status_code == 202 assert r.status_code == 202
def test_websocket(self): def test_websocket(self):
r = self.pathoc("ws:/p/") r = self.pathoc("ws:/p/", ws_read_limit=0)
assert r.status_code == 101 assert r.status_code == 101
r = self.pathoc("ws:/p/ws") r = self.pathoc("ws:/p/ws", ws_read_limit=0)
assert r.status_code == 101 assert r.status_code == 101

View File

@ -64,10 +64,21 @@ class DaemonTests(object):
def get(self, spec): def get(self, spec):
return requests.get(self.d.p(spec), verify=False) return requests.get(self.d.p(spec), verify=False)
def pathoc(self, spec, timeout=None, connect_to=None, ssl=None): def pathoc(
self,
spec,
timeout=None,
connect_to=None,
ssl=None,
ws_read_limit=None
):
if ssl is None: if ssl is None:
ssl = self.ssl ssl = self.ssl
c = pathoc.Pathoc(("localhost", self.d.port), ssl=ssl) c = pathoc.Pathoc(
("localhost", self.d.port),
ssl=ssl,
ws_read_limit=ws_read_limit
)
c.connect(connect_to) c.connect(connect_to)
if timeout: if timeout:
c.settimeout(timeout) c.settimeout(timeout)