Remove unpack_dispatch_and_ack method
This commit is contained in:
parent
7f27c43b30
commit
e270c7d5f8
@ -259,63 +259,60 @@ class Session:
|
||||
break
|
||||
|
||||
try:
|
||||
self.unpack_dispatch_and_ack(packet)
|
||||
data = self.unpack(BytesIO(packet))
|
||||
|
||||
messages = (
|
||||
data.body.messages
|
||||
if isinstance(data.body, MsgContainer)
|
||||
else [data]
|
||||
)
|
||||
|
||||
log.debug(data)
|
||||
|
||||
for msg in messages:
|
||||
if msg.seq_no % 2 != 0:
|
||||
if msg.msg_id in self.pending_acks:
|
||||
continue
|
||||
else:
|
||||
self.pending_acks.add(msg.msg_id)
|
||||
|
||||
if isinstance(msg.body, (types.MsgDetailedInfo, types.MsgNewDetailedInfo)):
|
||||
self.pending_acks.add(msg.body.answer_msg_id)
|
||||
continue
|
||||
|
||||
if isinstance(msg.body, types.NewSessionCreated):
|
||||
continue
|
||||
|
||||
msg_id = None
|
||||
|
||||
if isinstance(msg.body, (types.BadMsgNotification, types.BadServerSalt)):
|
||||
msg_id = msg.body.bad_msg_id
|
||||
elif isinstance(msg.body, (core.FutureSalts, types.RpcResult)):
|
||||
msg_id = msg.body.req_msg_id
|
||||
elif isinstance(msg.body, types.Pong):
|
||||
msg_id = msg.body.msg_id
|
||||
else:
|
||||
if self.client is not None:
|
||||
self.client.updates_queue.put(msg.body)
|
||||
|
||||
if msg_id in self.results:
|
||||
self.results[msg_id].value = getattr(msg.body, "result", msg.body)
|
||||
self.results[msg_id].event.set()
|
||||
|
||||
if len(self.pending_acks) >= self.ACKS_THRESHOLD:
|
||||
log.info("Send {} acks".format(len(self.pending_acks)))
|
||||
|
||||
try:
|
||||
self._send(types.MsgsAck(list(self.pending_acks)), False)
|
||||
except (OSError, TimeoutError):
|
||||
pass
|
||||
else:
|
||||
self.pending_acks.clear()
|
||||
except Exception as e:
|
||||
log.error(e, exc_info=True)
|
||||
|
||||
log.debug("{} stopped".format(name))
|
||||
|
||||
def unpack_dispatch_and_ack(self, packet: bytes):
|
||||
data = self.unpack(BytesIO(packet))
|
||||
|
||||
messages = (
|
||||
data.body.messages
|
||||
if isinstance(data.body, MsgContainer)
|
||||
else [data]
|
||||
)
|
||||
|
||||
log.debug(data)
|
||||
|
||||
for msg in messages:
|
||||
if msg.seq_no % 2 != 0:
|
||||
if msg.msg_id in self.pending_acks:
|
||||
continue
|
||||
else:
|
||||
self.pending_acks.add(msg.msg_id)
|
||||
|
||||
if isinstance(msg.body, (types.MsgDetailedInfo, types.MsgNewDetailedInfo)):
|
||||
self.pending_acks.add(msg.body.answer_msg_id)
|
||||
continue
|
||||
|
||||
if isinstance(msg.body, types.NewSessionCreated):
|
||||
continue
|
||||
|
||||
msg_id = None
|
||||
|
||||
if isinstance(msg.body, (types.BadMsgNotification, types.BadServerSalt)):
|
||||
msg_id = msg.body.bad_msg_id
|
||||
elif isinstance(msg.body, (core.FutureSalts, types.RpcResult)):
|
||||
msg_id = msg.body.req_msg_id
|
||||
elif isinstance(msg.body, types.Pong):
|
||||
msg_id = msg.body.msg_id
|
||||
else:
|
||||
if self.client is not None:
|
||||
self.client.updates_queue.put(msg.body)
|
||||
|
||||
if msg_id in self.results:
|
||||
self.results[msg_id].value = getattr(msg.body, "result", msg.body)
|
||||
self.results[msg_id].event.set()
|
||||
|
||||
if len(self.pending_acks) >= self.ACKS_THRESHOLD:
|
||||
log.info("Send {} acks".format(len(self.pending_acks)))
|
||||
|
||||
try:
|
||||
self._send(types.MsgsAck(list(self.pending_acks)), False)
|
||||
except (OSError, TimeoutError):
|
||||
pass
|
||||
else:
|
||||
self.pending_acks.clear()
|
||||
|
||||
def ping(self):
|
||||
log.debug("PingThread started")
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user