From 7412ec83f55e6a9dcdde84603b88cd67bbf8b04d Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Fri, 5 Jun 2015 16:03:17 +1200 Subject: [PATCH] Refactor pathoc message receive to use queues and generators This gives us a nicer, thread-safe interface. --- libpathod/pathoc.py | 46 +++++++++++++++++++++++++++++++-------------- test/test_pathod.py | 6 ++---- test/tutils.py | 9 ++++++--- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/libpathod/pathoc.py b/libpathod/pathoc.py index cb954e154..385b61db2 100644 --- a/libpathod/pathoc.py +++ b/libpathod/pathoc.py @@ -85,16 +85,15 @@ class WebsocketFrameReader(threading.Thread): logfp, showresp, hexdump, - callback, ws_read_limit): threading.Thread.__init__(self) self.ws_read_limit = ws_read_limit self.logfp = logfp self.showresp = showresp self.hexdump = hexdump - self.rfile, self.callback = rfile, callback + self.rfile = rfile self.terminate = Queue.Queue() - self.is_done = Queue.Queue() + self.frames_queue = Queue.Queue() def log(self, rfile): return log.Log( @@ -121,11 +120,11 @@ class WebsocketFrameReader(threading.Thread): except tcp.NetLibError: self.ws_read_limit = 0 break + self.frames_queue.put(frm) log("<< %s" % frm.header.human_readable()) - self.callback(frm) if self.ws_read_limit is not None: self.ws_read_limit -= 1 - self.is_done.put(None) + self.frames_queue.put(None) class Pathoc(tcp.TCPClient): @@ -248,15 +247,33 @@ class Pathoc(tcp.TCPClient): if self.ws_framereader: self.ws_framereader.terminate.put(None) - def wait(self): + def wait(self, timeout=0.01, finish=True): + """ + A generator that yields frames until Pathoc terminates. + + timeout: If specified None may be yielded instead if timeout is + reached. If timeout is None, wait forever. If timeout is 0, return + immedately if nothing is on the queue. + + finish: If true, consume messages until the reader shuts down. + Otherwise, return None on timeout. + """ if self.ws_framereader: while True: try: - self.ws_framereader.is_done.get(timeout=0.05) + frm = self.ws_framereader.frames_queue.get( + timeout = timeout, + block = True if timeout != 0 else False + ) + except Queue.Empty: + if finish: + continue + else: + return + if frm is None: self.ws_framereader.join() return - except Queue.Empty: - pass + yield frm def websocket_get_frame(self, frame): """ @@ -281,13 +298,11 @@ class Pathoc(tcp.TCPClient): return None raise - def websocket_start(self, r, callback=None, limit=None): + def websocket_start(self, r, limit=None): """ Performs an HTTP request, and attempts to drop into websocket connection. - callback: A callback called within the websocket thread for every - server frame. limit: Disconnect after receiving N server frames. """ resp = self.http(r) @@ -297,7 +312,6 @@ class Pathoc(tcp.TCPClient): self.fp, self.showresp, self.hexdump, - callback, self.ws_read_limit ) self.ws_framereader.start() @@ -432,9 +446,13 @@ def main(args): # pragma: nocover ret = p.request(spec) if ret and args.oneshot: return + # We consume the queue when we can, so it doesn't build up. + for i in p.wait(timeout=0, finish=False): + pass except (http.HttpError, tcp.NetLibError) as v: break - p.wait() + for i in p.wait(timeout=0.01, finish=True): + pass except KeyboardInterrupt: pass if p: diff --git a/test/test_pathod.py b/test/test_pathod.py index d30136929..55a5b32e0 100644 --- a/test/test_pathod.py +++ b/test/test_pathod.py @@ -210,10 +210,8 @@ class CommonTests(tutils.DaemonTests): assert r.status_code == 101 def test_websocket_frame(self): - r = self.pathoc(["ws:/p/", "wf:f'wf'"], ws_read_limit=1) - #print r - #pprint.pprint(r) - #pprint.pprint(self.d.log()) + r = self.pathoc(["ws:/p/", "wf:f'wf:b\"test\"'"], ws_read_limit=1) + assert r[1].payload == "test" class TestDaemon(CommonTests): diff --git a/test/tutils.py b/test/tutils.py index dc118bbef..1e33dfe7b 100644 --- a/test/tutils.py +++ b/test/tutils.py @@ -88,9 +88,12 @@ class DaemonTests(object): c.settimeout(timeout) ret = [] for i in specs: - ret.append(c.request(i)) - return ret - + resp = c.request(i) + if resp: + ret.append(resp) + for frm in c.wait(): + ret.append(frm) + return ret @contextmanager def tmpdir(*args, **kwargs):