Rename AES class methods and add ctr256_encrypt

This commit is contained in:
Dan 2018-05-18 14:15:35 +02:00
parent c531e6d146
commit f0c801be8c
3 changed files with 14 additions and 13 deletions


@ -38,31 +38,32 @@ else:
# TODO: Ugly IFs # TODO: Ugly IFs
class AES: class AES:
@classmethod @classmethod
def ige_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: def ige256_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
if is_fast: if is_fast:
return tgcrypto.ige_encrypt(data, key, iv) return tgcrypto.ige256_encrypt(data, key, iv)
else: else:
return cls.ige(data, key, iv, True) return cls.ige(data, key, iv, True)
@classmethod @classmethod
def ige_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes: def ige256_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
if is_fast: if is_fast:
return tgcrypto.ige_decrypt(data, key, iv) return tgcrypto.ige256_decrypt(data, key, iv)
else: else:
return cls.ige(data, key, iv, False) return cls.ige(data, key, iv, False)
@staticmethod @staticmethod
def ctr_decrypt(data: bytes, key: bytes, iv: bytes, offset: int) -> bytes: def ctr256_encrypt(data: bytes, key: bytes, iv: bytes, state: bytes) -> bytes:
replace = int.to_bytes(offset // 16, 4, "big")
iv = iv[:-4] + replace
if is_fast: if is_fast:
return tgcrypto.ctr_decrypt(data, key, iv) return tgcrypto.ctr256_decrypt(data, key, iv, state)
else: else:
ctr = pyaes.AESModeOfOperationCTR(key) ctr = pyaes.AESModeOfOperationCTR(key)
ctr._counter._counter = list(iv) ctr._counter._counter = list(iv)
return ctr.decrypt(data) return ctr.decrypt(data)
@staticmethod
def ctr256_decrypt(data: bytes, key: bytes, iv: bytes, state: bytes) -> bytes:
return AES.ctr256_encrypt(data, key, iv, state)
@staticmethod @staticmethod
def xor(a: bytes, b: bytes) -> bytes: def xor(a: bytes, b: bytes) -> bytes:
return int.to_bytes( return int.to_bytes(
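Review bot: In the new `ctr256_encrypt`, the `state` argument is forwarded only on the `is_fast` branch; the `pyaes` fallback seeds the counter from `iv` alone and never reads `state`, so the two backends can produce different keystreams for the same call (the old `offset // 16` counter adjustment was dropped here with no replacement in the fallback), and a ciphertext produced by one path may not decrypt under the other. Either thread `state` into the fallback or mark the limitation at the top of the `else` branch, e.g. `# NOTE: pyaes fallback ignores `state`; presumably only correct for a fresh counter stream — confirm against tgcrypto`. The fast branch also calls `tgcrypto.ctr256_decrypt` from the encrypt method, which is fine for CTR because encryption and decryption are the same keystream XOR, but a one-line comment `# CTR: encrypt and decrypt are the same operation` would stop a later reader from "fixing" it. The fallback's write to the private `ctr._counter._counter` is also tied to pyaes internals and deserves a comment noting that it bypasses the library's API. Class `AES` continues past the end of this hunk (`xor` is cut off).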


@ -163,7 +163,7 @@ class Auth:
server_nonce = int.from_bytes(server_nonce, "little", signed=True) server_nonce = int.from_bytes(server_nonce, "little", signed=True)
answer_with_hash = AES.ige_decrypt(encrypted_answer, tmp_aes_key, tmp_aes_iv) answer_with_hash = AES.ige256_decrypt(encrypted_answer, tmp_aes_key, tmp_aes_iv)
answer = answer_with_hash[20:] answer = answer_with_hash[20:]
server_dh_inner_data = Object.read(BytesIO(answer)) server_dh_inner_data = Object.read(BytesIO(answer))
@ -192,7 +192,7 @@ class Auth:
sha = sha1(data).digest() sha = sha1(data).digest()
padding = urandom(- (len(data) + len(sha)) % 16) padding = urandom(- (len(data) + len(sha)) % 16)
data_with_hash = sha + data + padding data_with_hash = sha + data + padding
encrypted_data = AES.ige_encrypt(data_with_hash, tmp_aes_key, tmp_aes_iv) encrypted_data = AES.ige256_encrypt(data_with_hash, tmp_aes_key, tmp_aes_iv)
log.debug("Send set_client_DH_params") log.debug("Send set_client_DH_params")
set_client_dh_params_answer = self.send( set_client_dh_params_answer = self.send(


@ -222,14 +222,14 @@ class Session:
msg_key = msg_key_large[8:24] msg_key = msg_key_large[8:24]
aes_key, aes_iv = KDF(self.auth_key, msg_key, True) aes_key, aes_iv = KDF(self.auth_key, msg_key, True)
return self.auth_key_id + msg_key + AES.ige_encrypt(data + padding, aes_key, aes_iv) return self.auth_key_id + msg_key + AES.ige256_encrypt(data + padding, aes_key, aes_iv)
def unpack(self, b: BytesIO) -> Message: def unpack(self, b: BytesIO) -> Message:
assert b.read(8) == self.auth_key_id, b.getvalue() assert b.read(8) == self.auth_key_id, b.getvalue()
msg_key = b.read(16) msg_key = b.read(16)
aes_key, aes_iv = KDF(self.auth_key, msg_key, False) aes_key, aes_iv = KDF(self.auth_key, msg_key, False)
data = BytesIO(AES.ige_decrypt(b.read(), aes_key, aes_iv)) data = BytesIO(AES.ige256_decrypt(b.read(), aes_key, aes_iv))
data.read(8) data.read(8)
# https://core.telegram.org/mtproto/security_guidelines#checking-session-id # https://core.telegram.org/mtproto/security_guidelines#checking-session-id