From c5911a6f6b53b838f698a206866932e58369a5bd Mon Sep 17 00:00:00 2001 From: Dan <14043624+delivrance@users.noreply.github.com> Date: Sat, 19 May 2018 15:36:38 +0200 Subject: [PATCH] Reorganize AES module --- pyrogram/client/client.py | 8 ++- pyrogram/crypto/aes.py | 141 ++++++++++++++++++++++++-------------- 2 files changed, 96 insertions(+), 53 deletions(-) diff --git a/pyrogram/client/client.py b/pyrogram/client/client.py index 24a09a5c..209488f7 100644 --- a/pyrogram/client/client.py +++ b/pyrogram/client/client.py @@ -1213,11 +1213,13 @@ class Client(Methods, BaseClient): chunk = r2.bytes # https://core.telegram.org/cdn#decrypting-files - decrypted_chunk = AES.ctr_decrypt( + decrypted_chunk = AES.ctr256_decrypt( chunk, r.encryption_key, - r.encryption_iv, - offset + bytearray( + r.encryption_iv[:-4] + + (offset // 16).to_bytes(4, "big") + ) ) hashes = session.send( diff --git a/pyrogram/crypto/aes.py b/pyrogram/crypto/aes.py index 19eec9a1..f16688c4 100644 --- a/pyrogram/crypto/aes.py +++ b/pyrogram/crypto/aes.py @@ -22,72 +22,113 @@ log = logging.getLogger(__name__) try: import tgcrypto + + log.info("Using TgCrypto") + + + class AES: + @classmethod + def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: + return tgcrypto.ige256_encrypt(data, key, iv) + + @classmethod + def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: + return tgcrypto.ige256_decrypt(data, key, iv) + + @staticmethod + def ctr256_encrypt(data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes: + return tgcrypto.ctr256_encrypt(data, key, iv, state or bytearray(1)) + + @staticmethod + def ctr256_decrypt(data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes: + return tgcrypto.ctr256_decrypt(data, key, iv, state or bytearray(1)) + + @staticmethod + def xor(a: bytes, b: bytes) -> bytes: + return int.to_bytes( + int.from_bytes(a, "big") ^ int.from_bytes(b, "big"), + len(a), + "big", + ) except ImportError: + import pyaes + log.warning( "TgCrypto is missing! " "Pyrogram will work the same, but at a much slower speed. " "More info: https://docs.pyrogram.ml/resources/TgCrypto" ) - is_fast = False - import pyaes -else: - log.info("Using TgCrypto") - is_fast = True -# TODO: Ugly IFs -class AES: - @classmethod - def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: - if is_fast: - return tgcrypto.ige256_encrypt(data, key, iv) - else: + class AES: + @classmethod + def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: return cls.ige(data, key, iv, True) - @classmethod - def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: - if is_fast: - return tgcrypto.ige256_decrypt(data, key, iv) - else: + @classmethod + def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: return cls.ige(data, key, iv, False) - @staticmethod - def ctr256_encrypt(data: bytes, key: bytes, iv: bytes, state: bytes) -> bytes: - if is_fast: - return tgcrypto.ctr256_decrypt(data, key, iv, state) - else: - ctr = pyaes.AESModeOfOperationCTR(key) - ctr._counter._counter = list(iv) - return ctr.decrypt(data) + @classmethod + def ctr256_encrypt(cls, data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes: + return cls.ctr(data, key, iv, state or bytearray(1)) - @staticmethod - def ctr256_decrypt(data: bytes, key: bytes, iv: bytes, state: bytes) -> bytes: - return AES.ctr256_encrypt(data, key, iv, state) + @classmethod + def ctr256_decrypt(cls, data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes: + return cls.ctr(data, key, iv, state or bytearray(1)) - @staticmethod - def xor(a: bytes, b: bytes) -> bytes: - return int.to_bytes( - int.from_bytes(a, "big") ^ int.from_bytes(b, "big"), - len(a), - "big", - ) + @staticmethod + def xor(a: bytes, b: bytes) -> bytes: + return int.to_bytes( + int.from_bytes(a, "big") ^ int.from_bytes(b, "big"), + len(a), + "big", + ) - @classmethod - def ige(cls, data: bytes, key: bytes, iv: bytes, encrypt: bool) -> bytes: - cipher = pyaes.AES(key) + @classmethod + def ige(cls, data: bytes, key: bytes, iv: bytes, encrypt: bool) -> bytes: + cipher = pyaes.AES(key) - iv_1 = iv[:16] - iv_2 = iv[16:] + iv_1 = iv[:16] + iv_2 = iv[16:] - data = [data[i: i + 16] for i in range(0, len(data), 16)] + data = [data[i: i + 16] for i in range(0, len(data), 16)] - if encrypt: - for i, chunk in enumerate(data): - iv_1 = data[i] = cls.xor(cipher.encrypt(cls.xor(chunk, iv_1)), iv_2) - iv_2 = chunk - else: - for i, chunk in enumerate(data): - iv_2 = data[i] = cls.xor(cipher.decrypt(cls.xor(chunk, iv_2)), iv_1) - iv_1 = chunk + if encrypt: + for i, chunk in enumerate(data): + iv_1 = data[i] = cls.xor(cipher.encrypt(cls.xor(chunk, iv_1)), iv_2) + iv_2 = chunk + else: + for i, chunk in enumerate(data): + iv_2 = data[i] = cls.xor(cipher.decrypt(cls.xor(chunk, iv_2)), iv_1) + iv_1 = chunk - return b"".join(data) + return b"".join(data) + + @classmethod + def ctr(cls, data: bytes, key: bytes, iv: bytearray, state: bytearray) -> bytes: + cipher = pyaes.AES(key) + + out = bytearray(data) + chunk = cipher.encrypt(iv) + + for i in range(0, len(data), 16): + for j in range(0, min(len(data) - i, 16)): + out[i + j] ^= chunk[state[0]] + + state[0] += 1 + + if state[0] >= 16: + state[0] = 0 + + if state[0] == 0: + for k in range(15, -1, -1): + try: + iv[k] += 1 + break + except ValueError: + iv[k] = 0 + + chunk = cipher.encrypt(iv) + + return out