# NOTE(review): this file reached review with its line breaks collapsed and
# with three spans of text missing. Each gap begins right after an opening
# `pack( "` and resumes at the tail of a return annotation. The layout below
# is reconstructed from syntax alone; nesting that could not be determined
# with certainty is flagged inline. The three damaged spots are reproduced
# exactly as found, so the module does not parse until the lost text is
# restored (e.g. from version control).

from base64 import b64encode, b64decode
from struct import pack

import pyrogram
from pyrogram.api import types

# Maps the ID of a raw message-entity type to the string stored in
# `pyrogram.MessageEntity.type`. Entity types not listed here are skipped by
# parse_entities().
ENTITIES = {
    types.MessageEntityMention.ID: "mention",
    types.MessageEntityHashtag.ID: "hashtag",
    types.MessageEntityBotCommand.ID: "bot_command",
    types.MessageEntityUrl.ID: "url",
    types.MessageEntityEmail.ID: "email",
    types.MessageEntityBold.ID: "bold",
    types.MessageEntityItalic.ID: "italic",
    types.MessageEntityCode.ID: "code",
    types.MessageEntityPre.ID: "pre",
    types.MessageEntityTextUrl.ID: "text_link",
    # TODO: text_mention
}


def parse_entities(entities: list) -> list:
    """Convert raw message entities into `pyrogram.MessageEntity` objects.

    Args:
        entities: Iterable of raw entity objects exposing `ID`, `offset` and
            `length` (and optionally `url`).

    Returns:
        A list with one `pyrogram.MessageEntity` per entity whose `ID` is a
        key of `ENTITIES`; all other entities are dropped.
    """
    output_entities = []

    for entity in entities:
        entity_type = ENTITIES.get(entity.ID, None)

        # Unknown entity kinds have no mapping and are silently ignored.
        if entity_type:
            output_entities.append(pyrogram.MessageEntity(
                type=entity_type,
                offset=entity.offset,
                length=entity.length,
                # Only some entity types carry a `url`; default to None.
                url=getattr(entity, "url", None)
            ))

    return output_entities


# NOTE(review): `pyrogram.User or None` is an ordinary `or` expression, not a
# union type; read it as "returns a User or None".
def parse_user(user: types.User) -> pyrogram.User or None:
    """Build a `pyrogram.User` from a raw user, or return None if `user` is falsy."""
    return pyrogram.User(
        id=user.id,
        is_bot=user.bot,
        first_name=user.first_name,
        last_name=user.last_name,
        username=user.username,
        language_code=user.lang_code
    ) if user else None


def parse_chat(message: types.Message, users: dict, chats: dict) -> pyrogram.Chat:
    """Build the `pyrogram.Chat` a message belongs to, based on `message.to_id`.

    Args:
        message: Raw message; `to_id`, `out` and `from_id` are read.
        users: Lookup of raw users, indexed by user id.
        chats: Lookup of raw chats/channels, indexed by chat or channel id.

    Returns:
        A private chat for a `PeerUser`, a group for a `PeerChat`, and the
        result of parse_channel_chat() for any other peer type.
    """
    if isinstance(message.to_id, types.PeerUser):
        # For outgoing messages the peer is the recipient; otherwise the
        # sender is used as the private chat.
        return parse_user_chat(users[message.to_id.user_id if message.out else message.from_id])
    elif isinstance(message.to_id, types.PeerChat):
        return parse_chat_chat(chats[message.to_id.chat_id])
    else:
        # Any remaining peer type is assumed to expose `channel_id`.
        return parse_channel_chat(chats[message.to_id.channel_id])


def parse_user_chat(user: types.User) -> pyrogram.Chat:
    """Build a `pyrogram.Chat` of type "private" from a raw user."""
    return pyrogram.Chat(
        id=user.id,
        type="private",
        username=user.username,
        first_name=user.first_name,
        last_name=user.last_name
    )


def parse_chat_chat(chat: types.Chat) -> pyrogram.Chat:
    """Build a `pyrogram.Chat` of type "group" from a raw chat.

    The resulting id is the negated raw chat id.
    """
    return pyrogram.Chat(
        id=-chat.id,
        type="group",
        title=chat.title,
        all_members_are_administrators=not chat.admins_enabled
    )


def parse_channel_chat(channel: types.Channel) -> pyrogram.Chat:
    """Build a `pyrogram.Chat` from a raw channel.

    The resulting id is the raw channel id with "-100" prepended; the type is
    "supergroup" when `channel.megagroup` is truthy, otherwise "channel".
    """
    return pyrogram.Chat(
        id=int("-100" + str(channel.id)),
        type="supergroup" if channel.megagroup else "channel",
        title=channel.title,
        username=channel.username
    )


# Visible part: for a PhotoSize / PhotoCachedSize, take the size from `size`
# or from the length of the cached bytes, then start building a
# `pyrogram.PhotoSize` whose file_id is encode(pack(...)).
def parse_thumb(thumb: types.PhotoSize or types.PhotoCachedSize) -> pyrogram.PhotoSize or None:
    if isinstance(thumb, (types.PhotoSize, types.PhotoCachedSize)):
        loc = thumb.location

        if isinstance(thumb, types.PhotoSize):
            file_size = thumb.size
        else:
            # Cached sizes carry the image bytes inline instead of a size field.
            file_size = len(thumb.bytes)

        if isinstance(loc, types.FileLocation):
            return pyrogram.PhotoSize(
                file_id=encode(
                    pack(
                        # NOTE(review): SOURCE TEXT MISSING after the opening quote
                        # on the next line. Presumably lost: the pack() format and
                        # arguments, the rest of parse_thumb, and the `def` line of
                        # the following function (only the tail of its return
                        # annotation survives). Do not guess; restore from history.
                        " pyrogram.Message:
    # Body of a function whose header is lost; it reads `message`, `users`
    # and `chats`.
    entities = parse_entities(message.entities)

    forward_from = None
    forward_from_chat = None
    forward_from_message_id = None
    forward_signature = None
    forward_date = None

    forward_header = message.fwd_from  # type: types.MessageFwdHeader

    if forward_header:
        forward_date = forward_header.date

        if forward_header.from_id:
            forward_from = parse_user(users[forward_header.from_id])
        else:
            forward_from_chat = parse_channel_chat(chats[forward_header.channel_id])
            # NOTE(review): nesting of the next two assignments under this
            # `else` is inferred; the original indentation was lost. Confirm.
            forward_from_message_id = forward_header.channel_post
            forward_signature = forward_header.post_author

    photo = None
    location = None
    contact = None
    audio = None
    voice = None
    video = None
    video_note = None
    sticker = None
    document = None

    media = message.media

    if media:
        if isinstance(media, types.MessageMediaPhoto):
            photo = media.photo

            if isinstance(photo, types.Photo):
                sizes = photo.sizes
                photo_sizes = []

                for size in sizes:
                    if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)):
                        loc = size.location

                        if isinstance(size, types.PhotoSize):
                            file_size = size.size
                        else:
                            file_size = len(size.bytes)

                        if isinstance(loc, types.FileLocation):
                            photo_size = pyrogram.PhotoSize(
                                file_id=encode(
                                    pack(
                                        # NOTE(review): SOURCE TEXT MISSING after the
                                        # opening quote on the next line. Presumably
                                        # lost: the rest of this function and the
                                        # `def` line of the next one. Restore from
                                        # history.
                                        " pyrogram.Message:
    # Body of a second function whose header is lost; it maps
    # `message.action` onto service-message fields.
    action = message.action

    new_chat_members = None
    left_chat_member = None
    new_chat_title = None
    delete_chat_photo = None
    migrate_to_chat_id = None
    migrate_from_chat_id = None
    group_chat_created = None
    channel_chat_created = None
    new_chat_photo = None

    if isinstance(action, types.MessageActionChatAddUser):
        new_chat_members = [parse_user(users[i]) for i in action.users]
    elif isinstance(action, types.MessageActionChatJoinedByLink):
        # NOTE(review): uses `inviter_id`, which by its name looks like the
        # link creator rather than the user who joined. Confirm intended.
        new_chat_members = [parse_user(users[action.inviter_id])]
    elif isinstance(action, types.MessageActionChatDeleteUser):
        left_chat_member = parse_user(users[action.user_id])
    elif isinstance(action, types.MessageActionChatEditTitle):
        new_chat_title = action.title
    elif isinstance(action, types.MessageActionChatDeletePhoto):
        delete_chat_photo = True
    elif isinstance(action, types.MessageActionChatMigrateTo):
        # NOTE(review): the raw channel id is stored as-is, whereas
        # parse_channel_chat() prefixes "-100". Confirm which form callers expect.
        migrate_to_chat_id = action.channel_id
    elif isinstance(action, types.MessageActionChannelMigrateFrom):
        # NOTE(review): the raw chat id is stored as-is, whereas
        # parse_chat_chat() negates it. Confirm which form callers expect.
        migrate_from_chat_id = action.chat_id
    elif isinstance(action, types.MessageActionChatCreate):
        group_chat_created = True
    elif isinstance(action, types.MessageActionChannelCreate):
        channel_chat_created = True
    elif isinstance(action, types.MessageActionChatEditPhoto):
        photo = action.photo

        if isinstance(photo, types.Photo):
            sizes = photo.sizes
            photo_sizes = []

            for size in sizes:
                if isinstance(size, (types.PhotoSize, types.PhotoCachedSize)):
                    loc = size.location

                    if isinstance(size, types.PhotoSize):
                        file_size = size.size
                    else:
                        file_size = len(size.bytes)

                    if isinstance(loc, types.FileLocation):
                        photo_size = pyrogram.PhotoSize(
                            file_id=encode(
                                pack(
                                    # NOTE(review): SOURCE TEXT MISSING after the
                                    # opening quote on the next line. Presumably
                                    # lost: the rest of this function and the `def`
                                    # line of the next one, which takes `s` and
                                    # returns bytes. Restore from history.
                                    " bytes:
    # Body of a function whose header is lost; it is the inverse of encode().
    # Re-add the "=" padding stripped by encode(), then decode with the "-_"
    # alternative alphabet.
    s = b64decode(s + "=" * (-len(s) % 4), "-_")
    r = b""

    # encode() always appends a trailing 0x02 byte.
    assert s[-1] == 2

    i = 0
    # Walk every byte except the trailing marker.
    while i < len(s) - 1:
        if s[i] != 0:
            r += bytes([s[i]])
        else:
            # A zero byte is followed by a count byte: expand to that many
            # zeros and skip the count.
            r += b"\x00" * s[i + 1]
            i += 1

        i += 1

    return r


def encode(s: bytes) -> str:
    """Encode bytes as a compact string.

    Each run of zero bytes is replaced by 0x00 followed by a single byte
    holding the run length, a 0x02 byte is appended, and the result is
    base64-encoded with "-" and "_" as the alternative characters and the
    "=" padding stripped.

    Args:
        s: Raw bytes to encode.

    Returns:
        The encoded string, without base64 padding.

    Raises:
        ValueError: If `s` contains a run of more than 255 zero bytes, since
            the run length is stored with `bytes([n])`.
    """
    r = b""
    n = 0  # length of the zero run currently being counted

    # The appended 0x02 is the end marker; being non-zero, it also flushes a
    # zero run that ends the input.
    for i in s + bytes([2]):
        if i == 0:
            n += 1
        else:
            if n:
                # Emit the pending zero run as (0x00, count).
                r += b"\x00" + bytes([n])
                n = 0

            r += bytes([i])

    return b64encode(r, b"-_").decode().rstrip("=")