fix reply concurrency

This commit is contained in:
Maximilian Hils 2016-08-04 17:03:27 -07:00
parent dcfa7027ae
commit a52a1df23c

View File

@ -222,7 +222,9 @@ def handler(f):
# Reset the handled flag - it's common for us to feed the same object # Reset the handled flag - it's common for us to feed the same object
# through handlers repeatedly, so we don't want this to persist across # through handlers repeatedly, so we don't want this to persist across
# calls. # calls.
if message.reply.handled: if handling:
if not message.reply.taken:
message.reply.commit()
message.reply.handled = False message.reply.handled = False
return ret return ret
# Mark this function as a handler wrapper # Mark this function as a handler wrapper
@ -230,6 +232,9 @@ def handler(f):
return wrapper return wrapper
NO_REPLY = object()
class Reply(object): class Reply(object):
""" """
Messages sent through a channel are decorated with a "reply" attribute. Messages sent through a channel are decorated with a "reply" attribute.
@ -241,6 +246,8 @@ class Reply(object):
self.q = queue.Queue() self.q = queue.Queue()
# Has this message been acked? # Has this message been acked?
self.acked = False self.acked = False
# What's the ack message?
self.ack_message = NO_REPLY
# Has the user taken responsibility for ack-ing? # Has the user taken responsibility for ack-ing?
self.taken = False self.taken = False
# Has a handler taken responsibility for ack-ing? # Has a handler taken responsibility for ack-ing?
@ -258,8 +265,21 @@ class Reply(object):
def send(self, msg): def send(self, msg):
if self.acked: if self.acked:
raise exceptions.ControlException("Message already acked.") raise exceptions.ControlException("Message already acked.")
if self.ack_message != NO_REPLY:
# We may have overrides for this later.
raise exceptions.ControlException("Message already has a response.")
self.acked = True self.acked = True
self.q.put(msg) self.ack_message = msg
if self.taken:
self.commit()
def commit(self):
"""
This is called by the handler to actually send the ack message.
"""
if self.ack_message == NO_REPLY:
raise exceptions.ControlException("Message has no response.")
self.q.put(self.ack_message)
def __del__(self): def __del__(self):
if not self.acked: if not self.acked: