# Pyrogram - Telegram MTProto API Client Library for Python # Copyright (C) 2017-2018 Dan Tès # # This file is part of Pyrogram. # # Pyrogram is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Pyrogram is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with Pyrogram. If not, see . import asyncio import logging import sys import time from base64 import b64decode, b64encode from concurrent.futures.thread import ThreadPoolExecutor from struct import pack from weakref import proxy from pyrogram.api.errors import FloodWait from pyrogram.client import types as pyrogram_types from ...api import types, functions from ...api.errors import StickersetInvalid log = logging.getLogger(__name__) # TODO: Organize the code better? class Str(str): __slots__ = "_client", "_entities" def __init__(self, *args): super().__init__() self._client = None self._entities = None def init(self, client, entities): self._client = client self._entities = entities @property def text(self): return self @property def markdown(self): return self._client.markdown.unparse(self, self._entities) @property def html(self): return self._client.html.unparse(self, self._entities) async def ainput(prompt: str = ""): print(prompt, end="", flush=True) with ThreadPoolExecutor(1) as executor: return (await asyncio.get_event_loop().run_in_executor( executor, sys.stdin.readline )).rstrip() ENTITIES = { types.MessageEntityMention.ID: "mention", types.MessageEntityHashtag.ID: "hashtag", types.MessageEntityCashtag.ID: "cashtag", types.MessageEntityBotCommand.ID: "bot_command", types.MessageEntityUrl.ID: "url", types.MessageEntityEmail.ID: "email", types.MessageEntityBold.ID: "bold", types.MessageEntityItalic.ID: "italic", types.MessageEntityCode.ID: "code", types.MessageEntityPre.ID: "pre", types.MessageEntityTextUrl.ID: "text_link", types.MessageEntityMentionName.ID: "text_mention", types.MessageEntityPhone.ID: "phone_number" } def parse_entities(entities: list, users: dict) -> list: output_entities = [] for entity in entities: entity_type = ENTITIES.get(entity.ID, None) if entity_type: output_entities.append( pyrogram_types.MessageEntity( type=entity_type, offset=entity.offset, length=entity.length, url=getattr(entity, "url", None), user=parse_user( users.get( getattr(entity, "user_id", None), None ) ) ) ) return output_entities def parse_chat_photo(photo): if not isinstance(photo, (types.UserProfilePhoto, types.ChatPhoto)): return None if not isinstance(photo.photo_small, types.FileLocation): return None if not isinstance(photo.photo_big, types.FileLocation): return None photo_id = getattr(photo, "photo_id", 0) loc_small = photo.photo_small loc_big = photo.photo_big return pyrogram_types.ChatPhoto( small_file_id=encode( pack( " pyrogram_types.UserStatus or None: if is_bot: return None status = pyrogram_types.UserStatus(user_id) if isinstance(user_status, types.UserStatusOnline): status.online = True status.date = user_status.expires elif isinstance(user_status, types.UserStatusOffline): status.offline = True status.date = user_status.was_online elif isinstance(user_status, types.UserStatusRecently): status.recently = True elif isinstance(user_status, types.UserStatusLastWeek): status.within_week = True elif isinstance(user_status, types.UserStatusLastMonth): status.within_month = True else: status.long_time_ago = True return status def parse_user(user: types.User) -> pyrogram_types.User or None: return pyrogram_types.User( id=user.id, is_self=user.is_self, is_contact=user.contact, is_mutual_contact=user.mutual_contact, is_deleted=user.deleted, is_bot=user.bot, first_name=user.first_name, last_name=user.last_name, username=user.username, language_code=user.lang_code, phone_number=user.phone, photo=parse_chat_photo(user.photo), status=parse_user_status(user.status, is_bot=user.bot), restriction_reason=user.restriction_reason ) if user else None def parse_chat(message: types.Message, users: dict, chats: dict) -> pyrogram_types.Chat: if isinstance(message.to_id, types.PeerUser): return parse_user_chat(users[message.to_id.user_id if message.out else message.from_id]) elif isinstance(message.to_id, types.PeerChat): return parse_chat_chat(chats[message.to_id.chat_id]) else: return parse_channel_chat(chats[message.to_id.channel_id]) def parse_user_chat(user: types.User) -> pyrogram_types.Chat: return pyrogram_types.Chat( id=user.id, type="private", username=user.username, first_name=user.first_name, last_name=user.last_name, photo=parse_chat_photo(user.photo), restriction_reason=user.restriction_reason ) def parse_chat_chat(chat: types.Chat) -> pyrogram_types.Chat: admins_enabled = getattr(chat, "admins_enabled", None) if admins_enabled is not None: admins_enabled = not admins_enabled return pyrogram_types.Chat( id=-chat.id, type="group", title=chat.title, all_members_are_administrators=admins_enabled, photo=parse_chat_photo(getattr(chat, "photo", None)) ) def parse_channel_chat(channel: types.Channel) -> pyrogram_types.Chat: return pyrogram_types.Chat( id=int("-100" + str(channel.id)), type="supergroup" if channel.megagroup else "channel", title=channel.title, username=getattr(channel, "username", None), photo=parse_chat_photo(getattr(channel, "photo", None)), restriction_reason=getattr(channel, "restriction_reason") ) def parse_thumb(thumb: types.PhotoSize or types.PhotoCachedSize) -> pyrogram_types.PhotoSize or None: if isinstance(thumb, (types.PhotoSize, types.PhotoCachedSize)): loc = thumb.location if isinstance(thumb, types.PhotoSize): file_size = thumb.size else: file_size = len(thumb.bytes) if isinstance(loc, types.FileLocation): return pyrogram_types.PhotoSize( file_id=encode( pack( " bytes: s = b64decode(s + "=" * (-len(s) % 4), "-_") r = b"" assert s[-1] == 2 i = 0 while i < len(s) - 1: if s[i] != 0: r += bytes([s[i]]) else: r += b"\x00" * s[i + 1] i += 1 i += 1 return r def encode(s: bytes) -> str: r = b"" n = 0 for i in s + bytes([2]): if i == 0: n += 1 else: if n: r += b"\x00" + bytes([n]) n = 0 r += bytes([i]) return b64encode(r, b"-_").decode().rstrip("=") # TODO: Reorganize code, maybe split parts as well async def parse_messages( client, messages: list or types.Message or types.MessageService or types.MessageEmpty, users: dict, chats: dict, replies: int = 1 ) -> pyrogram_types.Message or list: is_list = isinstance(messages, list) messages = messages if is_list else [messages] parsed_messages = [] for message in messages: if isinstance(message, types.Message): entities = parse_entities(message.entities, users) forward_from = None forward_from_chat = None forward_from_message_id = None forward_signature = None forward_date = None forward_header = message.fwd_from # type: types.MessageFwdHeader if forward_header: forward_date = forward_header.date if forward_header.from_id: forward_from = parse_user(users[forward_header.from_id]) else: forward_from_chat = parse_channel_chat(chats[forward_header.channel_id]) forward_from_message_id = forward_header.channel_post forward_signature = forward_header.post_author photo = None location = None contact = None venue = None audio = None voice = None animation = None video = None video_note = None sticker = None document = None media = message.media if media: if isinstance(media, types.MessageMediaPhoto): photo = media.photo if isinstance(photo, types.Photo): sizes = photo.sizes photo_sizes = [] for size in sizes: if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)): loc = size.location if isinstance(size, types.PhotoSize): file_size = size.size else: file_size = len(size.bytes) if isinstance(loc, types.FileLocation): photo_size = pyrogram_types.PhotoSize( file_id=encode( pack( " pyrogram_types.Messages: parsed_messages = [] for message in messages: parsed_messages.append( pyrogram_types.Message( message_id=message, chat=(pyrogram_types.Chat(id=int("-100" + str(channel_id)), type="channel") if channel_id is not None else None) ) ) return pyrogram_types.Messages(len(parsed_messages), parsed_messages) def get_peer_id(input_peer) -> int: return ( input_peer.user_id if isinstance(input_peer, types.InputPeerUser) else -input_peer.chat_id if isinstance(input_peer, types.InputPeerChat) else int("-100" + str(input_peer.channel_id)) ) def get_input_peer(peer_id: int, access_hash: int): return ( types.InputPeerUser(peer_id, access_hash) if peer_id > 0 else types.InputPeerChannel(int(str(peer_id)[4:]), access_hash) if (str(peer_id).startswith("-100") and access_hash) else types.InputPeerChat(-peer_id) ) def get_offset_date(dialogs): for m in reversed(dialogs.messages): if isinstance(m, types.MessageEmpty): continue else: return m.date else: return 0 def parse_profile_photos(photos): if isinstance(photos, types.photos.Photos): total_count = len(photos.photos) else: total_count = photos.count user_profile_photos = [] for photo in photos.photos: if isinstance(photo, types.Photo): sizes = photo.sizes photo_sizes = [] for size in sizes: if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)): loc = size.location if isinstance(size, types.PhotoSize): file_size = size.size else: file_size = len(size.bytes) if isinstance(loc, types.FileLocation): photo_size = pyrogram_types.PhotoSize( file_id=encode( pack( "