import contextlib import re from typing import Optional, List from datetime import datetime, timedelta from pyrogram.enums import ParseMode from pyrogram.types import ChatPermissions, Chat from pyrogram.types.user_and_chats.user import Link from pagermaid import bot from pagermaid.listener import listener from pagermaid.single_utils import sqlite, Message from pagermaid.scheduler import add_delete_message_job class KeywordTask: task_id: Optional[int] cid: int key: str msg: str include: bool regexp: bool exact: bool case: bool ignore_forward: bool reply: bool delete: bool ban: int restrict: int delay_delete: int source_delay_delete: Optional[int] = 0 def __init__( self, task_id: Optional[int] = None, cid: int = 0, key: str = "", msg: str = "", include: bool = True, regexp: bool = False, exact: bool = False, case: bool = False, ignore_forward: bool = False, reply: bool = True, delete: bool = False, ban: int = 0, restrict: int = 0, delay_delete: int = 0, source_delay_delete: int = 0, ): self.task_id = task_id self.cid = cid self.key = key self.msg = msg self.include = include self.regexp = regexp self.exact = exact self.case = case self.ignore_forward = ignore_forward self.reply = reply self.delete = delete self.ban = ban self.restrict = restrict self.delay_delete = delay_delete self.source_delay_delete = source_delay_delete def export(self): return { "task_id": self.task_id, "cid": self.cid, "key": self.key, "msg": self.msg, "include": self.include, "regexp": self.regexp, "exact": self.exact, "case": self.case, "ignore_forward": self.ignore_forward, "reply": self.reply, "delete": self.delete, "ban": self.ban, "restrict": self.restrict, "delay_delete": self.delay_delete, "source_delay_delete": self.source_delay_delete, } def export_str(self, show_all: bool = False): text = f"{self.task_id} - " text += f"{self.key} - " if show_all: text += f"{self.cid} - " text += f"{self.msg}" text += f"{self.source_delay_delete}" return text def save_to_file(self): data = sqlite.get("keyword_tasks", []) for i in data: if i["task_id"] == self.task_id: data.remove(i) break data.append(self.export()) sqlite["keyword_tasks"] = data def check_need_reply(self, message: Message) -> bool: if not message.text and not message.caption: return False if self.ignore_forward and message.forward_date: return False text = message.text or message.caption key = self.key if self.regexp: return bool(re.search(key, text)) if not self.case: text = text.lower() key = key.lower() if self.include and text.find(key) != -1: return True return bool(self.exact and text == key) @staticmethod def mention_chat(chat: Chat): return ( f'{chat.title}' if chat.username else f"{chat.title}" ) def replace_reply(self, message: Message): text = self.msg if message.from_user: text = text.replace( "$mention", str( Link( f"tg://user?id={message.from_user.id}", message.from_user.first_name or "Deleted Account", ParseMode.HTML, ) ), ) text = text.replace("$code_id", str(message.from_user.id)) text = text.replace( "$code_name", message.from_user.first_name or "Deleted Account" ) elif message.sender_chat: text = text.replace("$mention", self.mention_chat(message.sender_chat)) text = text.replace("$code_id", str(message.sender_chat.id)) text = text.replace("$code_name", message.sender_chat.title) else: text = text.replace("$mention", "") text = text.replace("$code_id", "") text = text.replace("$code_name", "") if self.delay_delete: text = text.replace("$delay_delete", str(self.delay_delete)) else: text = text.replace("$delay_delete", "") return text async def process_keyword(self, message: Message): msg = None text = self.replace_reply(message) reply_id = message.id if self.reply else None with contextlib.suppress(Exception): msg = await message.reply( text, parse_mode=ParseMode.HTML, reply_to_message_id=reply_id, message_thread_id=message.message_thread_id, ) if self.delete: if self.source_delay_delete > 0: add_delete_message_job(message, self.source_delay_delete) else: await message.safe_delete() uid = message.from_user.id if message.from_user else message.sender_chat.id if self.ban > 0: with contextlib.suppress(Exception): await bot.ban_chat_member( message.chat.id, uid, until_date=datetime.now() + timedelta(seconds=self.ban), ) if self.restrict > 0: with contextlib.suppress(Exception): await bot.restrict_chat_member( message.chat.id, uid, ChatPermissions(), until_date=datetime.now() + timedelta(seconds=self.restrict), ) if self.delay_delete > 0 and msg: add_delete_message_job(msg, self.delay_delete) def parse_task(self, text: str): data = text.split("\n+++\n") if len(data) < 2: raise ValueError("Invalid task format") for i in data: if i == "": raise ValueError("Invalid task format") self.key = data[0] self.msg = data[1] if len(data) > 2: temp = data[2].split(" ") for i in temp: if i.startswith("include"): self.include = True elif i.startswith("exact"): self.include = False self.exact = True elif i.startswith("regexp"): self.regexp = True elif i.startswith("case"): self.case = True elif i.startswith("ignore_forward"): self.ignore_forward = True else: raise ValueError("Invalid task format") if self.include and self.exact: raise ValueError("Invalid task format") if len(data) > 3: temp = data[3].split(" ") for i in temp: if i.startswith("reply"): self.reply = True elif i.startswith("delete"): self.delete = True elif i.startswith("ban"): self.ban = int(i.replace("ban", "")) elif i.startswith("restrict"): self.restrict = int(i.replace("restrict", "")) else: raise ValueError("Invalid task format") if len(data) > 4: self.delay_delete = int(data[4]) if ( len(data) > 5 ): # assuming the source_delay_delete is the 6th part of the task format self.source_delay_delete = int(data[5]) if ( self.ban < 0 or self.restrict < 0 or self.delay_delete < 0 or self.source_delay_delete < 0 ): raise ValueError("Invalid task format") class KeywordAlias: def __init__(self): self.aliases = sqlite.get("keyword_alias", {}) self.save() def save(self): sqlite["keyword_alias"] = self.aliases def add(self, from_cid: int, to_cid: int): self.aliases[from_cid] = to_cid self.save() def remove(self, from_cid: int): self.aliases.pop(from_cid, None) self.save() def get(self, from_cid: int) -> Optional[int]: return self.aliases.get(from_cid, None) class KeywordTasks: tasks: List[KeywordTask] def __init__(self): self.tasks = [] def add(self, task: KeywordTask): for i in self.tasks: if i.task_id == task.task_id: return self.tasks.append(task) def remove(self, task_id: int): for task in self.tasks: if task.task_id == task_id: self.tasks.remove(task) return True return False def remove_by_ids(self, task_ids: List[int]): success, failed = 0, 0 for task_id in task_ids: if self.remove(task_id): success += 1 else: failed += 1 return success, failed def get(self, task_id: int) -> Optional[KeywordTask]: return next((task for task in self.tasks if task.task_id == task_id), None) def get_all(self) -> List[KeywordTask]: return self.tasks def get_all_ids(self) -> List[int]: return [task.task_id for task in self.tasks] def print_all_tasks(self, show_all: bool = False, cid: int = 0) -> str: return "\n".join( task.export_str(show_all) for task in self.tasks if task.cid == cid or show_all ) def save_to_file(self): data = [task.export() for task in self.tasks] sqlite["keyword_tasks"] = data def load_from_file(self): data = sqlite.get("keyword_tasks", []) for i in data: self.add(KeywordTask(**i)) def get_next_task_id(self): return max(task.task_id for task in self.tasks) + 1 if self.tasks else 1 def get_tasks_for_chat(self, cid: int) -> List[KeywordTask]: return [task for task in self.tasks if task.cid == cid] async def check_and_reply(self, message: Message): if keyword_alias.get(message.chat.id): for task in self.get_tasks_for_chat(keyword_alias.get(message.chat.id)): if task.check_need_reply(message): with contextlib.suppress(Exception): await task.process_keyword(message) for task in self.get_tasks_for_chat(message.chat.id): if task.check_need_reply(message): with contextlib.suppress(Exception): await task.process_keyword(message) keyword_alias = KeywordAlias() keyword_tasks = KeywordTasks() keyword_tasks.load_from_file() async def from_msg_get_task_ids(message: Message) -> List[int]: id_list = message.parameter[1].split(",") try: id_list = [int(i) for i in id_list] except ValueError: await message.edit("请输入正确的参数") message.continue_propagation() return id_list @listener( command="keyword", parameters="指定参数", need_admin=True, description="关键词回复\n\nhttps://telegra.ph/PagerMaid-keyword-07-12", ) async def keyword_set(message: Message): if message.arguments == "h" or len(message.parameter) == 0: return await message.edit("关键词回复\n\nhttps://telegra.ph/PagerMaid-keyword-07-12") if len(message.parameter) == 1: if message.parameter[0] == "list": if keyword_tasks.get_all_ids(): return await message.edit( f"关键词任务:\n\n{keyword_tasks.print_all_tasks(show_all=False, cid=message.chat.id)}" ) else: return await message.edit("没有关键词任务。") elif message.parameter[0] == "alias": if keyword_alias.get(message.chat.id): return await message.edit( f"当前群组的关键字将继承:{keyword_alias.get(message.chat.id)}" ) else: return await message.edit("当前群组没有继承。") elif len(message.parameter) == 2: if message.parameter[0] == "rm": if id_list := await from_msg_get_task_ids(message): success, failed = keyword_tasks.remove_by_ids(id_list) keyword_tasks.save_to_file() keyword_tasks.load_from_file() return await message.edit(f"已删除任务成功 {success} 个,失败 {failed} 个。") elif message.parameter[0] == "list": if keyword_tasks.get_all_ids(): return await message.edit( f"关键词任务:\n\n{keyword_tasks.print_all_tasks(show_all=True)}" ) else: return await message.edit("没有关键词任务。") elif message.parameter[0] == "alias": if message.parameter[1] == "rm": if not keyword_alias.get(message.chat.id): return await message.edit("当前群组没有继承。") keyword_alias.remove(message.chat.id) return await message.edit("已删除继承。") else: try: cid = int(message.parameter[1]) keyword_alias.add(message.chat.id, cid) return await message.edit(f"已添加继承:{cid}") except ValueError: return await message.edit("请输入正确的参数。") # add task task = KeywordTask(keyword_tasks.get_next_task_id()) task.cid = message.chat.id try: task.parse_task(message.arguments) except Exception as e: return await message.edit(f"参数错误:{e}") keyword_tasks.add(task) keyword_tasks.save_to_file() keyword_tasks.load_from_file() await message.edit(f"已添加关键词任务 {task.task_id}") @listener(is_plugin=True, incoming=True, outgoing=False) async def process_keyword(message): with contextlib.suppress(Exception): await keyword_tasks.check_and_reply(message)