diff --git a/pyrogram/crypto/aes.py b/pyrogram/crypto/aes.py index 8ca72535..05a01044 100644 --- a/pyrogram/crypto/aes.py +++ b/pyrogram/crypto/aes.py @@ -16,33 +16,52 @@ # You should have received a copy of the GNU Lesser General Public License # along with Pyrogram. If not, see . +import logging + +log = logging.getLogger(__name__) + try: import tgcrypto -except ImportError as e: - e.msg = ( - "TgCrypto is missing and Pyrogram can't run without. " - "Please install it using \"pip3 install tgcrypto\". " +except ImportError: + log.warning( + "TgCrypto is missing! " + "Pyrogram will work the same, but at a much slower speed. " "More info: https://docs.pyrogram.ml/resources/TgCrypto" ) - - raise e + is_fast = False + import pyaes +else: + log.info("Using TgCrypto") + is_fast = True +# TODO: Ugly IFs class AES: @classmethod def ige_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: - return tgcrypto.ige_encrypt(data, key, iv) + if is_fast: + return tgcrypto.ige_encrypt(data, key, iv) + else: + return cls.ige(data, key, iv, True) @classmethod def ige_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: - return tgcrypto.ige_decrypt(data, key, iv) + if is_fast: + return tgcrypto.ige_decrypt(data, key, iv) + else: + return cls.ige(data, key, iv, False) @staticmethod def ctr_decrypt(data: bytes, key: bytes, iv: bytes, offset: int) -> bytes: replace = int.to_bytes(offset // 16, 4, "big") iv = iv[:-4] + replace - return tgcrypto.ctr_decrypt(data, key, iv) + if is_fast: + return tgcrypto.ctr_decrypt(data, key, iv) + else: + ctr = pyaes.AESModeOfOperationCTR(key) + ctr._counter._counter = list(iv) + return ctr.decrypt(data) @staticmethod def xor(a: bytes, b: bytes) -> bytes: @@ -51,3 +70,23 @@ class AES: len(a), "big", ) + + @classmethod + def ige(cls, data: bytes, key: bytes, iv: bytes, encrypt: bool) -> bytes: + cipher = pyaes.AES(key) + + iv_1 = iv[:16] + iv_2 = iv[16:] + + data = [data[i: i + 16] for i in range(0, len(data), 16)] + + if encrypt: + for i, chunk in enumerate(data): + iv_1 = data[i] = cls.xor(cipher.encrypt(cls.xor(chunk, iv_1)), iv_2) + iv_2 = chunk + else: + for i, chunk in enumerate(data): + iv_2 = data[i] = cls.xor(cipher.decrypt(cls.xor(chunk, iv_2)), iv_1) + iv_1 = chunk + + return b"".join(data)