# Pyrogram - Telegram MTProto API Client Library for Python
# Copyright (C) 2017-2018 Dan Tès
#
# This file is part of Pyrogram.
#
# Pyrogram is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pyrogram is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
import logging
import sys
from base64 import b64decode, b64encode
from concurrent.futures.thread import ThreadPoolExecutor
from struct import pack
from weakref import proxy

from pyrogram.client import types as pyrogram_types
from ...api import types, functions
from ...api.errors import StickersetInvalid, MessageIdsEmpty

log = logging.getLogger(__name__)


# TODO: Organize the code better?
class Str(str):
    """A ``str`` subclass that can carry a client and a list of entities.

    The two extra slots let the text be re-rendered through the client's
    ``markdown`` / ``html`` objects (see the properties below). Both slots are
    ``None`` until :meth:`init` is called.
    """

    __slots__ = "_client", "_entities"

    def __init__(self, *args):
        """Initialise both slots to ``None``; ``*args`` is not used here."""
        super().__init__()
        self._client = None
        self._entities = None

    def init(self, client, entities):
        """Attach the ``client`` and ``entities`` used by ``markdown``/``html``."""
        self._client = client
        self._entities = entities

    @property
    def text(self):
        """Return this object itself (the plain text)."""
        return self

    @property
    def markdown(self):
        """Return ``client.markdown.unparse(self, entities)``.

        Requires :meth:`init` to have been called; otherwise ``_client`` is
        ``None`` and the attribute access fails.
        """
        return self._client.markdown.unparse(self, self._entities)

    @property
    def html(self):
        """Return ``client.html.unparse(self, entities)``.

        Requires :meth:`init` to have been called; otherwise ``_client`` is
        ``None`` and the attribute access fails.
        """
        return self._client.html.unparse(self, self._entities)


async def ainput(prompt: str = ""):
    """Read one line from standard input without blocking the event loop.

    Prints ``prompt`` (no trailing newline, flushed), then runs
    ``sys.stdin.readline`` in a one-worker thread pool created for this call
    and awaits the result.

    Returns:
        The line read, with trailing whitespace (including the newline)
        removed by ``rstrip()``.
    """
    print(prompt, end="", flush=True)

    # A fresh single-thread executor is created per call and shut down when
    # the ``with`` block exits.
    with ThreadPoolExecutor(1) as executor:
        return (await asyncio.get_event_loop().run_in_executor(
            executor, sys.stdin.readline
        )).rstrip()


# Maps the ``ID`` of each supported raw entity class to the string that
# parse_entities() passes as ``type`` to pyrogram_types.MessageEntity.
ENTITIES = {
    types.MessageEntityMention.ID: "mention",
    types.MessageEntityHashtag.ID: "hashtag",
    types.MessageEntityCashtag.ID: "cashtag",
    types.MessageEntityBotCommand.ID: "bot_command",
    types.MessageEntityUrl.ID: "url",
    types.MessageEntityEmail.ID: "email",
    types.MessageEntityBold.ID: "bold",
    types.MessageEntityItalic.ID: "italic",
    types.MessageEntityCode.ID: "code",
    types.MessageEntityPre.ID: "pre",
    types.MessageEntityTextUrl.ID: "text_link",
    types.MessageEntityMentionName.ID: "text_mention",
    types.MessageEntityPhone.ID: "phone_number"
}


def parse_entities(entities: list, users: dict) -> list:
    """Convert raw entities into ``pyrogram_types.MessageEntity`` objects.

    Args:
        entities: Iterable of raw entity objects exposing ``ID``, ``offset``
            and ``length`` (and optionally ``url`` / ``user_id``).
        users: Mapping looked up with the entity's ``user_id``.

    Returns:
        A list with one ``MessageEntity`` per input entity whose ``ID`` is a
        key of ``ENTITIES``; entities with any other ``ID`` are skipped.
    """
    output_entities = []

    for entity in entities:
        entity_type = ENTITIES.get(entity.ID, None)

        if entity_type:
            output_entities.append(
                pyrogram_types.MessageEntity(
                    type=entity_type,
                    offset=entity.offset,
                    length=entity.length,
                    # Only some entity kinds carry a URL.
                    url=getattr(entity, "url", None),
                    # When the entity has no ``user_id`` the lookup key is
                    # None, so parse_user() normally receives None here.
                    user=parse_user(
                        users.get(
                            getattr(entity, "user_id", None),
                            None
                        )
                    )
                )
            )

    return output_entities


def parse_chat_photo(photo):
    # Returns None unless ``photo`` is a UserProfilePhoto/ChatPhoto whose small
    # and big variants are both FileLocation objects.
    if not isinstance(photo, (types.UserProfilePhoto, types.ChatPhoto)):
        return None

    if not isinstance(photo.photo_small, types.FileLocation):
        return None

    if not isinstance(photo.photo_big, types.FileLocation):
        return None

    # Falls back to 0 when the object has no ``photo_id`` attribute.
    photo_id = getattr(photo, "photo_id", 0)
    loc_small = photo.photo_small
    loc_big = photo.photo_big

    # NOTE(review): the source text is truncated inside this pack( call (the
    # format string and everything after it were lost in extraction) -
    # restore the remainder of this function from the upstream file.
    return pyrogram_types.ChatPhoto(
        small_file_id=encode(
            pack(
# NOTE(review): extraction damage - the text between the opening quote of a
# pack() format string and the "->" of the next function's signature is missing.
# What follows is the tail of a return annotation and the body of a function
# whose header was lost; the body reads the names ``user_status``, ``user_id``
# and ``is_bot``. Restore the header from the upstream file.
" pyrogram_types.UserStatus or None:
    # No status object is built when ``is_bot`` is truthy.
    if is_bot:
        return None

    status = pyrogram_types.UserStatus(user_id)

    # Exactly one flag is set depending on the raw status class; ``date`` is
    # filled only for the online/offline cases.
    if isinstance(user_status, types.UserStatusOnline):
        status.online = True
        status.date = user_status.expires
    elif isinstance(user_status, types.UserStatusOffline):
        status.offline = True
        status.date = user_status.was_online
    elif isinstance(user_status, types.UserStatusRecently):
        status.recently = True
    elif isinstance(user_status, types.UserStatusLastWeek):
        status.within_week = True
    elif isinstance(user_status, types.UserStatusLastMonth):
        status.within_month = True
    else:
        # Any other value (including None) ends up here.
        status.long_time_ago = True

    return status


def parse_user(user: types.User) -> pyrogram_types.User or None:
    """Build a ``pyrogram_types.User`` from a raw user, or return ``None``.

    Returns ``None`` when ``user`` is falsy; otherwise copies the raw fields
    across and converts ``photo`` / ``status`` through parse_chat_photo() and
    parse_user_status().
    """
    return pyrogram_types.User(
        id=user.id,
        is_self=user.is_self,
        is_contact=user.contact,
        is_mutual_contact=user.mutual_contact,
        is_deleted=user.deleted,
        is_bot=user.bot,
        first_name=user.first_name,
        last_name=user.last_name,
        username=user.username,
        language_code=user.lang_code,
        phone_number=user.phone,
        photo=parse_chat_photo(user.photo),
        status=parse_user_status(user.status, is_bot=user.bot),
        restriction_reason=user.restriction_reason
    ) if user else None


def parse_chat(message: types.Message, users: dict, chats: dict) -> pyrogram_types.Chat:
    """Build the ``Chat`` a message belongs to, based on the type of ``to_id``.

    Raises:
        KeyError: if the referenced id is missing from ``users`` / ``chats``.
    """
    if isinstance(message.to_id, types.PeerUser):
        # Outgoing message: the chat is the recipient; otherwise the sender.
        return parse_user_chat(users[message.to_id.user_id if message.out else message.from_id])
    elif isinstance(message.to_id, types.PeerChat):
        return parse_chat_chat(chats[message.to_id.chat_id])
    else:
        # Every other peer type is treated as a channel peer.
        return parse_channel_chat(chats[message.to_id.channel_id])


def parse_user_chat(user: types.User) -> pyrogram_types.Chat:
    """Build a ``Chat`` of type ``"private"`` from a raw user."""
    return pyrogram_types.Chat(
        id=user.id,
        type="private",
        username=user.username,
        first_name=user.first_name,
        last_name=user.last_name,
        photo=parse_chat_photo(user.photo),
        restriction_reason=user.restriction_reason
    )


def parse_chat_chat(chat: types.Chat) -> pyrogram_types.Chat:
    """Build a ``Chat`` of type ``"group"`` from a raw chat.

    The id is negated, and ``all_members_are_administrators`` is the logical
    inverse of the raw ``admins_enabled`` attribute (``None`` when the
    attribute is absent or ``None``).
    """
    admins_enabled = getattr(chat, "admins_enabled", None)

    if admins_enabled is not None:
        admins_enabled = not admins_enabled

    return pyrogram_types.Chat(
        id=-chat.id,
        type="group",
        title=chat.title,
        all_members_are_administrators=admins_enabled,
        photo=parse_chat_photo(getattr(chat, "photo", None))
    )


def parse_channel_chat(channel: types.Channel) -> pyrogram_types.Chat:
    """Build a ``Chat`` of type ``"supergroup"`` or ``"channel"``.

    The id is the raw channel id prefixed with ``-100``. ``username``,
    ``photo`` and ``restriction_reason`` are read with ``getattr`` and default
    to ``None`` when the raw object lacks them.
    """
    return pyrogram_types.Chat(
        id=int("-100" + str(channel.id)),
        type="supergroup" if channel.megagroup else "channel",
        title=channel.title,
        username=getattr(channel, "username", None),
        photo=parse_chat_photo(getattr(channel, "photo", None)),
        restriction_reason=getattr(channel, "restriction_reason", None)
    )


def parse_thumb(thumb: types.PhotoSize or types.PhotoCachedSize) -> pyrogram_types.PhotoSize or None:
    # Only PhotoSize / PhotoCachedSize inputs are handled in the visible part.
    if isinstance(thumb, (types.PhotoSize, types.PhotoCachedSize)):
        loc = thumb.location

        # PhotoSize carries an explicit size; for the cached variant the
        # length of the inline bytes is used instead.
        if isinstance(thumb, types.PhotoSize):
            file_size = thumb.size
        else:
            file_size = len(thumb.bytes)

        if isinstance(loc, types.FileLocation):
            # NOTE(review): the source text is truncated inside this pack(
            # call; the rest of parse_thumb and the header of the following
            # function were lost in extraction - restore from upstream.
            return pyrogram_types.PhotoSize(
                file_id=encode(
                    pack(
                        "
# NOTE(review): tail of a lost signature (return annotation ``bytes``) followed
# by an intact body. The body takes a string ``s`` and reverses what encode()
# below produces.
bytes:
    # Re-add the "=" padding that encode() strips, then decode using "-_" as
    # the alternative base64 characters.
    s = b64decode(s + "=" * (-len(s) % 4), "-_")
    r = b""

    # encode() always appends a trailing byte of value 2.
    assert s[-1] == 2

    i = 0
    # The final byte (the marker) is excluded from the scan.
    while i < len(s) - 1:
        if s[i] != 0:
            r += bytes([s[i]])
        else:
            # A zero byte is followed by a count: expand to that many zeros
            # and skip the count byte.
            r += b"\x00" * s[i + 1]
            i += 1

        i += 1

    return r


def encode(s: bytes) -> str:
    """Run-length encode zero bytes in ``s`` and return unpadded base64 text.

    A byte of value 2 is appended to the input, each run of zero bytes is
    replaced by ``b"\\x00"`` followed by the run length as a single byte, and
    the result is base64-encoded with ``-_`` as the alternative characters and
    the ``=`` padding stripped.
    """
    r = b""
    n = 0  # Length of the current run of zero bytes.

    # The appended 2 is non-zero, so it also flushes a trailing run of zeros.
    for i in s + bytes([2]):
        if i == 0:
            n += 1
        else:
            if n:
                r += b"\x00" + bytes([n])
                n = 0

            r += bytes([i])

    return b64encode(r, b"-_").decode().rstrip("=")


# TODO: Reorganize code, maybe split parts as well
async def parse_messages(
        client,
        messages: list or types.Message or types.MessageService or types.MessageEmpty,
        users: dict,
        chats: dict,
        replies: int = 1
) -> pyrogram_types.Message or list:
    # A single message is wrapped in a list so both input forms share one
    # loop; ``is_list`` records which form was passed.
    is_list = isinstance(messages, list)
    messages = messages if is_list else [messages]

    parsed_messages = []

    for message in messages:
        if isinstance(message, types.Message):
            entities = parse_entities(message.entities, users)

            forward_from = None
            forward_from_chat = None
            forward_from_message_id = None
            forward_signature = None
            forward_date = None

            forward_header = message.fwd_from  # type: types.MessageFwdHeader

            if forward_header:
                forward_date = forward_header.date

                # NOTE(review): nesting reconstructed from a whitespace-
                # collapsed source; the three assignments after ``else`` are
                # assumed to all belong to the else-branch - confirm upstream.
                if forward_header.from_id:
                    forward_from = parse_user(users[forward_header.from_id])
                else:
                    forward_from_chat = parse_channel_chat(chats[forward_header.channel_id])
                    forward_from_message_id = forward_header.channel_post
                    forward_signature = forward_header.post_author

            photo = None
            location = None
            contact = None
            venue = None
            audio = None
            voice = None
            animation = None
            video = None
            video_note = None
            sticker = None
            document = None

            media = message.media

            if media:
                if isinstance(media, types.MessageMediaPhoto):
                    photo = media.photo

                    if isinstance(photo, types.Photo):
                        sizes = photo.sizes
                        photo_sizes = []

                        for size in sizes:
                            if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)):
                                loc = size.location

                                # Explicit size for PhotoSize, otherwise the
                                # length of the inline bytes.
                                if isinstance(size, types.PhotoSize):
                                    file_size = size.size
                                else:
                                    file_size = len(size.bytes)

                                if isinstance(loc, types.FileLocation):
                                    # NOTE(review): the source text is
                                    # truncated inside this pack( call; the
                                    # remainder of parse_messages and the
                                    # header of the next function were lost
                                    # in extraction - restore from upstream.
                                    photo_size = pyrogram_types.PhotoSize(
                                        file_id=encode(
                                            pack(
                                                "
# NOTE(review): tail of a lost signature followed by an intact body. The body
# reads the names ``messages`` (iterated as message ids) and ``channel_id``;
# presumably this is the deleted-messages parser - confirm upstream.
pyrogram_types.Messages:
    parsed_messages = []

    for message in messages:
        parsed_messages.append(
            pyrogram_types.Message(
                message_id=message,
                # A channel Chat (id prefixed with -100) is attached only when
                # a channel id was supplied.
                chat=(pyrogram_types.Chat(id=int("-100" + str(channel_id)), type="channel")
                      if channel_id is not None
                      else None)
            )
        )

    return pyrogram_types.Messages(len(parsed_messages), parsed_messages)


def get_peer_id(input_peer) -> int:
    """Return the numeric peer id for an input peer.

    ``InputPeerUser`` yields ``user_id``, ``InputPeerChat`` yields the negated
    ``chat_id``, and anything else is treated as a channel peer and yields
    ``channel_id`` prefixed with ``-100``.
    """
    return (
        input_peer.user_id if isinstance(input_peer, types.InputPeerUser)
        else -input_peer.chat_id if isinstance(input_peer, types.InputPeerChat)
        else int("-100" + str(input_peer.channel_id))
    )


def get_input_peer(peer_id: int, access_hash: int):
    """Build an input peer object from a numeric peer id.

    Positive ids produce ``InputPeerUser``. Ids whose decimal form starts with
    ``-100`` produce ``InputPeerChannel`` (with that prefix removed), but only
    when ``access_hash`` is truthy. Everything else produces
    ``InputPeerChat`` with the id negated.
    """
    return (
        types.InputPeerUser(peer_id, access_hash) if peer_id > 0
        else types.InputPeerChannel(int(str(peer_id)[4:]), access_hash)
        if (str(peer_id).startswith("-100") and access_hash)
        else types.InputPeerChat(-peer_id)
    )


def get_offset_date(dialogs):
    """Return the date of the last non-empty message in ``dialogs.messages``.

    Messages are scanned from the end; ``MessageEmpty`` items are skipped.
    Returns ``0`` when there is no non-empty message.
    """
    for m in reversed(dialogs.messages):
        if isinstance(m, types.MessageEmpty):
            continue
        else:
            return m.date
    else:
        # Reached only when the loop finishes without returning.
        return 0


def parse_profile_photos(photos):
    # ``types.photos.Photos`` has no separate count, so the number of photos
    # present is used; any other type is expected to expose ``count``.
    if isinstance(photos, types.photos.Photos):
        total_count = len(photos.photos)
    else:
        total_count = photos.count

    user_profile_photos = []

    for photo in photos.photos:
        if isinstance(photo, types.Photo):
            sizes = photo.sizes
            photo_sizes = []

            for size in sizes:
                if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)):
                    loc = size.location

                    # Explicit size for PhotoSize, otherwise the length of the
                    # inline bytes.
                    if isinstance(size, types.PhotoSize):
                        file_size = size.size
                    else:
                        file_size = len(size.bytes)

                    if isinstance(loc, types.FileLocation):
                        # NOTE(review): the visible source ends inside this
                        # pack( call; the rest of the function is not
                        # available here.
                        photo_size = pyrogram_types.PhotoSize(
                            file_id=encode(
                                pack(
                                    "