# Pyrogram - Telegram MTProto API Client Library for Python # Copyright (C) 2017-2018 Dan Tès # # This file is part of Pyrogram. # # Pyrogram is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Pyrogram is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with Pyrogram. If not, see . from base64 import b64decode, b64encode from struct import pack from pyrogram.client import types as pyrogram_types from ..api import types, functions from ..api.errors import StickersetInvalid # TODO: Organize the code better? ENTITIES = { types.MessageEntityMention.ID: "mention", types.MessageEntityHashtag.ID: "hashtag", types.MessageEntityBotCommand.ID: "bot_command", types.MessageEntityUrl.ID: "url", types.MessageEntityEmail.ID: "email", types.MessageEntityBold.ID: "bold", types.MessageEntityItalic.ID: "italic", types.MessageEntityCode.ID: "code", types.MessageEntityPre.ID: "pre", types.MessageEntityTextUrl.ID: "text_link", types.MessageEntityMentionName.ID: "text_mention" } def parse_entities(entities: list, users: dict) -> list: output_entities = [] for entity in entities: entity_type = ENTITIES.get(entity.ID, None) if entity_type: output_entities.append(pyrogram_types.MessageEntity( type=entity_type, offset=entity.offset, length=entity.length, url=getattr(entity, "url", None), user=parse_user( users.get( getattr(entity, "user_id", None), None ) ) )) return output_entities def parse_chat_photo(photo): if not isinstance(photo, (types.UserProfilePhoto, types.ChatPhoto)): return None if not isinstance(photo.photo_small, types.FileLocation): return None if not isinstance(photo.photo_big, types.FileLocation): return None loc_small = photo.photo_small loc_big = photo.photo_big return pyrogram_types.ChatPhoto( small_file_id=encode( pack( " pyrogram_types.User or None: return pyrogram_types.User( id=user.id, is_bot=user.bot, first_name=user.first_name, last_name=user.last_name, username=user.username, language_code=user.lang_code, phone_number=user.phone, photo=parse_chat_photo(user.photo) ) if user else None def parse_chat(message: types.Message, users: dict, chats: dict) -> pyrogram_types.Chat: if isinstance(message.to_id, types.PeerUser): return parse_user_chat(users[message.to_id.user_id if message.out else message.from_id]) elif isinstance(message.to_id, types.PeerChat): return parse_chat_chat(chats[message.to_id.chat_id]) else: return parse_channel_chat(chats[message.to_id.channel_id]) def parse_user_chat(user: types.User) -> pyrogram_types.Chat: return pyrogram_types.Chat( id=user.id, type="private", username=user.username, first_name=user.first_name, last_name=user.last_name, photo=parse_chat_photo(user.photo) ) def parse_chat_chat(chat: types.Chat) -> pyrogram_types.Chat: return pyrogram_types.Chat( id=-chat.id, type="group", title=chat.title, all_members_are_administrators=not chat.admins_enabled, photo=parse_chat_photo(chat.photo) ) def parse_channel_chat(channel: types.Channel) -> pyrogram_types.Chat: return pyrogram_types.Chat( id=int("-100" + str(channel.id)), type="supergroup" if channel.megagroup else "channel", title=channel.title, username=getattr(channel, "username", None), photo=parse_chat_photo(getattr(channel, "photo", None)) ) def parse_thumb(thumb: types.PhotoSize or types.PhotoCachedSize) -> pyrogram_types.PhotoSize or None: if isinstance(thumb, (types.PhotoSize, types.PhotoCachedSize)): loc = thumb.location if isinstance(thumb, types.PhotoSize): file_size = thumb.size else: file_size = len(thumb.bytes) if isinstance(loc, types.FileLocation): return pyrogram_types.PhotoSize( file_id=encode( pack( " bytes: s = b64decode(s + "=" * (-len(s) % 4), "-_") r = b"" assert s[-1] == 2 i = 0 while i < len(s) - 1: if s[i] != 0: r += bytes([s[i]]) else: r += b"\x00" * s[i + 1] i += 1 i += 1 return r def encode(s: bytes) -> str: r = b"" n = 0 for i in s + bytes([2]): if i == 0: n += 1 else: if n: r += b"\x00" + bytes([n]) n = 0 r += bytes([i]) return b64encode(r, b"-_").decode().rstrip("=") # TODO: Reorganize code, maybe split parts as well def parse_message( client, message: types.Message, users: dict, chats: dict, replies: int = 1 ) -> pyrogram_types.Message: entities = parse_entities(message.entities, users) forward_from = None forward_from_chat = None forward_from_message_id = None forward_signature = None forward_date = None forward_header = message.fwd_from # type: types.MessageFwdHeader if forward_header: forward_date = forward_header.date if forward_header.from_id: forward_from = parse_user(users[forward_header.from_id]) else: forward_from_chat = parse_channel_chat(chats[forward_header.channel_id]) forward_from_message_id = forward_header.channel_post forward_signature = forward_header.post_author photo = None location = None contact = None venue = None audio = None voice = None video = None video_note = None sticker = None document = None media = message.media if media: if isinstance(media, types.MessageMediaPhoto): photo = media.photo if isinstance(photo, types.Photo): sizes = photo.sizes photo_sizes = [] for size in sizes: if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)): loc = size.location if isinstance(size, types.PhotoSize): file_size = size.size else: file_size = len(size.bytes) if isinstance(loc, types.FileLocation): photo_size = pyrogram_types.PhotoSize( file_id=encode( pack( " pyrogram_types.Message: action = message.action new_chat_members = None left_chat_member = None new_chat_title = None delete_chat_photo = None migrate_to_chat_id = None migrate_from_chat_id = None group_chat_created = None channel_chat_created = None new_chat_photo = None if isinstance(action, types.MessageActionChatAddUser): new_chat_members = [parse_user(users[i]) for i in action.users] elif isinstance(action, types.MessageActionChatJoinedByLink): new_chat_members = [parse_user(users[message.from_id])] elif isinstance(action, types.MessageActionChatDeleteUser): left_chat_member = parse_user(users[action.user_id]) elif isinstance(action, types.MessageActionChatEditTitle): new_chat_title = action.title elif isinstance(action, types.MessageActionChatDeletePhoto): delete_chat_photo = True elif isinstance(action, types.MessageActionChatMigrateTo): migrate_to_chat_id = action.channel_id elif isinstance(action, types.MessageActionChannelMigrateFrom): migrate_from_chat_id = action.chat_id elif isinstance(action, types.MessageActionChatCreate): group_chat_created = True elif isinstance(action, types.MessageActionChannelCreate): channel_chat_created = True elif isinstance(action, types.MessageActionChatEditPhoto): photo = action.photo if isinstance(photo, types.Photo): sizes = photo.sizes photo_sizes = [] for size in sizes: if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)): loc = size.location if isinstance(size, types.PhotoSize): file_size = size.size else: file_size = len(size.bytes) if isinstance(loc, types.FileLocation): photo_size = pyrogram_types.PhotoSize( file_id=encode( pack( " pyrogram_types.Message: return pyrogram_types.Message(message_id=message.id, client=client) def get_peer_id(input_peer) -> int: return ( input_peer.user_id if isinstance(input_peer, types.InputPeerUser) else -input_peer.chat_id if isinstance(input_peer, types.InputPeerChat) else int("-100" + str(input_peer.channel_id)) ) def get_input_peer(peer_id: int, access_hash: int): return ( types.InputPeerUser(peer_id, access_hash) if peer_id > 0 else types.InputPeerChannel(int(str(peer_id)[4:]), access_hash) if (str(peer_id).startswith("-100") and access_hash) else types.InputPeerChat(-peer_id) ) def get_offset_date(dialogs): for m in reversed(dialogs.messages): if isinstance(m, types.MessageEmpty): continue else: return m.date else: return 0 def parse_photos(photos): if isinstance(photos, types.photos.Photos): total_count = len(photos.photos) else: total_count = photos.count user_profile_photos = [] for photo in photos.photos: if isinstance(photo, types.Photo): sizes = photo.sizes photo_sizes = [] for size in sizes: if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)): loc = size.location if isinstance(size, types.PhotoSize): file_size = size.size else: file_size = len(size.bytes) if isinstance(loc, types.FileLocation): photo_size = pyrogram_types.PhotoSize( file_id=encode( pack( "