Merge IGE and CTR into a single class (AES)
This commit is contained in:
parent
0dd5843473
commit
2b7425019b
@ -46,7 +46,7 @@ from pyrogram.api.types import (
|
||||
InputPeerEmpty, InputPeerSelf,
|
||||
InputPeerUser, InputPeerChat, InputPeerChannel
|
||||
)
|
||||
from pyrogram.crypto import CTR
|
||||
from pyrogram.crypto import AES
|
||||
from pyrogram.session import Auth, Session
|
||||
from .style import Markdown, HTML
|
||||
|
||||
@ -1633,8 +1633,6 @@ class Client:
|
||||
)
|
||||
)
|
||||
if isinstance(r, types.upload.FileCdnRedirect):
|
||||
ctr = CTR(r.encryption_key, r.encryption_iv)
|
||||
|
||||
cdn_session = Session(
|
||||
r.dc_id,
|
||||
self.test_mode,
|
||||
@ -1673,7 +1671,7 @@ class Client:
|
||||
break
|
||||
|
||||
# https://core.telegram.org/cdn#decrypting-files
|
||||
decrypted_chunk = ctr.decrypt(chunk, offset)
|
||||
decrypted_chunk = AES.ctr_decrypt(chunk, r.encryption_key, r.encryption_iv, offset)
|
||||
|
||||
# TODO: https://core.telegram.org/cdn#verifying-files
|
||||
# TODO: Save to temp file, flush each chunk, rename to full if everything is ok
|
||||
|
@ -16,8 +16,7 @@
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from .ctr import CTR
|
||||
from .ige import IGE
|
||||
from .aes import AES
|
||||
from .kdf import KDF
|
||||
from .prime import Prime
|
||||
from .rsa import RSA
|
||||
|
88
pyrogram/crypto/aes.py
Normal file
88
pyrogram/crypto/aes.py
Normal file
@ -0,0 +1,88 @@
|
||||
# Pyrogram - Telegram MTProto API Client Library for Python
|
||||
# Copyright (C) 2017-2018 Dan Tès <https://github.com/delivrance>
|
||||
#
|
||||
# This file is part of Pyrogram.
|
||||
#
|
||||
# Pyrogram is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Pyrogram is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
import tgcrypto
|
||||
except ImportError:
|
||||
logging.warning("Warning: TgCrypto is missing")
|
||||
is_fast = False
|
||||
import pyaes
|
||||
else:
|
||||
log.info("Using TgCrypto")
|
||||
is_fast = True
|
||||
|
||||
|
||||
# TODO: Ugly IFs
|
||||
class AES:
|
||||
@classmethod
|
||||
def ige_encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
if is_fast:
|
||||
return tgcrypto.ige_encrypt(data, key, iv)
|
||||
else:
|
||||
return cls.ige(data, key, iv, True)
|
||||
|
||||
@classmethod
|
||||
def ige_decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
if is_fast:
|
||||
return tgcrypto.ige_decrypt(data, key, iv)
|
||||
else:
|
||||
return cls.ige(data, key, iv, False)
|
||||
|
||||
@staticmethod
|
||||
def ctr_decrypt(data: bytes, key: bytes, iv: bytes, offset: int) -> bytes:
|
||||
replace = int.to_bytes(offset // 16, byteorder="big", length=4)
|
||||
iv = iv[:-4] + replace
|
||||
|
||||
if is_fast:
|
||||
return tgcrypto.ctr_decrypt(data, key, iv)
|
||||
else:
|
||||
ctr = pyaes.AESModeOfOperationCTR(key)
|
||||
ctr._counter._counter = list(iv)
|
||||
return ctr.decrypt(data)
|
||||
|
||||
@staticmethod
|
||||
def xor(a: bytes, b: bytes) -> bytes:
|
||||
return int.to_bytes(
|
||||
int.from_bytes(a, "big") ^ int.from_bytes(b, "big"),
|
||||
len(a),
|
||||
"big",
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def ige(cls, data: bytes, key: bytes, iv: bytes, encrypt: bool) -> bytes:
|
||||
cipher = pyaes.AES(key)
|
||||
|
||||
iv_1 = iv[:16]
|
||||
iv_2 = iv[16:]
|
||||
|
||||
data = [data[i: i + 16] for i in range(0, len(data), 16)]
|
||||
|
||||
if encrypt:
|
||||
for i, chunk in enumerate(data):
|
||||
iv_1 = data[i] = cls.xor(cipher.encrypt(cls.xor(chunk, iv_1)), iv_2)
|
||||
iv_2 = chunk
|
||||
else:
|
||||
for i, chunk in enumerate(data):
|
||||
iv_2 = data[i] = cls.xor(cipher.decrypt(cls.xor(chunk, iv_2)), iv_1)
|
||||
iv_1 = chunk
|
||||
|
||||
return b"".join(data)
|
@ -1,35 +0,0 @@
|
||||
# Pyrogram - Telegram MTProto API Client Library for Python
|
||||
# Copyright (C) 2017-2018 Dan Tès <https://github.com/delivrance>
|
||||
#
|
||||
# This file is part of Pyrogram.
|
||||
#
|
||||
# Pyrogram is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Pyrogram is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
try:
|
||||
from pyaes import AESModeOfOperationCTR
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
class CTR:
|
||||
def __init__(self, key: bytes, iv: bytes):
|
||||
self.ctr = AESModeOfOperationCTR(key)
|
||||
self.iv = iv
|
||||
|
||||
def decrypt(self, data: bytes, offset: int) -> bytes:
|
||||
replace = int.to_bytes(offset // 16, byteorder="big", length=4)
|
||||
iv = self.iv[:-4] + replace
|
||||
self.ctr._counter._counter = list(iv)
|
||||
|
||||
return self.ctr.decrypt(data)
|
@ -1,64 +0,0 @@
|
||||
# Pyrogram - Telegram MTProto API Client Library for Python
|
||||
# Copyright (C) 2017-2018 Dan Tès <https://github.com/delivrance>
|
||||
#
|
||||
# This file is part of Pyrogram.
|
||||
#
|
||||
# Pyrogram is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Lesser General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Pyrogram is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Lesser General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public License
|
||||
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# from pyaes import AES
|
||||
import tgcrypto
|
||||
|
||||
BLOCK_SIZE = 16
|
||||
|
||||
|
||||
# TODO: Performance optimization
|
||||
|
||||
class IGE:
|
||||
@classmethod
|
||||
def encrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
return tgcrypto.ige_encrypt(data, key, iv)
|
||||
# return cls.ige(data, key, iv, True)
|
||||
|
||||
@classmethod
|
||||
def decrypt(cls, data: bytes, key: bytes, iv: bytes) -> bytes:
|
||||
return tgcrypto.ige_decrypt(data, key, iv)
|
||||
# return cls.ige(data, key, iv, False)
|
||||
|
||||
@staticmethod
|
||||
def xor(a: bytes, b: bytes) -> bytes:
|
||||
return int.to_bytes(
|
||||
int.from_bytes(a, "big") ^ int.from_bytes(b, "big"),
|
||||
len(a),
|
||||
"big",
|
||||
)
|
||||
|
||||
# @classmethod
|
||||
# def ige(cls, data: bytes, key: bytes, iv: bytes, encrypt: bool) -> bytes:
|
||||
# cipher = AES(key)
|
||||
#
|
||||
# iv_1 = iv[:BLOCK_SIZE]
|
||||
# iv_2 = iv[BLOCK_SIZE:]
|
||||
#
|
||||
# data = [data[i: i + BLOCK_SIZE] for i in range(0, len(data), BLOCK_SIZE)]
|
||||
#
|
||||
# if encrypt:
|
||||
# for i, chunk in enumerate(data):
|
||||
# iv_1 = data[i] = cls.xor(cipher.encrypt(cls.xor(chunk, iv_1)), iv_2)
|
||||
# iv_2 = chunk
|
||||
# else:
|
||||
# for i, chunk in enumerate(data):
|
||||
# iv_2 = data[i] = cls.xor(cipher.decrypt(cls.xor(chunk, iv_2)), iv_1)
|
||||
# iv_1 = chunk
|
||||
#
|
||||
# return b"".join(data)
|
@ -25,7 +25,7 @@ from os import urandom
|
||||
from pyrogram.api import functions, types
|
||||
from pyrogram.api.core import Object, Long, Int
|
||||
from pyrogram.connection import Connection
|
||||
from pyrogram.crypto import IGE, RSA, Prime
|
||||
from pyrogram.crypto import AES, RSA, Prime
|
||||
from .internals import MsgId, DataCenter
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -152,7 +152,7 @@ class Auth:
|
||||
|
||||
server_nonce = int.from_bytes(server_nonce, "little", signed=True)
|
||||
|
||||
answer_with_hash = IGE.decrypt(encrypted_answer, tmp_aes_key, tmp_aes_iv)
|
||||
answer_with_hash = AES.ige_decrypt(encrypted_answer, tmp_aes_key, tmp_aes_iv)
|
||||
answer = answer_with_hash[20:]
|
||||
|
||||
server_dh_inner_data = Object.read(BytesIO(answer))
|
||||
@ -181,7 +181,7 @@ class Auth:
|
||||
sha = sha1(data).digest()
|
||||
padding = urandom(- (len(data) + len(sha)) % 16)
|
||||
data_with_hash = sha + data + padding
|
||||
encrypted_data = IGE.encrypt(data_with_hash, tmp_aes_key, tmp_aes_iv)
|
||||
encrypted_data = AES.ige_encrypt(data_with_hash, tmp_aes_key, tmp_aes_iv)
|
||||
|
||||
log.debug("Send set_client_DH_params")
|
||||
set_client_dh_params_answer = self.send(
|
||||
@ -236,7 +236,7 @@ class Auth:
|
||||
log.debug("Nonce fields check: OK")
|
||||
|
||||
# Step 9
|
||||
server_salt = IGE.xor(new_nonce[:8], server_nonce[:8])
|
||||
server_salt = AES.xor(new_nonce[:8], server_nonce[:8])
|
||||
|
||||
log.debug("Server salt: {}".format(int.from_bytes(server_salt, "little")))
|
||||
|
||||
|
@ -32,7 +32,7 @@ from pyrogram.api.all import layer
|
||||
from pyrogram.api.core import Message, Object, MsgContainer, Long, FutureSalt, Int
|
||||
from pyrogram.api.errors import Error
|
||||
from pyrogram.connection import Connection
|
||||
from pyrogram.crypto import IGE, KDF
|
||||
from pyrogram.crypto import AES, KDF
|
||||
from .internals import MsgId, MsgFactory, DataCenter
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -204,14 +204,14 @@ class Session:
|
||||
msg_key = msg_key_large[8:24]
|
||||
aes_key, aes_iv = KDF(self.auth_key, msg_key, True)
|
||||
|
||||
return self.auth_key_id + msg_key + IGE.encrypt(data + padding, aes_key, aes_iv)
|
||||
return self.auth_key_id + msg_key + AES.ige_encrypt(data + padding, aes_key, aes_iv)
|
||||
|
||||
def unpack(self, b: BytesIO) -> Message:
|
||||
assert b.read(8) == self.auth_key_id, b.getvalue()
|
||||
|
||||
msg_key = b.read(16)
|
||||
aes_key, aes_iv = KDF(self.auth_key, msg_key, False)
|
||||
data = BytesIO(IGE.decrypt(b.read(), aes_key, aes_iv))
|
||||
data = BytesIO(AES.ige_decrypt(b.read(), aes_key, aes_iv))
|
||||
data.read(8)
|
||||
|
||||
# https://core.telegram.org/mtproto/security_guidelines#checking-session-id
|
||||
|
Loading…
Reference in New Issue
Block a user