From a8cad5abfb1085f8fe36452d2e8959cf2a1d661a Mon Sep 17 00:00:00 2001 From: Dan <14043624+delivrance@users.noreply.github.com> Date: Tue, 5 Dec 2017 12:41:07 +0100 Subject: [PATCH] Add session package --- pyrogram/session/__init__.py | 20 ++ pyrogram/session/auth.py | 246 ++++++++++++++ pyrogram/session/internals/__init__.py | 21 ++ pyrogram/session/internals/data_center.py | 36 ++ pyrogram/session/internals/msg_factory.py | 39 +++ pyrogram/session/internals/msg_id.py | 34 ++ pyrogram/session/internals/seq_no.py | 30 ++ pyrogram/session/session.py | 382 ++++++++++++++++++++++ 8 files changed, 808 insertions(+) create mode 100644 pyrogram/session/__init__.py create mode 100644 pyrogram/session/auth.py create mode 100644 pyrogram/session/internals/__init__.py create mode 100644 pyrogram/session/internals/data_center.py create mode 100644 pyrogram/session/internals/msg_factory.py create mode 100644 pyrogram/session/internals/msg_id.py create mode 100644 pyrogram/session/internals/seq_no.py create mode 100644 pyrogram/session/session.py diff --git a/pyrogram/session/__init__.py b/pyrogram/session/__init__.py new file mode 100644 index 00000000..d0395d19 --- /dev/null +++ b/pyrogram/session/__init__.py @@ -0,0 +1,20 @@ +# Pyrogram - Telegram MTProto API Client Library for Python +# Copyright (C) 2017 Dan Tès +# +# This file is part of Pyrogram. +# +# Pyrogram is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pyrogram is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Pyrogram. If not, see . + +from .auth import Auth +from .session import Session diff --git a/pyrogram/session/auth.py b/pyrogram/session/auth.py new file mode 100644 index 00000000..bf3cd05c --- /dev/null +++ b/pyrogram/session/auth.py @@ -0,0 +1,246 @@ +# Pyrogram - Telegram MTProto API Client Library for Python +# Copyright (C) 2017 Dan Tès +# +# This file is part of Pyrogram. +# +# Pyrogram is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pyrogram is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Pyrogram. If not, see . + +import logging +import time +from hashlib import sha1 +from io import BytesIO +from os import urandom + +from pyrogram.api import functions, types +from pyrogram.api.core import Object, Long, Int +from pyrogram.connection import Connection +from pyrogram.crypto import IGE, RSA, Prime +from .internals import MsgId, DataCenter + +log = logging.getLogger(__name__) + + +# TODO: When using TCP connection mode, the server may close it at any time, causing the Auth key creation to fail +# The above is true when dealing with temporary keys, although for perm keys it didn't happened, yet. + +class Auth: + CURRENT_DH_PRIME = int( + "C71CAEB9C6B1C9048E6C522F70F13F73980D40238E3E21C14934D037563D930F" + "48198A0AA7C14058229493D22530F4DBFA336F6E0AC925139543AED44CCE7C37" + "20FD51F69458705AC68CD4FE6B6B13ABDC9746512969328454F18FAF8C595F64" + "2477FE96BB2A941D5BCD1D4AC8CC49880708FA9B378E3C4F3A9060BEE67CF9A4" + "A4A695811051907E162753B56B0F6B410DBA74D8A84B2A14B3144E0EF1284754" + "FD17ED950D5965B4B9DD46582DB1178D169C6BC465B0D6FF9CA3928FEF5B9AE4" + "E418FC15E83EBEA0F87FA9FF5EED70050DED2849F47BF959D956850CE929851F" + "0D8115F635B105EE2E4E15D04B2454BF6F4FADF034B10403119CD8E3B92FCC5B", + 16 + ) + + def __init__(self, dc_id: int, test_mode: bool): + self.dc_id = dc_id + self.test_mode = test_mode + + self.connection = Connection(DataCenter(dc_id, test_mode)) + self.msg_id = MsgId() + + def pack(self, data: Object) -> bytes: + return ( + bytes(8) + + Long(self.msg_id()) + + Int(len(data.write())) + + data.write() + ) + + @staticmethod + def unpack(b: BytesIO): + b.seek(20) # Skip auth_key_id (8), message_id (8) and message_length (4) + return Object.read(b) + + def send(self, data: Object): + data = self.pack(data) + self.connection.send(data) + response = BytesIO(self.connection.recv()) + + return self.unpack(response) + + def create(self): + """ + https://core.telegram.org/mtproto/auth_key + https://core.telegram.org/mtproto/samples-auth_key + """ + log.info("Start creating a new auth key on DC{}".format(self.dc_id)) + + self.connection.connect() + + # Step 1; Step 2 + nonce = int.from_bytes(urandom(16), "little", signed=True) + log.debug("Send req_pq: {}".format(nonce)) + res_pq = self.send(functions.ReqPq(nonce)) + log.debug("Got ResPq: {}".format(res_pq.server_nonce)) + + # Step 3 + pq = int.from_bytes(res_pq.pq, "big") + log.debug("Start PQ factorization: {}".format(pq)) + start = time.time() + g = Prime.decompose(pq) + p, q = sorted((g, pq // g)) # p < q + log.debug("Done PQ factorization ({}s): {} {}".format(round(time.time() - start, 3), p, q)) + + # Step 4 + server_nonce = res_pq.server_nonce + new_nonce = int.from_bytes(urandom(32), "little", signed=True) + + data = types.PQInnerData( + res_pq.pq, + int.to_bytes(p, 4, "big"), + int.to_bytes(q, 4, "big"), + nonce, + server_nonce, + new_nonce, + ).write() + + sha = sha1(data).digest() + padding = urandom(- (len(data) + len(sha)) % 255) + data_with_hash = sha + data + padding + encrypted_data = RSA.encrypt(data_with_hash, res_pq.server_public_key_fingerprints[0]) + + log.debug("Done encrypt data with RSA") + + # Step 5. TODO: Handle "server_DH_params_fail". Code assumes response is ok + log.debug("Send req_DH_params") + server_dh_params = self.send( + functions.ReqDhParams( + nonce, + server_nonce, + int.to_bytes(p, 4, "big"), + int.to_bytes(q, 4, "big"), + res_pq.server_public_key_fingerprints[0], + encrypted_data + ) + ) + + encrypted_answer = server_dh_params.encrypted_answer + + server_nonce = int.to_bytes(server_nonce, 16, "little", signed=True) + new_nonce = int.to_bytes(new_nonce, 32, "little", signed=True) + + tmp_aes_key = ( + sha1(new_nonce + server_nonce).digest() + + sha1(server_nonce + new_nonce).digest()[:12] + ) + + tmp_aes_iv = ( + sha1(server_nonce + new_nonce).digest()[12:] + + sha1(new_nonce + new_nonce).digest() + new_nonce[:4] + ) + + server_nonce = int.from_bytes(server_nonce, "little", signed=True) + + answer_with_hash = IGE.decrypt(encrypted_answer, tmp_aes_key, tmp_aes_iv) + answer = answer_with_hash[20:] + + server_dh_inner_data = Object.read(BytesIO(answer)) + + log.debug("Done decrypting answer") + + dh_prime = int.from_bytes(server_dh_inner_data.dh_prime, "big") + delta_time = server_dh_inner_data.server_time - time.time() + + log.debug("Delta time: {}".format(round(delta_time, 3))) + + # Step 6 + g = server_dh_inner_data.g + b = int.from_bytes(urandom(256), "big") + g_b = int.to_bytes(pow(g, b, dh_prime), 256, "big") + + retry_id = 0 + + data = types.ClientDhInnerData( + nonce, + server_nonce, + retry_id, + g_b + ).write() + + sha = sha1(data).digest() + padding = urandom(- (len(data) + len(sha)) % 16) + data_with_hash = sha + data + padding + encrypted_data = IGE.encrypt(data_with_hash, tmp_aes_key, tmp_aes_iv) + + log.debug("Send set_client_DH_params") + set_client_dh_params_answer = self.send( + functions.SetClientDhParams( + nonce, + server_nonce, + encrypted_data + ) + ) + + # TODO: Handle "auth_key_aux_hash" if the previous step fails + + # Step 7; Step 8 + g_a = int.from_bytes(server_dh_inner_data.g_a, "big") + auth_key = int.to_bytes(pow(g_a, b, dh_prime), 256, "big") + server_nonce = int.to_bytes(server_nonce, 16, "little", signed=True) + + # TODO: Handle errors + + ####################### + # Security checks + ####################### + + assert dh_prime == self.CURRENT_DH_PRIME + log.debug("DH parameters check: OK") + + # https://core.telegram.org/mtproto/security_guidelines#g-a-and-g-b-validation + g_b = int.from_bytes(g_b, "big") + assert 1 < g < dh_prime - 1 + assert 1 < g_a < dh_prime - 1 + assert 1 < g_b < dh_prime - 1 + assert 2 ** (2048 - 64) < g_a < dh_prime - 2 ** (2048 - 64) + assert 2 ** (2048 - 64) < g_b < dh_prime - 2 ** (2048 - 64) + log.debug("g_a and g_b validation: OK") + + # https://core.telegram.org/mtproto/security_guidelines#checking-sha1-hash-values + answer = server_dh_inner_data.write() # Call .write() to remove padding + assert answer_with_hash[:20] == sha1(answer).digest() + log.debug("SHA1 hash values check: OK") + + # https://core.telegram.org/mtproto/security_guidelines#checking-nonce-server-nonce-and-new-nonce-fields + # 1st message + assert nonce == res_pq.nonce + # 2nd message + server_nonce = int.from_bytes(server_nonce, "little", signed=True) + assert nonce == server_dh_params.nonce + assert server_nonce == server_dh_params.server_nonce + # 3rd message + assert nonce == set_client_dh_params_answer.nonce + assert server_nonce == set_client_dh_params_answer.server_nonce + server_nonce = int.to_bytes(server_nonce, 16, "little", signed=True) + log.debug("Nonce fields check: OK") + + # Step 9 + server_salt = IGE.xor(new_nonce[:8], server_nonce[:8]) + + log.debug("Server salt: {}".format(int.from_bytes(server_salt, "little"))) + + log.info( + "Done auth key exchange: {}".format( + set_client_dh_params_answer.__class__.__name__ + ) + ) + + self.connection.close() + + return auth_key diff --git a/pyrogram/session/internals/__init__.py b/pyrogram/session/internals/__init__.py new file mode 100644 index 00000000..7d5ece45 --- /dev/null +++ b/pyrogram/session/internals/__init__.py @@ -0,0 +1,21 @@ +# Pyrogram - Telegram MTProto API Client Library for Python +# Copyright (C) 2017 Dan Tès +# +# This file is part of Pyrogram. +# +# Pyrogram is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pyrogram is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Pyrogram. If not, see . + +from .data_center import DataCenter +from .msg_factory import MsgFactory +from .msg_id import MsgId diff --git a/pyrogram/session/internals/data_center.py b/pyrogram/session/internals/data_center.py new file mode 100644 index 00000000..5eeb4124 --- /dev/null +++ b/pyrogram/session/internals/data_center.py @@ -0,0 +1,36 @@ +# Pyrogram - Telegram MTProto API Client Library for Python +# Copyright (C) 2017 Dan Tès +# +# This file is part of Pyrogram. +# +# Pyrogram is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pyrogram is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Pyrogram. If not, see . + + +class DataCenter: + TEST = { + 1: "149.154.175.10", + 2: "149.154.167.40", + 3: "149.154.175.117", + } + + PROD = { + 1: "149.154.175.50", + 2: "149.154.167.51", + 3: "149.154.175.100", + 4: "149.154.167.91", + 5: "91.108.56.149" + } + + def __new__(cls, dc_id: int, test_mode: bool): + return cls.TEST[dc_id] if test_mode else cls.PROD[dc_id] diff --git a/pyrogram/session/internals/msg_factory.py b/pyrogram/session/internals/msg_factory.py new file mode 100644 index 00000000..6874d5f5 --- /dev/null +++ b/pyrogram/session/internals/msg_factory.py @@ -0,0 +1,39 @@ +# Pyrogram - Telegram MTProto API Client Library for Python +# Copyright (C) 2017 Dan Tès +# +# This file is part of Pyrogram. +# +# Pyrogram is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pyrogram is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Pyrogram. If not, see . + +from pyrogram.api.core import Message, MsgContainer, Object +from pyrogram.api.functions import Ping, HttpWait +from pyrogram.api.types import MsgsAck +from .msg_id import MsgId +from .seq_no import SeqNo + +not_content_related = [Ping, HttpWait, MsgsAck, MsgContainer] + + +class MsgFactory: + def __init__(self, msg_id: MsgId): + self.msg_id = msg_id + self.seq_no = SeqNo() + + def __call__(self, body: Object) -> Message: + return Message( + body, + self.msg_id(), + self.seq_no(type(body) not in not_content_related), + len(body) + ) diff --git a/pyrogram/session/internals/msg_id.py b/pyrogram/session/internals/msg_id.py new file mode 100644 index 00000000..3288001b --- /dev/null +++ b/pyrogram/session/internals/msg_id.py @@ -0,0 +1,34 @@ +# Pyrogram - Telegram MTProto API Client Library for Python +# Copyright (C) 2017 Dan Tès +# +# This file is part of Pyrogram. +# +# Pyrogram is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pyrogram is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Pyrogram. If not, see . + +from time import time + + +class MsgId: + def __init__(self, delta_time: float = 0.0): + self.delta_time = delta_time + self.last_time = 0 + self.offset = 0 + + def __call__(self) -> int: + now = time() + self.offset = self.offset + 4 if now == self.last_time else 0 + msg_id = int((now + self.delta_time) * 2 ** 32) + self.offset + self.last_time = now + + return msg_id diff --git a/pyrogram/session/internals/seq_no.py b/pyrogram/session/internals/seq_no.py new file mode 100644 index 00000000..97ad6c61 --- /dev/null +++ b/pyrogram/session/internals/seq_no.py @@ -0,0 +1,30 @@ +# Pyrogram - Telegram MTProto API Client Library for Python +# Copyright (C) 2017 Dan Tès +# +# This file is part of Pyrogram. +# +# Pyrogram is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pyrogram is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Pyrogram. If not, see . + + +class SeqNo: + def __init__(self): + self.content_related_messages_sent = 0 + + def __call__(self, is_content_related: bool) -> int: + seq_no = (self.content_related_messages_sent * 2) + (1 if is_content_related else 0) + + if is_content_related: + self.content_related_messages_sent += 1 + + return seq_no diff --git a/pyrogram/session/session.py b/pyrogram/session/session.py new file mode 100644 index 00000000..1a46ce56 --- /dev/null +++ b/pyrogram/session/session.py @@ -0,0 +1,382 @@ +# Pyrogram - Telegram MTProto API Client Library for Python +# Copyright (C) 2017 Dan Tès +# +# This file is part of Pyrogram. +# +# Pyrogram is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pyrogram is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Pyrogram. If not, see . + +import logging +import platform +import threading +from datetime import timedelta, datetime +from hashlib import sha1 +from io import BytesIO +from os import urandom +from queue import Queue +from threading import Event, Thread + +from pyrogram import __copyright__, __license__, __version__ +from pyrogram.api import functions, types, core +from pyrogram.api.all import layer +from pyrogram.api.core import Message, Object, MsgContainer, Long, FutureSalt +from pyrogram.api.errors import Error +from pyrogram.connection import Connection +from pyrogram.crypto import IGE, KDF +from .internals import MsgId, MsgFactory, DataCenter + +log = logging.getLogger(__name__) + + +class Result: + def __init__(self): + self.value = None + self.event = Event() + + +class Session: + VERSION = __version__ + APP_VERSION = "Pyrogram \U0001f525 {}".format(VERSION) + + DEVICE_MODEL = "{} {}".format( + platform.python_implementation(), + platform.python_version() + ) + + SYSTEM_VERSION = "{} {}".format( + platform.system(), + platform.release() + ) + + INITIAL_SALT = 0x616e67656c696361 + + WORKERS = 4 + WAIT_TIMEOUT = 5 + MAX_RETRIES = 5 + ACKS_THRESHOLD = 8 + PING_INTERVAL = 5 + + def __init__(self, dc_id: int, test_mode: bool, auth_key: bytes, api_id: str): + print("Pyrogram v{}, {}".format(__version__, __copyright__)) + print("Licensed under the terms of the " + __license__) + + self.connection = Connection(DataCenter(dc_id, test_mode)) + + self.api_id = api_id + + self.auth_key = auth_key + self.auth_key_id = sha1(auth_key).digest()[-8:] + + self.msg_id = MsgId() + self.session_id = Long(self.msg_id()) + self.msg_factory = MsgFactory(self.msg_id) + + self.current_salt = None + + self.pending_acks = set() + + self.recv_queue = Queue() + self.results = {} + + self.ping_thread = None + self.ping_thread_event = Event() + + self.next_salt_thread = None + self.next_salt_thread_event = Event() + + self.is_connected = Event() + + self.total_connections = 0 + self.total_messages = 0 + self.total_bytes = 0 + + def start(self): + while True: + try: + self.connection.connect() + + for i in range(self.WORKERS): + Thread(target=self.worker, name="Worker#{}".format(i + 1)).start() + + Thread(target=self.recv, name="RecvThread").start() + + self.current_salt = FutureSalt(0, 0, self.INITIAL_SALT) + self.current_salt = FutureSalt(0, 0, self._send(functions.Ping(0)).new_server_salt) + self.current_salt = self._send(functions.GetFutureSalts(1)).salts[0] + + if self.next_salt_thread is not None: + self.next_salt_thread.join() + + self.next_salt_thread_event.clear() + + self.next_salt_thread = Thread(target=self.next_salt, name="NextSaltThread") + self.next_salt_thread.start() + + terms = self._send( + functions.InvokeWithLayer( + layer, + functions.InitConnection( + self.api_id, + self.DEVICE_MODEL, + self.SYSTEM_VERSION, + self.APP_VERSION, + "en", "", "en", + functions.help.GetTermsOfService(), + ) + ) + ) + + if self.ping_thread is not None: + self.ping_thread.join() + + self.ping_thread_event.clear() + + self.ping_thread = Thread(target=self.ping, name="PingThread") + self.ping_thread.start() + + log.info("Connection inited: Layer {}".format(layer)) + except (OSError, TimeoutError): + self.stop() + else: + break + + self.is_connected.set() + self.total_connections += 1 + + log.debug("Session started") + + return terms.text + + def stop(self): + self.is_connected.clear() + self.ping_thread_event.set() + self.next_salt_thread_event.set() + self.connection.close() + + for i in range(self.WORKERS): + self.recv_queue.put(None) + + log.debug("Session stopped") + + def restart(self): + self.stop() + self.start() + + def pack(self, message: Message) -> bytes: + data = Long(self.current_salt.salt) + self.session_id + message.write() + msg_key = sha1(data).digest()[-16:] + aes_key, aes_iv = KDF(self.auth_key, msg_key, True) + padding = urandom(-len(data) % 16) + + return self.auth_key_id + msg_key + IGE.encrypt(data + padding, aes_key, aes_iv) + + def unpack(self, b: BytesIO) -> Message: + assert b.read(8) == self.auth_key_id, b.getvalue() + + msg_key = b.read(16) + aes_key, aes_iv = KDF(self.auth_key, msg_key, False) + data = BytesIO(IGE.decrypt(b.read(), aes_key, aes_iv)) + data.read(8) # Server salt + + # https://core.telegram.org/mtproto/security_guidelines#checking-session-id + assert data.read(8) == self.session_id + + message = Message.read(data) + + # https://core.telegram.org/mtproto/security_guidelines#checking-sha1-hash-value-of-msg-key + # https://core.telegram.org/mtproto/security_guidelines#checking-message-length + # 32 = salt (8) + session_id (8) + msg_id (8) + seq_no (4) + length (4) + assert msg_key == sha1(data.getvalue()[:32 + message.length]).digest()[-16:] + + # https://core.telegram.org/mtproto/security_guidelines#checking-msg-id + # TODO: check for lower msg_ids + assert message.msg_id % 2 != 0 + + return message + + def worker(self): + name = threading.current_thread().name + log.debug("{} started".format(name)) + + while True: + packet = self.recv_queue.get() + + if packet is None: + break + + try: + self.unpack_dispatch_and_ack(packet) + except Exception as e: + log.error(e, exc_info=True) + + log.debug("{} stopped".format(name)) + + def unpack_dispatch_and_ack(self, packet: bytes): + # TODO: A better dispatcher + data = self.unpack(BytesIO(packet)) + + messages = ( + data.body.messages + if isinstance(data.body, MsgContainer) + else [data] + ) + + log.debug(data) + + self.total_bytes += len(packet) + self.total_messages += len(messages) + + for i in messages: + if i.seq_no % 2 != 0: + self.pending_acks.add(i.msg_id) + + # log.debug("{}".format(type(i.body))) + + if isinstance(i.body, (types.MsgDetailedInfo, types.MsgNewDetailedInfo)): + self.pending_acks.add(i.body.answer_msg_id) + continue + + msg_id = None + + if isinstance(i.body, (types.BadMsgNotification, types.BadServerSalt)): + msg_id = i.body.bad_msg_id + + elif isinstance(i.body, types.RpcResult): + msg_id = i.body.req_msg_id + + elif isinstance(i.body, types.Pong): + msg_id = i.body.msg_id + + elif isinstance(i.body, core.FutureSalts): + msg_id = i.body.req_msg_id + + if msg_id in self.results: + self.results[msg_id].value = getattr(i.body, "result", i.body) + self.results[msg_id].event.set() + + # print( + # "This packet bytes: ({}) | Total bytes: ({})\n" + # "This packet messages: ({}) | Total messages: ({})\n" + # "Total connections: ({})".format( + # len(packet), self.total_bytes, len(messages), self.total_messages, self.total_connections + # ) + # ) + + if len(self.pending_acks) >= self.ACKS_THRESHOLD: + log.warning("Send {} acks".format(len(self.pending_acks))) + + try: + self._send(types.MsgsAck(list(self.pending_acks)), False) + except (OSError, TimeoutError): + pass + else: + self.pending_acks.clear() + + def ping(self): + log.debug("PingThread started") + + while True: + self.ping_thread_event.wait(self.PING_INTERVAL) + + if self.ping_thread_event.is_set(): + break + + try: + self._send(functions.Ping(0), False) + except (OSError, TimeoutError): + pass + + log.debug("PingThread stopped") + + def next_salt(self): + log.debug("NextSaltThread started") + + while True: + now = datetime.now() + + # Seconds to wait until middle-overlap, which is + # 15 minutes before/after the current/next salt end/start time + dt = (self.current_salt.valid_until - now).total_seconds() - 900 + + log.debug("Current salt: {} | Next salt in {:.0f}m {:.0f}s ({})".format( + self.current_salt.salt, + dt // 60, + dt % 60, + now + timedelta(seconds=dt) + )) + + self.next_salt_thread_event.wait(dt) + + if self.next_salt_thread_event.is_set(): + break + + try: + self.current_salt = self._send(functions.GetFutureSalts(1)).salts[0] + except (OSError, TimeoutError): + self.connection.close() + break + + log.debug("NextSaltThread stopped") + + def recv(self): + log.debug("RecvThread started") + + while True: + packet = self.connection.recv() + + if packet is None: + if self.is_connected.is_set(): + Thread(target=self.restart, name="RestartThread").start() + break + + self.recv_queue.put(packet) + + log.debug("RecvThread stopped") + + def _send(self, data: Object, wait_response: bool = True): + message = self.msg_factory(data) + msg_id = message.msg_id + + if wait_response: + self.results[msg_id] = Result() + + payload = self.pack(message) + + try: + self.connection.send(payload) + except OSError as e: + self.results.pop(msg_id, None) + raise e + + if wait_response: + self.results[msg_id].event.wait(self.WAIT_TIMEOUT) + result = self.results.pop(msg_id).value + + if result is None: + raise TimeoutError + elif isinstance(result, types.RpcError): + Error.raise_it(result, type(data)) + else: + return result + + def send(self, data: Object): + for i in range(self.MAX_RETRIES): + self.is_connected.wait() + + try: + return self._send(data) + except (OSError, TimeoutError): + log.debug("Retrying {}".format(type(data))) + continue + else: + return None