mirror of
https://github.com/PaiGramTeam/PamGram.git
synced 2024-11-21 13:48:19 +00:00
130 lines
5.4 KiB
Python
130 lines
5.4 KiB
Python
from typing import TYPE_CHECKING
|
|
|
|
from telegram.constants import ChatType
|
|
from telegram.ext import ApplicationHandlerStop, filters
|
|
|
|
from gram_core.dependence.redisdb import RedisDB
|
|
from gram_core.plugin import Plugin, handler, HandlerData
|
|
from gram_core.services.groups.services import GroupService
|
|
from gram_core.services.players import PlayersService
|
|
from gram_core.services.users.services import UserAdminService
|
|
from plugins.tools.chat_administrators import ChatAdministrators
|
|
from utils.log import logger
|
|
|
|
if TYPE_CHECKING:
|
|
from telegram import Update, Message
|
|
from telegram.ext import ContextTypes
|
|
|
|
# Reply sent when the "ignore unbound users" feature is switched ON for a group.
# The single ``{}`` placeholder is filled with the bot's username via
# ``str.format`` so the deep link points at this bot.
# (Rough translation: "Feature enabled; the bot will no longer answer some
# commands from unbound users. New users will see the bind-account reminder
# only once, so consider pinning the binding link or other instructions.")
IGNORE_UNBOUND_USER_OPEN = """成功开启 忽略未绑定用户触发部分命令 功能,彦卿将不会响应未绑定用户的部分命令

- 此功能开启后,将会导致新用户将只会出现一次绑定账号提醒,推荐在群置顶中注明 绑定链接 https://t.me/{}?start=setcookies 或者其他使用说明。
"""

# Reply sent when the feature is switched OFF for a group.
# (Rough translation: "Feature disabled; the bot will answer some commands
# from unbound users again.")
IGNORE_UNBOUND_USER_CLOSE = """成功关闭 忽略未绑定用户触发部分命令 功能,彦卿将开始响应未绑定用户的部分命令"""
|
|
|
|
|
|
class IgnoreUnboundUserCache:
    """Per-chat Redis sets of user ids, keyed as ``plugins:ignore:<chat_id>``.

    Every coroutine returns a real ``bool`` (matching its annotation) instead
    of the raw reply of the underlying Redis command.
    """

    def __init__(self, redis: RedisDB):
        """Keep a handle on ``redis.client`` and set the key prefix."""
        self.client = redis.client
        self.qname = "plugins:ignore:"

    def get_key(self, chat_id: int) -> str:
        """Return the Redis key of the set belonging to ``chat_id``."""
        return f"{self.qname}{chat_id}"

    async def ismember(self, chat_id: int, user_id: int) -> bool:
        """Return ``True`` if ``user_id`` is in the set of ``chat_id``."""
        key = self.get_key(chat_id)
        return bool(await self.client.sismember(key, user_id))

    async def set(self, chat_id: int, user_id: int) -> bool:
        """Add ``user_id`` to the set of ``chat_id``.

        Returns the truthiness of the ``SADD`` reply, i.e. ``False`` when the
        command reports that nothing was added.
        """
        key = self.get_key(chat_id)
        return bool(await self.client.sadd(key, user_id))

    async def remove_all(self, chat_id: int) -> bool:
        """Delete the whole set of ``chat_id``.

        Returns the truthiness of the ``DEL`` reply, i.e. ``False`` when the
        command reports that no key was removed.
        """
        key = self.get_key(chat_id)
        return bool(await self.client.delete(key))
|
|
|
|
|
|
class IgnoreUnboundUser(Plugin):
    """Plugin that lets a group ignore player commands from unbound users.

    A pre-processor (``check_update``) is registered on the application. In
    groups where the feature is on, the first command of a user without a
    bound player is let through and the user is remembered in Redis; later
    commands from that user raise ``ApplicationHandlerStop``. The
    ``/ignore_unbound_user`` command toggles the feature per group.
    """

    def __init__(
        self,
        players_service: PlayersService,
        group_service: GroupService,
        user_admin_service: UserAdminService,
        redis: RedisDB,
    ):
        self.players_service = players_service
        self.group_service = group_service
        self.user_admin_service = user_admin_service
        # Raw Redis client, handed to ChatAdministrators in check_permission.
        self.cache = redis.client
        # Per-chat record of unbound users that were already let through once.
        self.cache_client = IgnoreUnboundUserCache(redis)

    async def initialize(self) -> None:
        """Register ``check_update`` as a pre-processor on the application."""
        self.application.run_preprocessor(self.check_update)

    async def check_account(self, user_id: int) -> bool:
        """Return ``True`` if the players service yields a player for the user."""
        player = await self.players_service.get_player(user_id)
        return bool(player)

    async def check_group(self, group_id: int) -> bool:
        """Return whether the group service reports the group as ignoring."""
        return await self.group_service.is_ignore(group_id)

    async def check_update(self, update: "Update", _, __, context: "ContextTypes.DEFAULT_TYPE", data: "HandlerData"):
        """Pre-processor deciding whether an update may reach its handler.

        Returns ``None`` to let the update through.

        Raises:
            ApplicationHandlerStop: the sender has no bound player and was
                already recorded for this chat.
        """
        # Only handlers described by a HandlerData with ``player`` set are gated.
        if not (isinstance(data, HandlerData) and data.player):
            return

        chat = update.effective_chat
        if not chat or chat.type not in (ChatType.SUPERGROUP, ChatType.GROUP):
            return

        chat_id = chat.id
        feature_enabled = await self.check_group(chat_id)
        if not feature_enabled:
            # Feature is not enabled for this group.
            return

        message = update.effective_message
        text = (message.text or message.caption) if message else None
        if text and context.bot.username in text:
            # The bot is mentioned in the message.
            return

        uid = await self.get_real_user_id(update)
        if await self.check_account(uid):
            # The user already has a bound account.
            return

        already_seen = await self.cache_client.ismember(chat_id, uid)
        if already_seen:
            self.log_user(update, logger.info, "群组 %s[%s] 拦截了未绑定用户触发命令", chat.title, chat.id)
            raise ApplicationHandlerStop

        # Never intercepted before: remember the user and let this one pass.
        await self.cache_client.set(chat_id, uid)

    async def check_permission(self, chat_id: int, user_id: int, context: "ContextTypes.DEFAULT_TYPE") -> bool:
        """Return ``True`` if the user is a bot admin or an admin of the chat."""
        is_bot_admin = await self.user_admin_service.is_admin(user_id)
        if not is_bot_admin:
            chat_admins = await ChatAdministrators.get_chat_administrators(self.cache, context, chat_id)
            return ChatAdministrators.is_admin(chat_admins, user_id)
        return True

    async def reply_and_delete(self, message: "Message", text: str):
        """Reply with ``text`` and add delete jobs for the message and the reply."""
        reply = await message.reply_text(text)
        for target in (message, reply):
            self.add_delete_message_job(target)

    @handler.command(
        "ignore_unbound_user",
        filters=filters.ChatType.SUPERGROUP | filters.ChatType.GROUP,
        block=False,
    )
    async def ignore_unbound_user(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
        """Toggle the feature for the current group and report the new state."""
        user_id = await self.get_real_user_id(update)
        message = update.effective_message
        chat_id = update.effective_chat.id

        allowed = await self.check_permission(chat_id, user_id, context)
        if not allowed:
            await self.reply_and_delete(message, "您没有权限执行此操作")
            return

        self.log_user(update, logger.info, "更改群组 未绑定用户触发命令 功能状态")

        group = await self.group_service.get_group_by_id(chat_id)
        if not group:
            await self.reply_and_delete(message, "群组信息出现错误,请尝试重新添加机器人到群组")
            return

        group.is_ignore = not group.is_ignore
        await self.group_service.update_group(group)

        if not group.is_ignore:
            text = IGNORE_UNBOUND_USER_CLOSE
            # NOTE(review): the source's indentation was lost; the cache reset
            # is assumed to belong to the "disabled" branch only - confirm.
            await self.cache_client.remove_all(chat_id)
        else:
            text = IGNORE_UNBOUND_USER_OPEN.format(context.bot.username)
        await message.reply_text(text)
|