Add session package

This commit is contained in:
Dan 2017-12-05 12:41:07 +01:00
parent afcd19a120
commit a8cad5abfb
8 changed files with 808 additions and 0 deletions

View File

@ -0,0 +1,20 @@
# Pyrogram - Telegram MTProto API Client Library for Python
# Copyright (C) 2017 Dan Tès <https://github.com/delivrance>
#
# This file is part of Pyrogram.
#
# Pyrogram is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrogram is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
from .auth import Auth
from .session import Session

246
pyrogram/session/auth.py Normal file
View File

@ -0,0 +1,246 @@
# Pyrogram - Telegram MTProto API Client Library for Python
# Copyright (C) 2017 Dan Tès <https://github.com/delivrance>
#
# This file is part of Pyrogram.
#
# Pyrogram is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrogram is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
import logging
import time
from hashlib import sha1
from io import BytesIO
from os import urandom
from pyrogram.api import functions, types
from pyrogram.api.core import Object, Long, Int
from pyrogram.connection import Connection
from pyrogram.crypto import IGE, RSA, Prime
from .internals import MsgId, DataCenter
log = logging.getLogger(__name__)
# TODO: When using TCP connection mode, the server may close it at any time, causing the Auth key creation to fail
# The above is true when dealing with temporary keys, although for perm keys it didn't happened, yet.
class Auth:
CURRENT_DH_PRIME = int(
"C71CAEB9C6B1C9048E6C522F70F13F73980D40238E3E21C14934D037563D930F"
"48198A0AA7C14058229493D22530F4DBFA336F6E0AC925139543AED44CCE7C37"
"20FD51F69458705AC68CD4FE6B6B13ABDC9746512969328454F18FAF8C595F64"
"2477FE96BB2A941D5BCD1D4AC8CC49880708FA9B378E3C4F3A9060BEE67CF9A4"
"A4A695811051907E162753B56B0F6B410DBA74D8A84B2A14B3144E0EF1284754"
"FD17ED950D5965B4B9DD46582DB1178D169C6BC465B0D6FF9CA3928FEF5B9AE4"
"E418FC15E83EBEA0F87FA9FF5EED70050DED2849F47BF959D956850CE929851F"
"0D8115F635B105EE2E4E15D04B2454BF6F4FADF034B10403119CD8E3B92FCC5B",
16
)
def __init__(self, dc_id: int, test_mode: bool):
self.dc_id = dc_id
self.test_mode = test_mode
self.connection = Connection(DataCenter(dc_id, test_mode))
self.msg_id = MsgId()
def pack(self, data: Object) -> bytes:
return (
bytes(8)
+ Long(self.msg_id())
+ Int(len(data.write()))
+ data.write()
)
@staticmethod
def unpack(b: BytesIO):
b.seek(20) # Skip auth_key_id (8), message_id (8) and message_length (4)
return Object.read(b)
def send(self, data: Object):
data = self.pack(data)
self.connection.send(data)
response = BytesIO(self.connection.recv())
return self.unpack(response)
def create(self):
"""
https://core.telegram.org/mtproto/auth_key
https://core.telegram.org/mtproto/samples-auth_key
"""
log.info("Start creating a new auth key on DC{}".format(self.dc_id))
self.connection.connect()
# Step 1; Step 2
nonce = int.from_bytes(urandom(16), "little", signed=True)
log.debug("Send req_pq: {}".format(nonce))
res_pq = self.send(functions.ReqPq(nonce))
log.debug("Got ResPq: {}".format(res_pq.server_nonce))
# Step 3
pq = int.from_bytes(res_pq.pq, "big")
log.debug("Start PQ factorization: {}".format(pq))
start = time.time()
g = Prime.decompose(pq)
p, q = sorted((g, pq // g)) # p < q
log.debug("Done PQ factorization ({}s): {} {}".format(round(time.time() - start, 3), p, q))
# Step 4
server_nonce = res_pq.server_nonce
new_nonce = int.from_bytes(urandom(32), "little", signed=True)
data = types.PQInnerData(
res_pq.pq,
int.to_bytes(p, 4, "big"),
int.to_bytes(q, 4, "big"),
nonce,
server_nonce,
new_nonce,
).write()
sha = sha1(data).digest()
padding = urandom(- (len(data) + len(sha)) % 255)
data_with_hash = sha + data + padding
encrypted_data = RSA.encrypt(data_with_hash, res_pq.server_public_key_fingerprints[0])
log.debug("Done encrypt data with RSA")
# Step 5. TODO: Handle "server_DH_params_fail". Code assumes response is ok
log.debug("Send req_DH_params")
server_dh_params = self.send(
functions.ReqDhParams(
nonce,
server_nonce,
int.to_bytes(p, 4, "big"),
int.to_bytes(q, 4, "big"),
res_pq.server_public_key_fingerprints[0],
encrypted_data
)
)
encrypted_answer = server_dh_params.encrypted_answer
server_nonce = int.to_bytes(server_nonce, 16, "little", signed=True)
new_nonce = int.to_bytes(new_nonce, 32, "little", signed=True)
tmp_aes_key = (
sha1(new_nonce + server_nonce).digest()
+ sha1(server_nonce + new_nonce).digest()[:12]
)
tmp_aes_iv = (
sha1(server_nonce + new_nonce).digest()[12:]
+ sha1(new_nonce + new_nonce).digest() + new_nonce[:4]
)
server_nonce = int.from_bytes(server_nonce, "little", signed=True)
answer_with_hash = IGE.decrypt(encrypted_answer, tmp_aes_key, tmp_aes_iv)
answer = answer_with_hash[20:]
server_dh_inner_data = Object.read(BytesIO(answer))
log.debug("Done decrypting answer")
dh_prime = int.from_bytes(server_dh_inner_data.dh_prime, "big")
delta_time = server_dh_inner_data.server_time - time.time()
log.debug("Delta time: {}".format(round(delta_time, 3)))
# Step 6
g = server_dh_inner_data.g
b = int.from_bytes(urandom(256), "big")
g_b = int.to_bytes(pow(g, b, dh_prime), 256, "big")
retry_id = 0
data = types.ClientDhInnerData(
nonce,
server_nonce,
retry_id,
g_b
).write()
sha = sha1(data).digest()
padding = urandom(- (len(data) + len(sha)) % 16)
data_with_hash = sha + data + padding
encrypted_data = IGE.encrypt(data_with_hash, tmp_aes_key, tmp_aes_iv)
log.debug("Send set_client_DH_params")
set_client_dh_params_answer = self.send(
functions.SetClientDhParams(
nonce,
server_nonce,
encrypted_data
)
)
# TODO: Handle "auth_key_aux_hash" if the previous step fails
# Step 7; Step 8
g_a = int.from_bytes(server_dh_inner_data.g_a, "big")
auth_key = int.to_bytes(pow(g_a, b, dh_prime), 256, "big")
server_nonce = int.to_bytes(server_nonce, 16, "little", signed=True)
# TODO: Handle errors
#######################
# Security checks
#######################
assert dh_prime == self.CURRENT_DH_PRIME
log.debug("DH parameters check: OK")
# https://core.telegram.org/mtproto/security_guidelines#g-a-and-g-b-validation
g_b = int.from_bytes(g_b, "big")
assert 1 < g < dh_prime - 1
assert 1 < g_a < dh_prime - 1
assert 1 < g_b < dh_prime - 1
assert 2 ** (2048 - 64) < g_a < dh_prime - 2 ** (2048 - 64)
assert 2 ** (2048 - 64) < g_b < dh_prime - 2 ** (2048 - 64)
log.debug("g_a and g_b validation: OK")
# https://core.telegram.org/mtproto/security_guidelines#checking-sha1-hash-values
answer = server_dh_inner_data.write() # Call .write() to remove padding
assert answer_with_hash[:20] == sha1(answer).digest()
log.debug("SHA1 hash values check: OK")
# https://core.telegram.org/mtproto/security_guidelines#checking-nonce-server-nonce-and-new-nonce-fields
# 1st message
assert nonce == res_pq.nonce
# 2nd message
server_nonce = int.from_bytes(server_nonce, "little", signed=True)
assert nonce == server_dh_params.nonce
assert server_nonce == server_dh_params.server_nonce
# 3rd message
assert nonce == set_client_dh_params_answer.nonce
assert server_nonce == set_client_dh_params_answer.server_nonce
server_nonce = int.to_bytes(server_nonce, 16, "little", signed=True)
log.debug("Nonce fields check: OK")
# Step 9
server_salt = IGE.xor(new_nonce[:8], server_nonce[:8])
log.debug("Server salt: {}".format(int.from_bytes(server_salt, "little")))
log.info(
"Done auth key exchange: {}".format(
set_client_dh_params_answer.__class__.__name__
)
)
self.connection.close()
return auth_key

View File

@ -0,0 +1,21 @@
# Pyrogram - Telegram MTProto API Client Library for Python
# Copyright (C) 2017 Dan Tès <https://github.com/delivrance>
#
# This file is part of Pyrogram.
#
# Pyrogram is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrogram is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
from .data_center import DataCenter
from .msg_factory import MsgFactory
from .msg_id import MsgId

View File

@ -0,0 +1,36 @@
# Pyrogram - Telegram MTProto API Client Library for Python
# Copyright (C) 2017 Dan Tès <https://github.com/delivrance>
#
# This file is part of Pyrogram.
#
# Pyrogram is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrogram is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
class DataCenter:
TEST = {
1: "149.154.175.10",
2: "149.154.167.40",
3: "149.154.175.117",
}
PROD = {
1: "149.154.175.50",
2: "149.154.167.51",
3: "149.154.175.100",
4: "149.154.167.91",
5: "91.108.56.149"
}
def __new__(cls, dc_id: int, test_mode: bool):
return cls.TEST[dc_id] if test_mode else cls.PROD[dc_id]

View File

@ -0,0 +1,39 @@
# Pyrogram - Telegram MTProto API Client Library for Python
# Copyright (C) 2017 Dan Tès <https://github.com/delivrance>
#
# This file is part of Pyrogram.
#
# Pyrogram is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrogram is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
from pyrogram.api.core import Message, MsgContainer, Object
from pyrogram.api.functions import Ping, HttpWait
from pyrogram.api.types import MsgsAck
from .msg_id import MsgId
from .seq_no import SeqNo
not_content_related = [Ping, HttpWait, MsgsAck, MsgContainer]
class MsgFactory:
def __init__(self, msg_id: MsgId):
self.msg_id = msg_id
self.seq_no = SeqNo()
def __call__(self, body: Object) -> Message:
return Message(
body,
self.msg_id(),
self.seq_no(type(body) not in not_content_related),
len(body)
)

View File

@ -0,0 +1,34 @@
# Pyrogram - Telegram MTProto API Client Library for Python
# Copyright (C) 2017 Dan Tès <https://github.com/delivrance>
#
# This file is part of Pyrogram.
#
# Pyrogram is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrogram is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
from time import time
class MsgId:
def __init__(self, delta_time: float = 0.0):
self.delta_time = delta_time
self.last_time = 0
self.offset = 0
def __call__(self) -> int:
now = time()
self.offset = self.offset + 4 if now == self.last_time else 0
msg_id = int((now + self.delta_time) * 2 ** 32) + self.offset
self.last_time = now
return msg_id

View File

@ -0,0 +1,30 @@
# Pyrogram - Telegram MTProto API Client Library for Python
# Copyright (C) 2017 Dan Tès <https://github.com/delivrance>
#
# This file is part of Pyrogram.
#
# Pyrogram is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrogram is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
class SeqNo:
def __init__(self):
self.content_related_messages_sent = 0
def __call__(self, is_content_related: bool) -> int:
seq_no = (self.content_related_messages_sent * 2) + (1 if is_content_related else 0)
if is_content_related:
self.content_related_messages_sent += 1
return seq_no

382
pyrogram/session/session.py Normal file
View File

@ -0,0 +1,382 @@
# Pyrogram - Telegram MTProto API Client Library for Python
# Copyright (C) 2017 Dan Tès <https://github.com/delivrance>
#
# This file is part of Pyrogram.
#
# Pyrogram is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrogram is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
import logging
import platform
import threading
from datetime import timedelta, datetime
from hashlib import sha1
from io import BytesIO
from os import urandom
from queue import Queue
from threading import Event, Thread
from pyrogram import __copyright__, __license__, __version__
from pyrogram.api import functions, types, core
from pyrogram.api.all import layer
from pyrogram.api.core import Message, Object, MsgContainer, Long, FutureSalt
from pyrogram.api.errors import Error
from pyrogram.connection import Connection
from pyrogram.crypto import IGE, KDF
from .internals import MsgId, MsgFactory, DataCenter
log = logging.getLogger(__name__)
class Result:
def __init__(self):
self.value = None
self.event = Event()
class Session:
VERSION = __version__
APP_VERSION = "Pyrogram \U0001f525 {}".format(VERSION)
DEVICE_MODEL = "{} {}".format(
platform.python_implementation(),
platform.python_version()
)
SYSTEM_VERSION = "{} {}".format(
platform.system(),
platform.release()
)
INITIAL_SALT = 0x616e67656c696361
WORKERS = 4
WAIT_TIMEOUT = 5
MAX_RETRIES = 5
ACKS_THRESHOLD = 8
PING_INTERVAL = 5
def __init__(self, dc_id: int, test_mode: bool, auth_key: bytes, api_id: str):
print("Pyrogram v{}, {}".format(__version__, __copyright__))
print("Licensed under the terms of the " + __license__)
self.connection = Connection(DataCenter(dc_id, test_mode))
self.api_id = api_id
self.auth_key = auth_key
self.auth_key_id = sha1(auth_key).digest()[-8:]
self.msg_id = MsgId()
self.session_id = Long(self.msg_id())
self.msg_factory = MsgFactory(self.msg_id)
self.current_salt = None
self.pending_acks = set()
self.recv_queue = Queue()
self.results = {}
self.ping_thread = None
self.ping_thread_event = Event()
self.next_salt_thread = None
self.next_salt_thread_event = Event()
self.is_connected = Event()
self.total_connections = 0
self.total_messages = 0
self.total_bytes = 0
def start(self):
while True:
try:
self.connection.connect()
for i in range(self.WORKERS):
Thread(target=self.worker, name="Worker#{}".format(i + 1)).start()
Thread(target=self.recv, name="RecvThread").start()
self.current_salt = FutureSalt(0, 0, self.INITIAL_SALT)
self.current_salt = FutureSalt(0, 0, self._send(functions.Ping(0)).new_server_salt)
self.current_salt = self._send(functions.GetFutureSalts(1)).salts[0]
if self.next_salt_thread is not None:
self.next_salt_thread.join()
self.next_salt_thread_event.clear()
self.next_salt_thread = Thread(target=self.next_salt, name="NextSaltThread")
self.next_salt_thread.start()
terms = self._send(
functions.InvokeWithLayer(
layer,
functions.InitConnection(
self.api_id,
self.DEVICE_MODEL,
self.SYSTEM_VERSION,
self.APP_VERSION,
"en", "", "en",
functions.help.GetTermsOfService(),
)
)
)
if self.ping_thread is not None:
self.ping_thread.join()
self.ping_thread_event.clear()
self.ping_thread = Thread(target=self.ping, name="PingThread")
self.ping_thread.start()
log.info("Connection inited: Layer {}".format(layer))
except (OSError, TimeoutError):
self.stop()
else:
break
self.is_connected.set()
self.total_connections += 1
log.debug("Session started")
return terms.text
def stop(self):
self.is_connected.clear()
self.ping_thread_event.set()
self.next_salt_thread_event.set()
self.connection.close()
for i in range(self.WORKERS):
self.recv_queue.put(None)
log.debug("Session stopped")
def restart(self):
self.stop()
self.start()
def pack(self, message: Message) -> bytes:
data = Long(self.current_salt.salt) + self.session_id + message.write()
msg_key = sha1(data).digest()[-16:]
aes_key, aes_iv = KDF(self.auth_key, msg_key, True)
padding = urandom(-len(data) % 16)
return self.auth_key_id + msg_key + IGE.encrypt(data + padding, aes_key, aes_iv)
def unpack(self, b: BytesIO) -> Message:
assert b.read(8) == self.auth_key_id, b.getvalue()
msg_key = b.read(16)
aes_key, aes_iv = KDF(self.auth_key, msg_key, False)
data = BytesIO(IGE.decrypt(b.read(), aes_key, aes_iv))
data.read(8) # Server salt
# https://core.telegram.org/mtproto/security_guidelines#checking-session-id
assert data.read(8) == self.session_id
message = Message.read(data)
# https://core.telegram.org/mtproto/security_guidelines#checking-sha1-hash-value-of-msg-key
# https://core.telegram.org/mtproto/security_guidelines#checking-message-length
# 32 = salt (8) + session_id (8) + msg_id (8) + seq_no (4) + length (4)
assert msg_key == sha1(data.getvalue()[:32 + message.length]).digest()[-16:]
# https://core.telegram.org/mtproto/security_guidelines#checking-msg-id
# TODO: check for lower msg_ids
assert message.msg_id % 2 != 0
return message
def worker(self):
name = threading.current_thread().name
log.debug("{} started".format(name))
while True:
packet = self.recv_queue.get()
if packet is None:
break
try:
self.unpack_dispatch_and_ack(packet)
except Exception as e:
log.error(e, exc_info=True)
log.debug("{} stopped".format(name))
def unpack_dispatch_and_ack(self, packet: bytes):
# TODO: A better dispatcher
data = self.unpack(BytesIO(packet))
messages = (
data.body.messages
if isinstance(data.body, MsgContainer)
else [data]
)
log.debug(data)
self.total_bytes += len(packet)
self.total_messages += len(messages)
for i in messages:
if i.seq_no % 2 != 0:
self.pending_acks.add(i.msg_id)
# log.debug("{}".format(type(i.body)))
if isinstance(i.body, (types.MsgDetailedInfo, types.MsgNewDetailedInfo)):
self.pending_acks.add(i.body.answer_msg_id)
continue
msg_id = None
if isinstance(i.body, (types.BadMsgNotification, types.BadServerSalt)):
msg_id = i.body.bad_msg_id
elif isinstance(i.body, types.RpcResult):
msg_id = i.body.req_msg_id
elif isinstance(i.body, types.Pong):
msg_id = i.body.msg_id
elif isinstance(i.body, core.FutureSalts):
msg_id = i.body.req_msg_id
if msg_id in self.results:
self.results[msg_id].value = getattr(i.body, "result", i.body)
self.results[msg_id].event.set()
# print(
# "This packet bytes: ({}) | Total bytes: ({})\n"
# "This packet messages: ({}) | Total messages: ({})\n"
# "Total connections: ({})".format(
# len(packet), self.total_bytes, len(messages), self.total_messages, self.total_connections
# )
# )
if len(self.pending_acks) >= self.ACKS_THRESHOLD:
log.warning("Send {} acks".format(len(self.pending_acks)))
try:
self._send(types.MsgsAck(list(self.pending_acks)), False)
except (OSError, TimeoutError):
pass
else:
self.pending_acks.clear()
def ping(self):
log.debug("PingThread started")
while True:
self.ping_thread_event.wait(self.PING_INTERVAL)
if self.ping_thread_event.is_set():
break
try:
self._send(functions.Ping(0), False)
except (OSError, TimeoutError):
pass
log.debug("PingThread stopped")
def next_salt(self):
log.debug("NextSaltThread started")
while True:
now = datetime.now()
# Seconds to wait until middle-overlap, which is
# 15 minutes before/after the current/next salt end/start time
dt = (self.current_salt.valid_until - now).total_seconds() - 900
log.debug("Current salt: {} | Next salt in {:.0f}m {:.0f}s ({})".format(
self.current_salt.salt,
dt // 60,
dt % 60,
now + timedelta(seconds=dt)
))
self.next_salt_thread_event.wait(dt)
if self.next_salt_thread_event.is_set():
break
try:
self.current_salt = self._send(functions.GetFutureSalts(1)).salts[0]
except (OSError, TimeoutError):
self.connection.close()
break
log.debug("NextSaltThread stopped")
def recv(self):
log.debug("RecvThread started")
while True:
packet = self.connection.recv()
if packet is None:
if self.is_connected.is_set():
Thread(target=self.restart, name="RestartThread").start()
break
self.recv_queue.put(packet)
log.debug("RecvThread stopped")
def _send(self, data: Object, wait_response: bool = True):
message = self.msg_factory(data)
msg_id = message.msg_id
if wait_response:
self.results[msg_id] = Result()
payload = self.pack(message)
try:
self.connection.send(payload)
except OSError as e:
self.results.pop(msg_id, None)
raise e
if wait_response:
self.results[msg_id].event.wait(self.WAIT_TIMEOUT)
result = self.results.pop(msg_id).value
if result is None:
raise TimeoutError
elif isinstance(result, types.RpcError):
Error.raise_it(result, type(data))
else:
return result
def send(self, data: Object):
for i in range(self.MAX_RETRIES):
self.is_connected.wait()
try:
return self._send(data)
except (OSError, TimeoutError):
log.debug("Retrying {}".format(type(data)))
continue
else:
return None