# -*- coding: utf-8 -*-
"""PagerMaid plugin: automatically react to traced users and traced keywords.

Reactions are stored in the PagerMaid ``sqlite`` store under ``trace.*`` keys
and mirrored in ``cached_sqlite`` so that the per-message listeners do not hit
the database for every incoming message.
"""
import contextlib
from asyncio import sleep
from typing import Optional, List, Dict, Tuple, Union
from functools import reduce  # noqa: F401  (kept: other code may import it from here)

from pyrogram.enums import MessageEntityType, ParseMode
from pyrogram.raw.functions.messages import SendReaction
from pyrogram.raw.types import ReactionEmoji, ReactionCustomEmoji, User
from pyrogram.types import MessageEntity

from pagermaid.dependence import sqlite
from pagermaid.listener import listener
from pagermaid.enums import Client, Message
from pagermaid.utils import pip_install

pip_install("emoji")

import emoji

NATIVE_EMOJI = b"\xf0\x9f\x91\x8d\xf0\x9f\x91\x8e\xe2\x9d\xa4\xef\xb8\x8f\xf0\x9f\x94\xa5\xf0\x9f\xa5\xb0\xf0\x9f\x91\x8f\xf0\x9f\x98\x81\xf0\x9f\xa4\x94\xf0\x9f\xa4\xaf\xf0\x9f\x98\xb1\xf0\x9f\xa4\xac\xf0\x9f\x98\xa2\xf0\x9f\x8e\x89\xf0\x9f\xa4\xa9\xf0\x9f\xa4\xae\xf0\x9f\x92\xa9\xf0\x9f\x99\x8f\xf0\x9f\x91\x8c\xf0\x9f\x95\x8a\xf0\x9f\xa4\xa1\xf0\x9f\xa5\xb1\xf0\x9f\xa5\xb4\xf0\x9f\x98\x8d\xf0\x9f\x90\xb3\xe2\x9d\xa4\xef\xb8\x8f\xe2\x80\x8d\xf0\x9f\x94\xa5\xf0\x9f\x8c\x9a\xf0\x9f\x8c\xad\xf0\x9f\x92\xaf\xf0\x9f\xa4\xa3\xe2\x9a\xa1\xef\xb8\x8f\xf0\x9f\x8d\x8c\xf0\x9f\x8f\x86\xf0\x9f\x92\x94\xf0\x9f\xa4\xa8\xf0\x9f\x98\x90\xf0\x9f\x8d\x93\xf0\x9f\x8d\xbe\xf0\x9f\x92\x8b\xf0\x9f\x96\x95\xf0\x9f\x98\x88\xf0\x9f\x98\x82\xf0\x9f\x98\xad".decode()
# Kept for backward compatibility; offsets are now computed from the real
# UTF-16 length, so this hand-maintained list no longer has to be complete.
SPECIAL_EMOJI = "❤⬅↔➡⬆↕⬇"  # TO BE ADDED
# The emoji parser stops after collecting this many emojis from a command.
MAX_REACTIONS = 3
USAGE = f"""```Usage:
  Reply to a message:
    Trace   : .trace 👍👎🥰
    Untrace : .trace
  Trace keyword: .trace kw add 👍👎🥰
  Del keyword  : .trace kw del
  List all     : .trace status
  Untrace all  : .trace clean
  Keep log     : .trace log [true|false]
  Toggle big   : .trace big [true|false]
  Use with caution:
    Reset all  : .trace resettrace
```

**Available native emojis:** {NATIVE_EMOJI}
"""

# In-memory mirror of every ``trace.*`` entry of the persistent store.
cached_sqlite = {
    key: value for key, value in sqlite.items() if key.startswith("trace.")
}

# Defaults written to both stores whenever a config key is missing.
_CONFIG_DEFAULTS = {
    "trace.config.keep_log": True,
    "trace.config.big": True,
}
# Words accepted by ``.trace log`` / ``.trace big``.
_BOOL_WORDS = {"true": True, "false": False}


def _ensure_default_config() -> None:
    """Write the default value for every missing ``trace.config.*`` key."""
    for key, default in _CONFIG_DEFAULTS.items():
        if cached_sqlite.get(key) is None:
            sqlite[key] = default
            cached_sqlite[key] = default


_ensure_default_config()


def _purge_cached_keys(prefixes: Tuple[str, ...]) -> None:
    """Delete every cached key starting with one of *prefixes* from both stores."""
    # Snapshot the keys first: deleting from a dict while iterating over it
    # raises RuntimeError.
    for key in [k for k in cached_sqlite if k.startswith(prefixes)]:
        del cached_sqlite[key]
        del sqlite[key]


def _full_name(user) -> str:
    """Return first and last name of *user* concatenated (no separator)."""
    return (user.first_name or "") + (user.last_name or "")


async def _send_reaction(client: Client, chat_id, msg_id: int, reaction_list) -> None:
    """Send *reaction_list* to message *msg_id* in chat *chat_id*."""
    await client.invoke(
        SendReaction(
            peer=await client.resolve_peer(int(chat_id)),
            msg_id=msg_id,
            reaction=reaction_list,
            big=cached_sqlite["trace.config.big"],
        )
    )


async def edit_and_delete(
    message: Message,
    text: str,
    entities: List[MessageEntity] = None,
    seconds=5,
    parse_mode: ParseMode = ParseMode.DEFAULT,
):
    """Edit *message* to *text*, then delete it after *seconds* seconds.

    The message is kept (nothing is deleted) when *seconds* is ``-1`` or when
    the ``trace.config.keep_log`` option is enabled.
    """
    if entities is None:
        entities = []
    await message.edit(text, entities=entities, parse_mode=parse_mode)
    # Fall back to the module default so a missing config key cannot crash
    # the reply itself.
    if seconds == -1 or cached_sqlite.get("trace.config.keep_log", True):
        return
    await sleep(seconds)
    return await message.delete()


async def print_usage(message: Message):
    """Replace *message* with the usage text."""
    return await edit_and_delete(message, USAGE, [], 15, ParseMode.MARKDOWN)


async def get_users_by_userids(client: Client, uids) -> List[User]:
    """Fetch the user objects for *uids* through ``client.get_users``."""
    return await client.get_users(uids)


async def get_all_traced(client: Client) -> Dict:
    """Return ``{user_id: {"reactions": [...], "user": <user>}}`` for traced users.

    The ``"user"`` entry is only present for ids that ``get_users`` returned.
    """
    prefix = "trace.user_id."
    uid_reactions = {
        int(key.split(prefix)[1]): {"reactions": value}
        for key, value in cached_sqlite.items()
        if key.startswith(prefix)
    }
    if not uid_reactions:
        # Nothing traced: skip the network round trip.
        return uid_reactions
    for user in await get_users_by_userids(client, list(uid_reactions)):
        if user.id in uid_reactions:
            uid_reactions[user.id]["user"] = user
    return uid_reactions


def count_offset(text: str) -> int:
    """Return the length of *text* in UTF-16 code units.

    Telegram expresses entity offsets and lengths in UTF-16 code units, so
    characters outside the Basic Multilingual Plane count as two and every
    other character counts as one, whether or not it is an emoji.
    """
    return len(text.encode("utf-16-le", "surrogatepass")) // 2


def append_emoji_to_text(
    text: str,
    reaction_list: List[Union[ReactionEmoji, ReactionCustomEmoji]],
    entities: List[MessageEntity],
):
    """Append ``[r1, r2, ...]`` plus a newline describing *reaction_list*.

    Custom emojis are rendered with a placeholder character carrying a
    ``CUSTOM_EMOJI`` entity. *text* and *entities* are returned unchanged when
    *reaction_list* is ``None``.
    """
    if reaction_list is None:
        return text, entities
    placeholder = "👋"
    text += "["
    for position, reaction in enumerate(reaction_list):
        # Emit the separator *between* items instead of trimming a trailing
        # one, so an empty list or the fallback branch cannot eat real text.
        if position:
            text += ", "
        if isinstance(reaction, ReactionEmoji):
            text += reaction.emoticon
        elif isinstance(reaction, ReactionCustomEmoji):
            entities.append(
                MessageEntity(
                    type=MessageEntityType.CUSTOM_EMOJI,
                    offset=count_offset(text),
                    length=count_offset(placeholder),
                    custom_emoji_id=reaction.document_id,
                )
            )
            text += placeholder
        else:
            # Unknown reaction type: show its repr rather than failing.
            text += str(reaction)
    text += "]\n"
    return text, entities


def get_keyword_emojis_from_message(message) -> Tuple[str, List[Union[str, int]]]:
    """Return ``(keyword, emojis)`` parsed from *message*, or ``None`` without one."""
    return (message.parameter[0], get_emojis_from_message(message)) if message else None


def get_emojis_from_message(message: Message) -> Optional[List[Union[str, int]]]:
    """Collect up to ``MAX_REACTIONS`` emojis from the message text, in order.

    Native emojis are returned as strings, custom emojis as their integer
    ``custom_emoji_id``. Returns ``None`` when *message* is falsy.
    """
    if not message:
        return None
    # Index custom emojis by their UTF-16 offset so unrelated entities
    # (bold, mentions, ...) cannot stall the matching.
    custom_by_offset = {
        entity.offset: entity
        for entity in message.entities or []
        if entity.type == MessageEntityType.CUSTOM_EMOJI
    }
    emoji_list = []
    offset = 0  # UTF-16 offset of the current character
    skip_until = 0  # end of the custom-emoji span being skipped
    for char in message.text or "":
        if len(emoji_list) == MAX_REACTIONS:
            break
        if offset >= skip_until:
            entity = custom_by_offset.get(offset)
            if entity is not None:
                emoji_list.append(entity.custom_emoji_id)
                # The placeholder may span several code points; do not parse
                # them again as native emojis.
                skip_until = offset + entity.length
            elif emoji.is_emoji(char):
                emoji_list.append(char)
        offset += count_offset(char)
    return emoji_list


def get_name_and_username_from_message(message: Message):
    """Return ``(full_name, username)`` of the author of the replied-to message."""
    author = message.reply_to_message.from_user
    return _full_name(author), author.username


def append_username_to_text(
    text: str,
    other_name: str,
    other_username: str,
    entities: List[MessageEntity],
    message: Message,
    user: Optional[User] = None,
):
    """Append a mention of the user followed by ``": "`` to *text*.

    ``@username`` is preferred; otherwise a text mention of *other_name*
    pointing at *user* (or, without one, at the author of the replied-to
    message) is used. Without either, a fixed placeholder is appended.
    """
    prefix = " "
    # The offset is derived from the prefix so the entity always starts on
    # the mention itself, whatever the prefix is.
    mention_offset = count_offset(text) + count_offset(prefix)
    if other_username:
        mention = f"@{other_username}"
        entities.append(
            MessageEntity(
                type=MessageEntityType.MENTION,
                offset=mention_offset,
                length=count_offset(mention),
            )
        )
        text += prefix + mention
    elif other_name:
        entities.append(
            MessageEntity(
                type=MessageEntityType.TEXT_MENTION,
                offset=mention_offset,
                length=count_offset(other_name),
                user=user if user else message.reply_to_message.from_user,
            )
        )
        text += prefix + other_name
    else:
        text += "Some unknown ghost"
    text += ": "
    return text, entities


def new_bold_string_entities(text: str) -> Tuple[str, List[MessageEntity]]:
    """Start a new ``(text, entities)`` pair consisting of bold *text*."""
    return append_bold_string("", text, [])


def append_bold_string(
    text: str, append_text: str, entities: List[MessageEntity]
) -> Tuple[str, List[MessageEntity]]:
    """Append *append_text* to *text* and mark it bold in *entities*."""
    entities.append(
        MessageEntity(
            type=MessageEntityType.BOLD,
            offset=count_offset(text),
            length=count_offset(append_text),
        )
    )
    return text + append_text, entities


async def gen_reaction_list(emojis, bot: Client):
    """Convert parsed *emojis* into raw reaction objects.

    Duplicates are dropped (first occurrence wins), custom emojis are dropped
    for non-premium accounts, and native emojis that are not part of
    ``NATIVE_EMOJI`` are ignored.
    """
    me = bot.me or await bot.get_me()
    if not me.is_premium:
        # Remove custom emojis if not premium (will it happen?)
        emojis = [item for item in emojis if not isinstance(item, int)]
    reaction_list = []
    # dict.fromkeys de-duplicates while preserving the original order.
    for item in dict.fromkeys(emojis):
        if isinstance(item, int):
            reaction_list.append(ReactionCustomEmoji(document_id=item))
        elif isinstance(item, str) and item in NATIVE_EMOJI:
            reaction_list.append(ReactionEmoji(emoticon=item))
    return reaction_list


def append_config(
    text: str, entities: List[MessageEntity]
) -> Tuple[str, List[MessageEntity]]:
    """Append the current ``keep_log`` and ``big`` settings in bold."""
    text, entities = append_bold_string(
        text, f"\nKeep log: \n {cached_sqlite['trace.config.keep_log']}", entities
    )
    # This line reports the "big" option (it used to repeat keep_log).
    text, entities = append_bold_string(
        text, f"\nUse big : \n {cached_sqlite['trace.config.big']}", entities
    )
    return text, entities


async def _untrace_replied_user(message: Message):
    """``.trace`` as a reply with no argument: stop tracing the replied-to user."""
    target = message.reply_to_message
    if target is None or target.from_user is None:
        return await print_usage(message)
    key = f"trace.user_id.{target.from_user.id}"
    prev_emojis = cached_sqlite.get(key, None)
    if not prev_emojis:
        return await edit_and_delete(message, "This user is not in the traced list.")
    del sqlite[key]
    del cached_sqlite[key]
    text, entities = new_bold_string_entities("Successfully untraced: \n")
    other_name, other_username = get_name_and_username_from_message(message)
    text, entities = append_username_to_text(
        text, other_name, other_username, entities, message
    )
    text, entities = append_emoji_to_text(text, prev_emojis, entities)
    return await edit_and_delete(
        message, text, entities=entities, seconds=5, parse_mode=ParseMode.MARKDOWN
    )


async def _report_traced(bot: Client, message: Message, action: str):
    """``.trace status`` / ``.trace clean``: list everything traced.

    For ``clean`` all traced users and keywords are removed afterwards; the
    ``trace.config.*`` options are kept (use ``resettrace`` to wipe those).
    """
    traced = await get_all_traced(bot)
    header = "Traced userlist:\n" if action == "status" else "Successfully untraced: \n"
    text, entities = new_bold_string_entities(header)
    for info in traced.values():
        # The user object is missing when get_users did not return this id.
        user = info.get("user")
        other_name = _full_name(user) if user else ""
        other_username = user.username if user else None
        text, entities = append_username_to_text(
            text, other_name, other_username, entities, message, user
        )
        text, entities = append_emoji_to_text(text, info["reactions"], entities)
    text, entities = append_bold_string(text, "\nTraced keywords: \n", entities)
    for keyword in cached_sqlite.get("trace.keywordlist", None) or []:
        reaction_list = cached_sqlite.get(
            f"trace.keyword.{keyword.encode().hex()}", None
        )
        text += f" {keyword}: "
        text, entities = append_emoji_to_text(text, reaction_list, entities)
    if action == "status":
        text, entities = append_config(text, entities)
    else:
        # "trace.keyword" covers both "trace.keyword.<hex>" and
        # "trace.keywordlist".
        _purge_cached_keys(("trace.user_id.", "trace.keyword"))
    return await edit_and_delete(
        message, text, entities=entities, seconds=5, parse_mode=ParseMode.MARKDOWN
    )


async def _reset_database(message: Message):
    """``.trace resettrace``: delete every ``trace.*`` entry, then restore defaults."""
    _purge_cached_keys(("trace.",))
    # The reply below (and every listener) reads the config keys.
    _ensure_default_config()
    return await edit_and_delete(
        message,
        "**Database has been reset.**",
        seconds=5,
        parse_mode=ParseMode.MARKDOWN,
    )


async def _trace_replied_user(bot: Client, message: Message):
    """``.trace <emojis>`` as a reply: start tracing the replied-to user."""
    emojis = get_emojis_from_message(message)
    if not emojis:
        return await print_usage(message)
    reaction_list = await gen_reaction_list(emojis, bot)
    if not reaction_list:
        return await edit_and_delete(message, "No valid emojis found!")
    target = message.reply_to_message
    if target is None or target.from_user is None:
        # Tracing needs a replied-to message with a known author.
        return await print_usage(message)
    key = f"trace.user_id.{target.from_user.id}"
    sqlite[key] = reaction_list
    cached_sqlite[key] = reaction_list
    await _send_reaction(
        bot, message.chat.id, message.reply_to_message_id, reaction_list
    )
    text = "Successfully traced: \n"
    # TODO: Add username
    text, entities = append_emoji_to_text(text, reaction_list, [])
    return await edit_and_delete(
        message, text, entities=entities, seconds=5, parse_mode=ParseMode.MARKDOWN
    )


async def _set_bool_config(message: Message, key: str, label: str, word: str):
    """Store ``true``/``false`` (*word*) under *key* and echo the new value."""
    value = _BOOL_WORDS.get(word)
    if value is None:
        return await print_usage(message)
    sqlite[key] = value
    cached_sqlite[key] = value
    return await message.edit(f"**{label}\n {value}**")


async def _untrace_keyword(message: Message, keyword: str):
    """``.trace <keyword> del``: stop tracing *keyword*."""
    key = f"trace.keyword.{keyword.encode().hex()}"
    # The list does not exist until the first keyword has been traced.
    keywordlist = cached_sqlite.get("trace.keywordlist", None) or []
    if keyword not in keywordlist:
        return await edit_and_delete(
            message, f'Keyword "{keyword}" is not traced.\n{keywordlist}'
        )
    prev_emojis = cached_sqlite.get(key)
    if not prev_emojis:
        return await edit_and_delete(message, f'Keyword "{keyword}" is not traced.')
    keywordlist.remove(keyword)
    sqlite["trace.keywordlist"] = keywordlist
    cached_sqlite["trace.keywordlist"] = keywordlist
    del sqlite[key]
    del cached_sqlite[key]
    text, entities = new_bold_string_entities("Successfully untraced keyword: \n")
    text += f" {keyword}: "
    text, entities = append_emoji_to_text(text, prev_emojis, entities)
    return await edit_and_delete(
        message, text, entities=entities, seconds=5, parse_mode=ParseMode.MARKDOWN
    )


async def _trace_keyword_command(bot: Client, message: Message):
    """``.trace <keyword> add <emojis>``: start tracing a keyword.

    NOTE(review): the middle parameter is not checked to be ``add``; this
    matches the previous behaviour.
    """
    keyword, emojis = get_keyword_emojis_from_message(message)
    reaction_list = await gen_reaction_list(emojis, bot) if keyword and emojis else []
    if not reaction_list:
        return await edit_and_delete(message, "No valid emojis found!")
    key = f"trace.keyword.{keyword.encode().hex()}"
    sqlite[key] = reaction_list
    cached_sqlite[key] = reaction_list
    keywordlist = cached_sqlite.get("trace.keywordlist", None)
    if keywordlist is None:
        sqlite["trace.keywordlist"] = [keyword]
        cached_sqlite["trace.keywordlist"] = [keyword]
    elif keyword not in keywordlist:
        keywordlist.append(keyword)
        sqlite["trace.keywordlist"] = keywordlist
    text, entities = new_bold_string_entities("Successfully traced keyword: \n")
    text += f" {keyword}: "
    text, entities = append_emoji_to_text(text, reaction_list, entities)
    return await edit_and_delete(
        message, text, entities=entities, seconds=5, parse_mode=ParseMode.MARKDOWN
    )


@listener(command="trace", need_admin=True, diagnostics=False, description=USAGE)
async def trace(bot: Client, message: Message):
    """Entry point of the ``trace`` command; dispatches on the parameter count.

    * 0 parameters: untrace the replied-to user.
    * 1 parameter : ``status``, ``clean``, ``resettrace`` or emojis to trace
      the replied-to user with.
    * 2 parameters: ``log true|false``, ``big true|false`` or ``<keyword> del``.
    * 3 parameters: ``<keyword> add <emojis>``.

    Anything else prints the usage text.
    """
    params = message.parameter
    if len(params) == 0:
        return await _untrace_replied_user(message)
    if len(params) == 1:
        if params[0] in ("status", "clean"):
            return await _report_traced(bot, message, params[0])
        if params[0] == "resettrace":
            return await _reset_database(message)
        return await _trace_replied_user(bot, message)
    if len(params) == 2:
        # log t|f; big t|f; kw del
        if params[0] == "log":
            return await _set_bool_config(
                message, "trace.config.keep_log", "Keep log: ", params[1]
            )
        if params[0] == "big":
            return await _set_bool_config(
                message, "trace.config.big", "Use big : ", params[1]
            )
        if params[1] == "del":
            return await _untrace_keyword(message, params[0])
        return await print_usage(message)
    if len(params) == 3:
        return await _trace_keyword_command(bot, message)
    return await print_usage(message)


@listener(incoming=True, outgoing=False, ignore_edited=True)
async def trace_user(client: Client, message: Message):
    """React to every incoming message written by a traced user."""
    if message.from_user is None:
        return
    # Best effort: a failed reaction must never break message handling.
    with contextlib.suppress(Exception):
        reaction_list = cached_sqlite.get(
            f"trace.user_id.{message.from_user.id}", None
        )
        if reaction_list:
            await _send_reaction(client, message.chat.id, message.id, reaction_list)


@listener(incoming=True, outgoing=True, ignore_edited=True)
async def trace_keyword(client: Client, message: Message):
    """React to every message whose text contains a traced keyword."""
    if message.from_user is None or not message.text:
        return
    # Best effort: a failed reaction must never break message handling.
    with contextlib.suppress(Exception):
        for keyword in cached_sqlite.get("trace.keywordlist", None) or []:
            if keyword not in message.text:
                continue
            reaction_list = cached_sqlite.get(
                f"trace.keyword.{keyword.encode().hex()}", None
            )
            if reaction_list:
                await _send_reaction(
                    client, message.chat.id, message.id, reaction_list
                )