# Pyrogram - Telegram MTProto API Client Library for Python # Copyright (C) 2017-2018 Dan Tès # # This file is part of Pyrogram. # # Pyrogram is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Pyrogram is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with Pyrogram. If not, see . from base64 import b64decode, b64encode from struct import pack from weakref import proxy from pyrogram.client import types as pyrogram_types from ...api import types, functions from ...api.errors import StickersetInvalid # TODO: Organize the code better? ENTITIES = { types.MessageEntityMention.ID: "mention", types.MessageEntityHashtag.ID: "hashtag", types.MessageEntityBotCommand.ID: "bot_command", types.MessageEntityUrl.ID: "url", types.MessageEntityEmail.ID: "email", types.MessageEntityBold.ID: "bold", types.MessageEntityItalic.ID: "italic", types.MessageEntityCode.ID: "code", types.MessageEntityPre.ID: "pre", types.MessageEntityTextUrl.ID: "text_link", types.MessageEntityMentionName.ID: "text_mention" } def parse_entities(entities: list, users: dict) -> list: output_entities = [] for entity in entities: entity_type = ENTITIES.get(entity.ID, None) if entity_type: output_entities.append(pyrogram_types.MessageEntity( type=entity_type, offset=entity.offset, length=entity.length, url=getattr(entity, "url", None), user=parse_user( users.get( getattr(entity, "user_id", None), None ) ) )) return output_entities def parse_chat_photo(photo): if not isinstance(photo, (types.UserProfilePhoto, types.ChatPhoto)): return None if not isinstance(photo.photo_small, types.FileLocation): return None if not isinstance(photo.photo_big, types.FileLocation): return None photo_id = getattr(photo, "photo_id", 0) loc_small = photo.photo_small loc_big = photo.photo_big return pyrogram_types.ChatPhoto( small_file_id=encode( pack( " pyrogram_types.User or None: return pyrogram_types.User( id=user.id, is_bot=user.bot, first_name=user.first_name, last_name=user.last_name, username=user.username, language_code=user.lang_code, phone_number=user.phone, photo=parse_chat_photo(user.photo) ) if user else None def parse_chat(message: types.Message, users: dict, chats: dict) -> pyrogram_types.Chat: if isinstance(message.to_id, types.PeerUser): return parse_user_chat(users[message.to_id.user_id if message.out else message.from_id]) elif isinstance(message.to_id, types.PeerChat): return parse_chat_chat(chats[message.to_id.chat_id]) else: return parse_channel_chat(chats[message.to_id.channel_id]) def parse_user_chat(user: types.User) -> pyrogram_types.Chat: return pyrogram_types.Chat( id=user.id, type="private", username=user.username, first_name=user.first_name, last_name=user.last_name, photo=parse_chat_photo(user.photo) ) def parse_chat_chat(chat: types.Chat) -> pyrogram_types.Chat: admins_enabled = getattr(chat, "admins_enabled", None) if admins_enabled is not None: admins_enabled = not admins_enabled return pyrogram_types.Chat( id=-chat.id, type="group", title=chat.title, all_members_are_administrators=admins_enabled, photo=parse_chat_photo(getattr(chat, "photo", None)) ) def parse_channel_chat(channel: types.Channel) -> pyrogram_types.Chat: return pyrogram_types.Chat( id=int("-100" + str(channel.id)), type="supergroup" if channel.megagroup else "channel", title=channel.title, username=getattr(channel, "username", None), photo=parse_chat_photo(getattr(channel, "photo", None)) ) def parse_thumb(thumb: types.PhotoSize or types.PhotoCachedSize) -> pyrogram_types.PhotoSize or None: if isinstance(thumb, (types.PhotoSize, types.PhotoCachedSize)): loc = thumb.location if isinstance(thumb, types.PhotoSize): file_size = thumb.size else: file_size = len(thumb.bytes) if isinstance(loc, types.FileLocation): return pyrogram_types.PhotoSize( file_id=encode( pack( " bytes: s = b64decode(s + "=" * (-len(s) % 4), "-_") r = b"" assert s[-1] == 2 i = 0 while i < len(s) - 1: if s[i] != 0: r += bytes([s[i]]) else: r += b"\x00" * s[i + 1] i += 1 i += 1 return r def encode(s: bytes) -> str: r = b"" n = 0 for i in s + bytes([2]): if i == 0: n += 1 else: if n: r += b"\x00" + bytes([n]) n = 0 r += bytes([i]) return b64encode(r, b"-_").decode().rstrip("=") # TODO: Reorganize code, maybe split parts as well def parse_messages( client, messages: list or types.Message or types.MessageService or types.MessageEmpty, users: dict, chats: dict, replies: int = 1 ) -> pyrogram_types.Message: is_list = isinstance(messages, list) messages = messages if is_list else [messages] parsed_messages = [] for message in messages: if isinstance(message, types.Message): entities = parse_entities(message.entities, users) forward_from = None forward_from_chat = None forward_from_message_id = None forward_signature = None forward_date = None forward_header = message.fwd_from # type: types.MessageFwdHeader if forward_header: forward_date = forward_header.date if forward_header.from_id: forward_from = parse_user(users[forward_header.from_id]) else: forward_from_chat = parse_channel_chat(chats[forward_header.channel_id]) forward_from_message_id = forward_header.channel_post forward_signature = forward_header.post_author photo = None location = None contact = None venue = None audio = None voice = None video = None video_note = None sticker = None document = None media = message.media if media: if isinstance(media, types.MessageMediaPhoto): photo = media.photo if isinstance(photo, types.Photo): sizes = photo.sizes photo_sizes = [] for size in sizes: if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)): loc = size.location if isinstance(size, types.PhotoSize): file_size = size.size else: file_size = len(size.bytes) if isinstance(loc, types.FileLocation): photo_size = pyrogram_types.PhotoSize( file_id=encode( pack( " int: return ( input_peer.user_id if isinstance(input_peer, types.InputPeerUser) else -input_peer.chat_id if isinstance(input_peer, types.InputPeerChat) else int("-100" + str(input_peer.channel_id)) ) def get_input_peer(peer_id: int, access_hash: int): return ( types.InputPeerUser(peer_id, access_hash) if peer_id > 0 else types.InputPeerChannel(int(str(peer_id)[4:]), access_hash) if (str(peer_id).startswith("-100") and access_hash) else types.InputPeerChat(-peer_id) ) def get_offset_date(dialogs): for m in reversed(dialogs.messages): if isinstance(m, types.MessageEmpty): continue else: return m.date else: return 0 def parse_photos(photos): if isinstance(photos, types.photos.Photos): total_count = len(photos.photos) else: total_count = photos.count user_profile_photos = [] for photo in photos.photos: if isinstance(photo, types.Photo): sizes = photo.sizes photo_sizes = [] for size in sizes: if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)): loc = size.location if isinstance(size, types.PhotoSize): file_size = size.size else: file_size = len(size.bytes) if isinstance(loc, types.FileLocation): photo_size = pyrogram_types.PhotoSize( file_id=encode( pack( " pyrogram_types.Chat: if isinstance(chat_full, types.UserFull): chat = parse_user_chat(chat_full.user) chat.description = chat_full.about else: full_chat = chat_full.full_chat chat = None for i in chat_full.chats: if full_chat.id == i.id: chat = i if isinstance(full_chat, types.ChatFull): chat = parse_chat_chat(chat) else: chat = parse_channel_chat(chat) chat.description = full_chat.about or None # TODO: Add StickerSet type chat.can_set_sticker_set = full_chat.can_set_stickers chat.sticker_set_name = full_chat.stickerset if full_chat.pinned_msg_id: chat.pinned_message = client.get_messages( int("-100" + str(full_chat.id)), full_chat.pinned_msg_id ) if isinstance(full_chat.exported_invite, types.ChatInviteExported): chat.invite_link = full_chat.exported_invite.link return chat