from typing import TYPE_CHECKING from telegram.constants import ChatType from telegram.ext import ApplicationHandlerStop, filters from gram_core.dependence.redisdb import RedisDB from gram_core.plugin import Plugin, handler, HandlerData from gram_core.services.groups.services import GroupService from gram_core.services.players import PlayersService from gram_core.services.users.services import UserAdminService from plugins.tools.chat_administrators import ChatAdministrators from utils.log import logger if TYPE_CHECKING: from telegram import Update, Message from telegram.ext import ContextTypes IGNORE_UNBOUND_USER_OPEN = """成功开启 忽略未绑定用户触发部分命令 功能,派蒙将不会响应未绑定用户的部分命令 - 此功能开启后,将会导致新用户将只会出现一次绑定账号提醒,推荐在群置顶中注明 绑定链接 https://t.me/{}?start=setcookies 或者其他使用说明。 """ IGNORE_UNBOUND_USER_CLOSE = """成功关闭 忽略未绑定用户触发部分命令 功能,派蒙将开始响应未绑定用户的部分命令""" class IgnoreUnboundUserCache: def __init__(self, redis: RedisDB): self.client = redis.client self.qname = "plugins:ignore:" def get_key(self, chat_id: int) -> str: return f"{self.qname}{chat_id}" async def ismember(self, chat_id: int, user_id: int) -> bool: qname = self.get_key(chat_id) return await self.client.sismember(qname, user_id) async def set(self, chat_id: int, user_id: int) -> bool: qname = self.get_key(chat_id) return await self.client.sadd(qname, user_id) async def remove_all(self, chat_id: int) -> bool: qname = self.get_key(chat_id) return await self.client.delete(qname) class IgnoreUnboundUser(Plugin): def __init__( self, players_service: PlayersService, group_service: GroupService, user_admin_service: UserAdminService, redis: RedisDB, ): self.players_service = players_service self.group_service = group_service self.user_admin_service = user_admin_service self.cache = redis.client self.cache_client = IgnoreUnboundUserCache(redis) async def initialize(self) -> None: self.application.run_preprocessor(self.check_update) async def check_account(self, user_id: int) -> bool: return bool(await self.players_service.get_player(user_id)) async def check_group(self, group_id: int) -> bool: return await self.group_service.is_ignore(group_id) async def check_update(self, update: "Update", _, __, context: "ContextTypes.DEFAULT_TYPE", data: "HandlerData"): if not isinstance(data, HandlerData): return if not data.player: return chat = update.effective_chat if (not chat) or chat.type not in [ChatType.SUPERGROUP, ChatType.GROUP]: return chat_id = chat.id if not await self.check_group(chat_id): # 未开启此功能 return message = update.effective_message if message: text = message.text or message.caption if text and context.bot.username in text: # 机器人被提及 return uid = await self.get_real_user_id(update) if await self.check_account(uid): # 已绑定账号 return if not await self.cache_client.ismember(chat_id, uid): # 未拦截过 await self.cache_client.set(chat_id, uid) return self.log_user(update, logger.info, "群组 %s[%s] 拦截了未绑定用户触发命令", chat.title, chat.id) raise ApplicationHandlerStop async def check_permission(self, chat_id: int, user_id: int, context: "ContextTypes.DEFAULT_TYPE") -> bool: if await self.user_admin_service.is_admin(user_id): return True admins = await ChatAdministrators.get_chat_administrators(self.cache, context, chat_id) return ChatAdministrators.is_admin(admins, user_id) async def reply_and_delete(self, message: "Message", text: str): reply = await message.reply_text(text) self.add_delete_message_job(message) self.add_delete_message_job(reply) @handler.command("ignore_unbound_user", filters=filters.ChatType.SUPERGROUP | filters.ChatType.GROUP, block=False) async def ignore_unbound_user(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"): user_id = await self.get_real_user_id(update) message = update.effective_message chat_id = update.effective_chat.id if not await self.check_permission(chat_id, user_id, context): await self.reply_and_delete(message, "您没有权限执行此操作") return self.log_user(update, logger.info, "更改群组 未绑定用户触发命令 功能状态") group = await self.group_service.get_group_by_id(chat_id) if not group: await self.reply_and_delete(message, "群组信息出现错误,请尝试重新添加机器人到群组") return group.is_ignore = not group.is_ignore await self.group_service.update_group(group) if group.is_ignore: text = IGNORE_UNBOUND_USER_OPEN.format(context.bot.username) else: text = IGNORE_UNBOUND_USER_CLOSE await self.cache_client.remove_all(chat_id) await message.reply_text(text)