MTPyroger/pyrogram/utils.py

317 lines
9.1 KiB
Python
Raw Normal View History

2020-03-21 14:43:32 +00:00
# Pyrogram - Telegram MTProto API Client Library for Python
# Copyright (C) 2017-2020 Dan <https://github.com/delivrance>
2018-04-09 22:25:51 +00:00
#
2020-03-21 14:43:32 +00:00
# This file is part of Pyrogram.
2018-04-09 22:25:51 +00:00
#
2020-03-21 14:43:32 +00:00
# Pyrogram is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
2018-04-09 22:25:51 +00:00
#
2020-03-21 14:43:32 +00:00
# Pyrogram is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
2018-04-09 22:25:51 +00:00
#
2020-03-21 14:43:32 +00:00
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
2018-04-09 22:25:51 +00:00
import asyncio
import base64
import functools
import hashlib
import os
import struct
from concurrent.futures.thread import ThreadPoolExecutor
from getpass import getpass
from typing import Union, List, Dict, Optional
2018-04-08 11:23:26 +00:00
2020-11-29 14:48:29 +00:00
import pyrogram
from pyrogram import raw
from pyrogram import types
from pyrogram.file_id import FileId, FileType, PHOTO_TYPES, DOCUMENT_TYPES
async def ainput(prompt: str = "", *, hide: bool = False):
    """Read a line from the user without blocking the event loop.

    The blocking read is delegated to a single-worker thread pool that only
    lives for the duration of this call.

    Parameters:
        prompt: Text handed to the underlying reader.
        hide: When true, ``getpass`` is used instead of the built-in ``input``.

    Returns:
        Whatever the selected reader returns.
    """
    reader = getpass if hide else input
    blocking_call = functools.partial(reader, prompt)

    with ThreadPoolExecutor(1) as pool:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(pool, blocking_call)
def get_input_media_from_file_id(
    file_id: str,
    expected_file_type: FileType = None
) -> Union["raw.types.InputMediaPhoto", "raw.types.InputMediaDocument"]:
    """Turn a file id string into a raw input-media object.

    Parameters:
        file_id: The encoded file id to decode.
        expected_file_type: When given, the decoded file type must equal it.

    Returns:
        ``raw.types.InputMediaPhoto`` for photo types, or
        ``raw.types.InputMediaDocument`` for document types.

    Raises:
        ValueError: If the id cannot be decoded, does not match
            ``expected_file_type``, is a thumbnail / chat photo id, or is of a
            type that is neither a photo nor a document.
    """
    try:
        decoded = FileId.decode(file_id)
    except Exception:
        raise ValueError(f'Failed to decode "{file_id}". The value does not represent an existing local file, '
                         f'HTTP URL, or valid file id.')

    file_type = decoded.file_type

    if expected_file_type is not None and file_type != expected_file_type:
        raise ValueError(f'Expected: "{expected_file_type}", got "{file_type}" file_id instead')

    if file_type in (FileType.THUMBNAIL, FileType.CHAT_PHOTO):
        raise ValueError(f"This file_id can only be used for download: {file_id}")

    # Both accepted families are built the same way; only the wrapper types differ.
    if file_type in PHOTO_TYPES:
        media_type, input_type = raw.types.InputMediaPhoto, raw.types.InputPhoto
    elif file_type in DOCUMENT_TYPES:
        media_type, input_type = raw.types.InputMediaDocument, raw.types.InputDocument
    else:
        raise ValueError(f"Unknown file id: {file_id}")

    return media_type(
        id=input_type(
            id=decoded.media_id,
            access_hash=decoded.access_hash,
            file_reference=decoded.file_reference
        )
    )
2019-06-08 13:13:52 +00:00
async def parse_messages(client, messages: "raw.types.messages.Messages", replies: int = 1) -> List["types.Message"]:
    """Parse a raw messages container into a list of ``types.Message`` objects.

    Every raw message is first parsed with ``replies=0``. When ``replies`` is
    truthy and at least one raw message carries a ``reply_to_msg_id``, one
    ``client.get_messages`` call (with ``replies - 1``) fetches the related
    messages, which are then attached as ``reply_to_message``.

    Parameters:
        client: Client used for parsing and for fetching the related messages.
        messages: Raw container exposing ``users``, ``chats`` and ``messages``.
        replies: Depth of reply resolution; a falsy value disables it.

    Returns:
        A ``types.List`` of parsed messages, empty when the container holds none.
    """
    users = {user.id: user for user in messages.users}
    chats = {chat.id: chat for chat in messages.chats}

    raw_messages = messages.messages
    if not raw_messages:
        return types.List()

    # Awaited one after another, in the order of the raw messages.
    parsed_messages = [
        await types.Message._parse(client, raw_message, users, chats, replies=0)
        for raw_message in raw_messages
    ]

    if not replies:
        return types.List(parsed_messages)

    # Raw message id -> id of the message it replies to (None when it is not a reply).
    reply_targets = {
        raw_message.id: getattr(raw_message, "reply_to_msg_id", None)
        for raw_message in raw_messages
    }
    reply_message_ids = [
        message_id
        for message_id, target_id in reply_targets.items()
        if target_id is not None
    ]

    if reply_message_ids:
        # We need a chat id, but some messages might be empty (no chat attribute available).
        # Take the first parsed message that has a chat; fall back to 0 when none has one.
        chat_id = next((parsed.chat.id for parsed in parsed_messages if parsed.chat), 0)

        reply_messages = await client.get_messages(
            chat_id,
            reply_to_message_ids=reply_message_ids,
            replies=replies - 1
        )

        for parsed in parsed_messages:
            target_id = reply_targets[parsed.message_id]

            for reply in reply_messages:
                if reply.message_id == target_id:
                    parsed.reply_to_message = reply

    return types.List(parsed_messages)
2019-06-08 13:13:52 +00:00
def parse_deleted_messages(client, update) -> List["types.Message"]:
    """Build placeholder ``types.Message`` objects for a deletion update.

    Each entry of ``update.messages`` is used as the ``message_id`` of a new
    message. When the update has a ``channel_id`` attribute that is not None,
    every message also gets its own ``types.Chat`` of type ``"channel"``;
    otherwise ``chat`` is None.

    Parameters:
        client: Client bound to the created objects.
        update: Update exposing ``messages`` and, optionally, ``channel_id``.

    Returns:
        A ``types.List`` with one message per deleted id.
    """
    deleted_ids = update.messages
    channel_id = getattr(update, "channel_id", None)

    def make_chat():
        # A fresh Chat per message, exactly when the update names a channel.
        if channel_id is None:
            return None

        return types.Chat(
            id=get_channel_id(channel_id),
            type="channel",
            client=client
        )

    return types.List([
        types.Message(
            message_id=deleted_id,
            chat=make_chat(),
            client=client
        )
        for deleted_id in deleted_ids
    ])
def unpack_inline_message_id(inline_message_id: str) -> "raw.types.InputBotInlineMessageID":
    """Decode an inline message id string into its raw input object.

    The string is padded with ``=`` to a multiple of four characters, decoded
    as base64 with ``-`` and ``_`` as the alternative characters, and unpacked
    with the ``"<iqq"`` struct format into ``dc_id``, ``id`` and ``access_hash``.

    Parameters:
        inline_message_id: The encoded inline message id.

    Returns:
        The corresponding ``raw.types.InputBotInlineMessageID``.
    """
    padding = "=" * (-len(inline_message_id) % 4)
    packed = base64.b64decode(inline_message_id + padding, altchars=b"-_")
    dc_id, message_id, access_hash = struct.unpack("<iqq", packed)

    return raw.types.InputBotInlineMessageID(
        dc_id=dc_id,
        id=message_id,
        access_hash=access_hash
    )
# Range of ids reported as "channel" by get_peer_type: MIN_CHANNEL_ID is
# included, MAX_CHANNEL_ID is excluded. MAX_CHANNEL_ID is also the offset the
# id helpers subtract a channel id from.
MIN_CHANNEL_ID = -1_002_147_483_647
MAX_CHANNEL_ID = -1_000_000_000_000
# Lowest negative id reported as "chat" by get_peer_type.
MIN_CHAT_ID = -2_147_483_647
# Highest positive id reported as "user" by get_peer_type.
MAX_USER_ID = 2_147_483_647
def get_raw_peer_id(peer: raw.base.Peer) -> Optional[int]:
    """Get the raw peer id from a Peer object.

    Returns ``user_id``, ``chat_id`` or ``channel_id`` unchanged, depending on
    whether ``peer`` is a ``PeerUser``, ``PeerChat`` or ``PeerChannel``; None
    for any other object.
    """
    id_attribute_by_type = (
        (raw.types.PeerUser, "user_id"),
        (raw.types.PeerChat, "chat_id"),
        (raw.types.PeerChannel, "channel_id"),
    )

    for peer_type, id_attribute in id_attribute_by_type:
        if isinstance(peer, peer_type):
            return getattr(peer, id_attribute)

    return None
def get_peer_id(peer: raw.base.Peer) -> int:
    """Get the non-raw peer id from a Peer object.

    Users keep their id, chat ids are negated and channel ids are subtracted
    from ``MAX_CHANNEL_ID``.

    Raises:
        ValueError: If ``peer`` is not a ``PeerUser``, ``PeerChat`` or ``PeerChannel``.
    """
    if isinstance(peer, raw.types.PeerUser):
        peer_id = peer.user_id
    elif isinstance(peer, raw.types.PeerChat):
        peer_id = -peer.chat_id
    elif isinstance(peer, raw.types.PeerChannel):
        peer_id = MAX_CHANNEL_ID - peer.channel_id
    else:
        raise ValueError(f"Peer type invalid: {peer}")

    return peer_id
2019-09-14 17:30:07 +00:00
def get_peer_type(peer_id: int) -> str:
    """Classify a non-raw peer id as ``"chat"``, ``"channel"`` or ``"user"``.

    Parameters:
        peer_id: The id to classify.

    Returns:
        ``"chat"`` for ``MIN_CHAT_ID <= peer_id < 0``, ``"channel"`` for
        ``MIN_CHANNEL_ID <= peer_id < MAX_CHANNEL_ID`` and ``"user"`` for
        ``0 < peer_id <= MAX_USER_ID``.

    Raises:
        ValueError: If the id falls in none of those ranges (0 included).
    """
    if MIN_CHAT_ID <= peer_id < 0:
        return "chat"

    # The channel range is entirely negative, so no separate sign check is needed.
    if MIN_CHANNEL_ID <= peer_id < MAX_CHANNEL_ID:
        return "channel"

    if 0 < peer_id <= MAX_USER_ID:
        return "user"

    raise ValueError(f"Peer id invalid: {peer_id}")
def get_channel_id(peer_id: int) -> int:
    """Return ``MAX_CHANNEL_ID - peer_id``.

    The subtraction is its own inverse, so the same call maps in both
    directions between the two channel id forms.
    """
    offset = MAX_CHANNEL_ID
    return offset - peer_id
def btoi(b: bytes) -> int:
    """Interpret ``b`` as an unsigned big-endian integer."""
    return int.from_bytes(b, byteorder="big", signed=False)
def itob(i: int) -> bytes:
    """Encode ``i`` as exactly 256 big-endian bytes, left-padded with zeros."""
    return i.to_bytes(length=256, byteorder="big")
def sha256(data: bytes) -> bytes:
    """Return the raw 32-byte SHA-256 digest of ``data``."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.digest()
def xor(a: bytes, b: bytes) -> bytes:
    """XOR ``a`` and ``b`` byte by byte.

    The result is as long as the shorter input; surplus bytes of the longer
    one are ignored.
    """
    mixed = bytearray()

    for left, right in zip(a, b):
        mixed.append(left ^ right)

    return bytes(mixed)
def compute_password_hash(algo: raw.types.PasswordKdfAlgoSHA256SHA256PBKDF2HMACSHA512iter100000SHA256ModPow,
                          password: str) -> bytes:
    """Derive the password hash from ``password`` and the salts carried by ``algo``.

    The password is hashed with SHA-256 wrapped in ``salt1``, hashed again
    wrapped in ``salt2``, stretched with PBKDF2-HMAC-SHA512 (salt ``salt1``,
    100000 iterations) and hashed a last time wrapped in ``salt2``.

    Parameters:
        algo: Object providing the ``salt1`` and ``salt2`` byte strings.
        password: The password; encoded with ``str.encode()`` defaults.

    Returns:
        The final SHA-256 digest.
    """
    def salted_sha256(salt: bytes, payload: bytes) -> bytes:
        # SHA-256 over the payload with the salt placed on both sides.
        return sha256(salt + payload + salt)

    first_round = salted_sha256(algo.salt1, password.encode())
    second_round = salted_sha256(algo.salt2, first_round)
    stretched = hashlib.pbkdf2_hmac("sha512", second_round, algo.salt1, 100000)

    return salted_sha256(algo.salt2, stretched)
# noinspection PyPep8Naming
def compute_password_check(r: raw.types.account.Password, password: str) -> raw.types.InputCheckPasswordSRP:
    """Compute the ``InputCheckPasswordSRP`` answer for a password challenge.

    Single-letter names are kept as they are (hence the naming-inspection
    suppression above). NOTE(review): they appear to follow the SRP notation
    of Telegram's two-factor authentication documentation; confirm there.

    Parameters:
        r: Password object providing ``current_algo`` (with ``p``, ``g``,
            ``salt1``, ``salt2``), ``srp_B`` and ``srp_id``.
        password: The password to prove knowledge of.

    Returns:
        ``raw.types.InputCheckPasswordSRP`` carrying ``srp_id``, ``A`` and ``M1``.
    """
    algo = r.current_algo

    # Group parameters: modulus p arrives as bytes, generator g as an integer.
    p_bytes = algo.p
    p = btoi(p_bytes)
    g = algo.g
    g_bytes = itob(g)

    # Value taken from the challenge.
    B_bytes = r.srp_B
    B = btoi(B_bytes)

    # Exponent derived from the password, and the multiplier k = H(p | g).
    x = btoi(compute_password_hash(algo, password))
    k = btoi(sha256(p_bytes + g_bytes))
    kg_x = (k * pow(g, x, p)) % p

    # Draw a random 256-byte exponent until u = H(A | B) is non-zero.
    u = 0
    while u <= 0:
        a = btoi(os.urandom(256))
        A_bytes = itob(pow(g, a, p))
        u = btoi(sha256(A_bytes + B_bytes))

    g_b = (B - kg_x) % p
    S = pow(g_b, a + u * x, p)
    K_bytes = sha256(itob(S))

    group_hash = xor(sha256(p_bytes), sha256(g_bytes))
    salt_hashes = sha256(algo.salt1) + sha256(algo.salt2)
    M1_bytes = sha256(group_hash + salt_hashes + A_bytes + B_bytes + K_bytes)

    return raw.types.InputCheckPasswordSRP(srp_id=r.srp_id, A=A_bytes, M1=M1_bytes)
2020-11-29 14:48:29 +00:00
async def parse_text_entities(
    client: "pyrogram.Client",
    text: str,
    parse_mode: str,
    entities: List["types.MessageEntity"]
) -> Dict[str, raw.base.MessageEntity]:
    """Produce the ``message`` / ``entities`` pair for a piece of text.

    When ``entities`` is non-empty, the text is passed through untouched and
    each entity is converted with its ``write()`` coroutine. Otherwise the text
    is handed to ``client.parser.parse`` together with ``parse_mode``.

    Parameters:
        client: Client injected into the entities and providing the parser.
        text: The text to send or parse.
        parse_mode: Parse mode forwarded to the parser when no entities are given.
        entities: Explicit entities; a falsy value selects the parser instead.

    Returns:
        A dict with the keys ``"message"`` and ``"entities"``.
    """
    if entities:
        # Inject the client instance because parsing user mentions requires it
        for entity in entities:
            entity._client = client

        message = text
        raw_entities = [await entity.write() for entity in entities]
    else:
        parsed = await client.parser.parse(text, parse_mode)
        # Unpacked positionally from the parser result's values.
        message, raw_entities = parsed.values()

    return {
        "message": message,
        "entities": raw_entities
    }
async def maybe_run_in_executor(func, data, length, loop, *args):
    """Call ``func(data, *args)`` inline or in the crypto executor, depending on ``length``.

    Parameters:
        func: Callable to run.
        data: First argument passed to ``func``.
        length: Size compared against ``pyrogram.CRYPTO_EXECUTOR_SIZE_THRESHOLD``.
        loop: Event loop whose ``run_in_executor`` is used for the offloaded call.
        *args: Extra positional arguments forwarded to ``func``.

    Returns:
        The result of ``func``.
    """
    # At or below the threshold the call runs directly, without the executor.
    if length <= pyrogram.CRYPTO_EXECUTOR_SIZE_THRESHOLD:
        return func(data, *args)

    return await loop.run_in_executor(pyrogram.crypto_executor, func, data, *args)