Reorganize AES module
This commit is contained in:
parent
f0c801be8c
commit
c5911a6f6b
@ -1213,11 +1213,13 @@ class Client(Methods, BaseClient):
|
||||
chunk = r2.bytes
|
||||
|
||||
# https://core.telegram.org/cdn#decrypting-files
|
||||
decrypted_chunk = AES.ctr_decrypt(
|
||||
decrypted_chunk = AES.ctr256_decrypt(
|
||||
chunk,
|
||||
r.encryption_key,
|
||||
r.encryption_iv,
|
||||
offset
|
||||
bytearray(
|
||||
r.encryption_iv[:-4]
|
||||
+ (offset // 16).to_bytes(4, "big")
|
||||
)
|
||||
)
|
||||
|
||||
hashes = session.send(
|
||||
|
@ -22,72 +22,113 @@ log = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
import tgcrypto
|
||||
|
||||
log.info("Using TgCrypto")
|
||||
|
||||
|
||||
class AES:
|
||||
@classmethod
|
||||
def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
return tgcrypto.ige256_encrypt(data, key, iv)
|
||||
|
||||
@classmethod
|
||||
def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
return tgcrypto.ige256_decrypt(data, key, iv)
|
||||
|
||||
@staticmethod
|
||||
def ctr256_encrypt(data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes:
|
||||
return tgcrypto.ctr256_encrypt(data, key, iv, state or bytearray(1))
|
||||
|
||||
@staticmethod
|
||||
def ctr256_decrypt(data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes:
|
||||
return tgcrypto.ctr256_decrypt(data, key, iv, state or bytearray(1))
|
||||
|
||||
@staticmethod
|
||||
def xor(a: bytes, b: bytes) -> bytes:
|
||||
return int.to_bytes(
|
||||
int.from_bytes(a, "big") ^ int.from_bytes(b, "big"),
|
||||
len(a),
|
||||
"big",
|
||||
)
|
||||
except ImportError:
|
||||
import pyaes
|
||||
|
||||
log.warning(
|
||||
"TgCrypto is missing! "
|
||||
"Pyrogram will work the same, but at a much slower speed. "
|
||||
"More info: https://docs.pyrogram.ml/resources/TgCrypto"
|
||||
)
|
||||
is_fast = False
|
||||
import pyaes
|
||||
else:
|
||||
log.info("Using TgCrypto")
|
||||
is_fast = True
|
||||
|
||||
|
||||
# TODO: Ugly IFs
|
||||
class AES:
|
||||
@classmethod
|
||||
def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
if is_fast:
|
||||
return tgcrypto.ige256_encrypt(data, key, iv)
|
||||
else:
|
||||
class AES:
|
||||
@classmethod
|
||||
def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
return cls.ige(data, key, iv, True)
|
||||
|
||||
@classmethod
|
||||
def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
if is_fast:
|
||||
return tgcrypto.ige256_decrypt(data, key, iv)
|
||||
else:
|
||||
@classmethod
|
||||
def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
return cls.ige(data, key, iv, False)
|
||||
|
||||
@staticmethod
|
||||
def ctr256_encrypt(data: bytes, key: bytes, iv: bytes, state: bytes) -> bytes:
|
||||
if is_fast:
|
||||
return tgcrypto.ctr256_decrypt(data, key, iv, state)
|
||||
else:
|
||||
ctr = pyaes.AESModeOfOperationCTR(key)
|
||||
ctr._counter._counter = list(iv)
|
||||
return ctr.decrypt(data)
|
||||
@classmethod
|
||||
def ctr256_encrypt(cls, data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes:
|
||||
return cls.ctr(data, key, iv, state or bytearray(1))
|
||||
|
||||
@staticmethod
|
||||
def ctr256_decrypt(data: bytes, key: bytes, iv: bytes, state: bytes) -> bytes:
|
||||
return AES.ctr256_encrypt(data, key, iv, state)
|
||||
@classmethod
|
||||
def ctr256_decrypt(cls, data: bytes, key: bytes, iv: bytearray, state: bytearray = None) -> bytes:
|
||||
return cls.ctr(data, key, iv, state or bytearray(1))
|
||||
|
||||
@staticmethod
|
||||
def xor(a: bytes, b: bytes) -> bytes:
|
||||
return int.to_bytes(
|
||||
int.from_bytes(a, "big") ^ int.from_bytes(b, "big"),
|
||||
len(a),
|
||||
"big",
|
||||
)
|
||||
@staticmethod
|
||||
def xor(a: bytes, b: bytes) -> bytes:
|
||||
return int.to_bytes(
|
||||
int.from_bytes(a, "big") ^ int.from_bytes(b, "big"),
|
||||
len(a),
|
||||
"big",
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def ige(cls, data: bytes, key: bytes, iv: bytes, encrypt: bool) -> bytes:
|
||||
cipher = pyaes.AES(key)
|
||||
@classmethod
|
||||
def ige(cls, data: bytes, key: bytes, iv: bytes, encrypt: bool) -> bytes:
|
||||
cipher = pyaes.AES(key)
|
||||
|
||||
iv_1 = iv[:16]
|
||||
iv_2 = iv[16:]
|
||||
iv_1 = iv[:16]
|
||||
iv_2 = iv[16:]
|
||||
|
||||
data = [data[i: i + 16] for i in range(0, len(data), 16)]
|
||||
data = [data[i: i + 16] for i in range(0, len(data), 16)]
|
||||
|
||||
if encrypt:
|
||||
for i, chunk in enumerate(data):
|
||||
iv_1 = data[i] = cls.xor(cipher.encrypt(cls.xor(chunk, iv_1)), iv_2)
|
||||
iv_2 = chunk
|
||||
else:
|
||||
for i, chunk in enumerate(data):
|
||||
iv_2 = data[i] = cls.xor(cipher.decrypt(cls.xor(chunk, iv_2)), iv_1)
|
||||
iv_1 = chunk
|
||||
if encrypt:
|
||||
for i, chunk in enumerate(data):
|
||||
iv_1 = data[i] = cls.xor(cipher.encrypt(cls.xor(chunk, iv_1)), iv_2)
|
||||
iv_2 = chunk
|
||||
else:
|
||||
for i, chunk in enumerate(data):
|
||||
iv_2 = data[i] = cls.xor(cipher.decrypt(cls.xor(chunk, iv_2)), iv_1)
|
||||
iv_1 = chunk
|
||||
|
||||
return b"".join(data)
|
||||
return b"".join(data)
|
||||
|
||||
@classmethod
|
||||
def ctr(cls, data: bytes, key: bytes, iv: bytearray, state: bytearray) -> bytes:
|
||||
cipher = pyaes.AES(key)
|
||||
|
||||
out = bytearray(data)
|
||||
chunk = cipher.encrypt(iv)
|
||||
|
||||
for i in range(0, len(data), 16):
|
||||
for j in range(0, min(len(data) - i, 16)):
|
||||
out[i + j] ^= chunk[state[0]]
|
||||
|
||||
state[0] += 1
|
||||
|
||||
if state[0] >= 16:
|
||||
state[0] = 0
|
||||
|
||||
if state[0] == 0:
|
||||
for k in range(15, -1, -1):
|
||||
try:
|
||||
iv[k] += 1
|
||||
break
|
||||
except ValueError:
|
||||
iv[k] = 0
|
||||
|
||||
chunk = cipher.encrypt(iv)
|
||||
|
||||
return out
|
||||
|
Loading…
Reference in New Issue
Block a user