PamGram/plugins/group/ignore_unbound_user.py
2024-05-21 21:47:04 +08:00

130 lines
5.4 KiB
Python

from typing import TYPE_CHECKING
from telegram.constants import ChatType
from telegram.ext import ApplicationHandlerStop, filters
from gram_core.dependence.redisdb import RedisDB
from gram_core.plugin import Plugin, handler, HandlerData
from gram_core.services.groups.services import GroupService
from gram_core.services.players import PlayersService
from gram_core.services.users.services import UserAdminService
from plugins.tools.chat_administrators import ChatAdministrators
from utils.log import logger
if TYPE_CHECKING:
from telegram import Update, Message
from telegram.ext import ContextTypes
# Reply sent when the "ignore unbound users" feature is switched ON for a group.
# The "{}" placeholder is filled with the bot's username via str.format, producing
# a t.me link with the "setcookies" start parameter.
IGNORE_UNBOUND_USER_OPEN = """成功开启 忽略未绑定用户触发部分命令 功能,彦卿将不会响应未绑定用户的部分命令
- 此功能开启后,将会导致新用户将只会出现一次绑定账号提醒,推荐在群置顶中注明 绑定链接 https://t.me/{}?start=setcookies 或者其他使用说明。
"""
# Reply sent when the feature is switched OFF for a group (no placeholders).
IGNORE_UNBOUND_USER_CLOSE = """成功关闭 忽略未绑定用户触发部分命令 功能,彦卿将开始响应未绑定用户的部分命令"""
class IgnoreUnboundUserCache:
    """Per-chat Redis set of user ids, used to remember users already seen.

    Each chat gets its own set stored under the key ``plugins:ignore:<chat_id>``.
    All query/mutation methods return a real ``bool`` (the raw client replies
    are integer counts for some commands), matching their annotations.
    """

    def __init__(self, redis: "RedisDB"):
        """Keep a handle on ``redis.client`` and set the key prefix."""
        self.client = redis.client
        # Prefix for every key; the chat id is appended in get_key().
        self.qname = "plugins:ignore:"

    def get_key(self, chat_id: int) -> str:
        """Return the Redis key holding the set for ``chat_id``."""
        return f"{self.qname}{chat_id}"

    async def ismember(self, chat_id: int, user_id: int) -> bool:
        """Return True if ``user_id`` is in the set for ``chat_id``."""
        key = self.get_key(chat_id)
        return bool(await self.client.sismember(key, user_id))

    async def set(self, chat_id: int, user_id: int) -> bool:
        """Add ``user_id`` to the set for ``chat_id``.

        Returns the truthiness of the SADD reply (the number of members
        newly added), i.e. True only when the user was not already present.
        """
        key = self.get_key(chat_id)
        return bool(await self.client.sadd(key, user_id))

    async def remove_all(self, chat_id: int) -> bool:
        """Delete the whole set for ``chat_id``.

        Returns the truthiness of the DELETE reply (the number of keys
        removed), i.e. True only when a key actually existed.
        """
        key = self.get_key(chat_id)
        return bool(await self.client.delete(key))
class IgnoreUnboundUser(Plugin):
    """Per-group switch that stops commands coming from users with no bound player.

    While the switch is on for a group, the first qualifying update from an
    unbound user is let through and the user is recorded; later ones from
    the same user in that group are stopped with ``ApplicationHandlerStop``.
    """

    def __init__(
        self,
        players_service: PlayersService,
        group_service: GroupService,
        user_admin_service: UserAdminService,
        redis: RedisDB,
    ):
        self.players_service = players_service
        self.group_service = group_service
        self.user_admin_service = user_admin_service
        # Raw redis client, passed to ChatAdministrators in check_permission.
        self.cache = redis.client
        # Per-chat record of unbound users that were already let through once.
        self.cache_client = IgnoreUnboundUserCache(redis)

    async def initialize(self) -> None:
        """Pass ``check_update`` to the application's ``run_preprocessor``."""
        self.application.run_preprocessor(self.check_update)

    async def check_account(self, user_id: int) -> bool:
        """Return True when the players service returns a truthy player for ``user_id``."""
        player = await self.players_service.get_player(user_id)
        return bool(player)

    async def check_group(self, group_id: int) -> bool:
        """Return the group service's ``is_ignore`` result for ``group_id``."""
        ignoring = await self.group_service.is_ignore(group_id)
        return ignoring

    async def check_update(self, update: "Update", _, __, context: "ContextTypes.DEFAULT_TYPE", data: "HandlerData"):
        """Decide whether an incoming update should be stopped.

        Returns ``None`` to let the update continue; raises
        ``ApplicationHandlerStop`` to block it. The two unnamed positional
        parameters are accepted but unused.
        """
        # Only act on handler data that is flagged with `player`.
        if not (isinstance(data, HandlerData) and data.player):
            return

        chat = update.effective_chat
        if not chat:
            return
        if chat.type not in (ChatType.SUPERGROUP, ChatType.GROUP):
            return

        group_id = chat.id
        if not await self.check_group(group_id):
            # Feature is not enabled for this group.
            return

        message = update.effective_message
        text = (message.text or message.caption) if message else None
        if text and context.bot.username in text:
            # The bot's username appears in the text: treat as an explicit mention.
            return

        user_id = await self.get_real_user_id(update)
        if await self.check_account(user_id):
            # The user already has a bound account.
            return

        already_recorded = await self.cache_client.ismember(group_id, user_id)
        if not already_recorded:
            # First time we see this unbound user here: record them and let it pass.
            await self.cache_client.set(group_id, user_id)
            return

        self.log_user(update, logger.info, "群组 %s[%s] 拦截了未绑定用户触发命令", chat.title, chat.id)
        raise ApplicationHandlerStop

    async def check_permission(self, chat_id: int, user_id: int, context: "ContextTypes.DEFAULT_TYPE") -> bool:
        """Return True if ``user_id`` may toggle the feature in ``chat_id``.

        Allowed when the user-admin service reports the user as admin, or
        when the user is among the chat administrators.
        """
        is_service_admin = await self.user_admin_service.is_admin(user_id)
        if not is_service_admin:
            chat_admins = await ChatAdministrators.get_chat_administrators(self.cache, context, chat_id)
            return ChatAdministrators.is_admin(chat_admins, user_id)
        return True

    async def reply_and_delete(self, message: "Message", text: str):
        """Reply to ``message`` with ``text``, then register both via ``add_delete_message_job``."""
        sent = await message.reply_text(text)
        for target in (message, sent):
            self.add_delete_message_job(target)

    @handler.command("ignore_unbound_user", filters=filters.ChatType.SUPERGROUP | filters.ChatType.GROUP, block=False)
    async def ignore_unbound_user(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
        """Toggle the feature for the current group (permission-checked) and report the new state."""
        caller_id = await self.get_real_user_id(update)
        message = update.effective_message
        group_id = update.effective_chat.id

        allowed = await self.check_permission(group_id, caller_id, context)
        if not allowed:
            await self.reply_and_delete(message, "您没有权限执行此操作")
            return

        self.log_user(update, logger.info, "更改群组 未绑定用户触发命令 功能状态")
        group = await self.group_service.get_group_by_id(group_id)
        if not group:
            await self.reply_and_delete(message, "群组信息出现错误,请尝试重新添加机器人到群组")
            return

        # Flip the stored flag and persist it before answering.
        group.is_ignore = not group.is_ignore
        await self.group_service.update_group(group)

        if not group.is_ignore:
            # Switched off: drop the per-chat record of users already seen.
            notice = IGNORE_UNBOUND_USER_CLOSE
            await self.cache_client.remove_all(group_id)
        else:
            notice = IGNORE_UNBOUND_USER_OPEN.format(context.bot.username)
        await message.reply_text(notice)