Refactor pathoc message receive to use queues and generators

This gives us a nicer, thread-safe interface.
This commit is contained in:
Aldo Cortesi 2015-06-05 16:03:17 +12:00
parent d23691f98c
commit 7412ec83f5
3 changed files with 40 additions and 21 deletions

View File

@ -85,16 +85,15 @@ class WebsocketFrameReader(threading.Thread):
logfp,
showresp,
hexdump,
callback,
ws_read_limit):
threading.Thread.__init__(self)
self.ws_read_limit = ws_read_limit
self.logfp = logfp
self.showresp = showresp
self.hexdump = hexdump
self.rfile, self.callback = rfile, callback
self.rfile = rfile
self.terminate = Queue.Queue()
self.is_done = Queue.Queue()
self.frames_queue = Queue.Queue()
def log(self, rfile):
return log.Log(
@ -121,11 +120,11 @@ class WebsocketFrameReader(threading.Thread):
except tcp.NetLibError:
self.ws_read_limit = 0
break
self.frames_queue.put(frm)
log("<< %s" % frm.header.human_readable())
self.callback(frm)
if self.ws_read_limit is not None:
self.ws_read_limit -= 1
self.is_done.put(None)
self.frames_queue.put(None)
class Pathoc(tcp.TCPClient):
@ -248,15 +247,33 @@ class Pathoc(tcp.TCPClient):
if self.ws_framereader:
self.ws_framereader.terminate.put(None)
def wait(self):
def wait(self, timeout=0.01, finish=True):
"""
A generator that yields frames until Pathoc terminates.
timeout: If specified None may be yielded instead if timeout is
reached. If timeout is None, wait forever. If timeout is 0, return
immedately if nothing is on the queue.
finish: If true, consume messages until the reader shuts down.
Otherwise, return None on timeout.
"""
if self.ws_framereader:
while True:
try:
self.ws_framereader.is_done.get(timeout=0.05)
frm = self.ws_framereader.frames_queue.get(
timeout = timeout,
block = True if timeout != 0 else False
)
except Queue.Empty:
if finish:
continue
else:
return
if frm is None:
self.ws_framereader.join()
return
except Queue.Empty:
pass
yield frm
def websocket_get_frame(self, frame):
"""
@ -281,13 +298,11 @@ class Pathoc(tcp.TCPClient):
return None
raise
def websocket_start(self, r, callback=None, limit=None):
def websocket_start(self, r, limit=None):
"""
Performs an HTTP request, and attempts to drop into websocket
connection.
callback: A callback called within the websocket thread for every
server frame.
limit: Disconnect after receiving N server frames.
"""
resp = self.http(r)
@ -297,7 +312,6 @@ class Pathoc(tcp.TCPClient):
self.fp,
self.showresp,
self.hexdump,
callback,
self.ws_read_limit
)
self.ws_framereader.start()
@ -432,9 +446,13 @@ def main(args): # pragma: nocover
ret = p.request(spec)
if ret and args.oneshot:
return
# We consume the queue when we can, so it doesn't build up.
for i in p.wait(timeout=0, finish=False):
pass
except (http.HttpError, tcp.NetLibError) as v:
break
p.wait()
for i in p.wait(timeout=0.01, finish=True):
pass
except KeyboardInterrupt:
pass
if p:

View File

@ -210,10 +210,8 @@ class CommonTests(tutils.DaemonTests):
assert r.status_code == 101
def test_websocket_frame(self):
r = self.pathoc(["ws:/p/", "wf:f'wf'"], ws_read_limit=1)
#print r
#pprint.pprint(r)
#pprint.pprint(self.d.log())
r = self.pathoc(["ws:/p/", "wf:f'wf:b\"test\"'"], ws_read_limit=1)
assert r[1].payload == "test"
class TestDaemon(CommonTests):

View File

@ -88,10 +88,13 @@ class DaemonTests(object):
c.settimeout(timeout)
ret = []
for i in specs:
ret.append(c.request(i))
resp = c.request(i)
if resp:
ret.append(resp)
for frm in c.wait():
ret.append(frm)
return ret
@contextmanager
def tmpdir(*args, **kwargs):
orig_workdir = os.getcwd()