Support Ignore unbound user command in group

This commit is contained in:
omg-xtao 2024-03-25 20:48:19 +08:00 committed by GitHub
parent 3d3e8bf6a1
commit 76cf36fb67
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 205 additions and 40 deletions

View File

@ -0,0 +1,28 @@
"""groups_ignore
Revision ID: 369fb74daad9
Revises: cb37027ecae8
Create Date: 2024-03-25 17:29:35.378726
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "369fb74daad9"
down_revision = "cb37027ecae8"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("groups", sa.Column("is_ignore", sa.Integer(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("groups", "is_ignore")
# ### end Alembic commands ###

View File

@ -0,0 +1,3 @@
from gram_core.handler.hookhandler import HookHandler
__all__ = ("HookHandler",)

View File

@ -1,6 +1,13 @@
"""插件""" """插件"""
from gram_core.plugin._handler import conversation, error_handler, handler from gram_core.plugin._handler import (
conversation,
error_handler,
handler,
ConversationDataType,
ConversationData,
HandlerData,
)
from gram_core.plugin._job import TimeType, job from gram_core.plugin._job import TimeType, job
from gram_core.plugin._plugin import Plugin, PluginType, get_all_plugins from gram_core.plugin._plugin import Plugin, PluginType, get_all_plugins
@ -11,6 +18,9 @@ __all__ = (
"handler", "handler",
"error_handler", "error_handler",
"conversation", "conversation",
"ConversationDataType",
"ConversationData",
"HandlerData",
"job", "job",
"TimeType", "TimeType",
) )

@ -1 +1 @@
Subproject commit 3057fba7a6e84be60ece1ab98e6cd012aba37e78 Subproject commit 4c2eac29b1529ddffc94f9e22f6735b6d213a4bb

View File

@ -154,8 +154,8 @@ class AvatarListPlugin(Plugin):
name_card = (await self.assets_service.namecard(210001).navbar()).as_uri() name_card = (await self.assets_service.namecard(210001).navbar()).as_uri()
return name_card, avatar, nickname, rarity return name_card, avatar, nickname, rarity
@handler.command("avatars", block=False) @handler.command("avatars", cookie=True, block=False)
@handler.message(filters.Regex(r"^(全部)?练度统计$"), block=False) @handler.message(filters.Regex(r"^(全部)?练度统计$"), cookie=True, block=False)
async def avatar_list(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE"): async def avatar_list(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE"):
user_id = await self.get_real_user_id(update) user_id = await self.get_real_user_id(update)
user_name = self.get_real_user_name(update) user_name = self.get_real_user_name(update)

View File

@ -100,8 +100,8 @@ class DailyNotePlugin(Plugin):
[[InlineKeyboardButton(">> 设置状态提醒 <<", url=create_deep_linked_url(bot_username, "daily_note_tasks"))]] [[InlineKeyboardButton(">> 设置状态提醒 <<", url=create_deep_linked_url(bot_username, "daily_note_tasks"))]]
) )
@handler.command("dailynote", block=False) @handler.command("dailynote", cookie=True, block=False)
@handler.message(filters.Regex("^当前状态(.*)"), block=False) @handler.message(filters.Regex("^当前状态(.*)"), cookie=True, block=False)
async def command_start(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> Optional[int]: async def command_start(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> Optional[int]:
message = update.effective_message message = update.effective_message
user_id = await self.get_real_user_id(update) user_id = await self.get_real_user_id(update)

View File

@ -72,7 +72,7 @@ class LedgerPlugin(Plugin):
) )
return render_result return render_result
@handler.command(command="ledger", block=False) @handler.command(command="ledger", cookie=True, block=False)
@handler.message(filters=filters.Regex("^旅行札记查询(.*)"), block=False) @handler.message(filters=filters.Regex("^旅行札记查询(.*)"), block=False)
async def command_start(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None: async def command_start(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
user_id = await self.get_real_user_id(update) user_id = await self.get_real_user_id(update)

View File

@ -158,9 +158,9 @@ class PlayerCards(Plugin):
uid = player_info.player_id uid = player_info.player_id
return uid, ch_name return uid, ch_name
@handler.command(command="player_card", block=False) @handler.command(command="player_card", player=True, block=False)
@handler.command(command="player_cards", block=False) @handler.command(command="player_cards", player=True, block=False)
@handler.message(filters=filters.Regex("^角色卡片查询(.*)"), block=False) @handler.message(filters=filters.Regex("^角色卡片查询(.*)"), player=True, block=False)
async def player_cards(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None: async def player_cards(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
user_id = await self.get_real_user_id(update) user_id = await self.get_real_user_id(update)
message = update.effective_message message = update.effective_message

View File

@ -40,8 +40,8 @@ class Redeem(Plugin):
msg = e.message msg = e.message
return msg return msg
@handler.command(command="redeem", block=False) @handler.command(command="redeem", cookie=True, block=False)
@handler.message(filters=filters.Regex("^兑换码兑换(.*)"), block=False) @handler.message(filters=filters.Regex("^兑换码兑换(.*)"), cookie=True, block=False)
async def command_start(self, update: Update, context: CallbackContext) -> None: async def command_start(self, update: Update, context: CallbackContext) -> None:
user_id = await self.get_real_user_id(update) user_id = await self.get_real_user_id(update)
message = update.effective_message message = update.effective_message

View File

@ -66,8 +66,8 @@ class Sign(Plugin):
await self.sign_service.add(user) await self.sign_service.add(user)
return "开启自动签到成功" return "开启自动签到成功"
@handler.command(command="sign", block=False) @handler.command(command="sign", cookie=True, block=False)
@handler.message(filters=filters.Regex("^每日签到(.*)"), block=False) @handler.message(filters=filters.Regex("^每日签到(.*)"), cookie=True, block=False)
@handler.command(command="start", filters=filters.Regex("sign$"), block=False) @handler.command(command="start", filters=filters.Regex("sign$"), block=False)
async def command_start(self, update: Update, context: CallbackContext) -> None: async def command_start(self, update: Update, context: CallbackContext) -> None:
user_id = await self.get_real_user_id(update) user_id = await self.get_real_user_id(update)

View File

@ -27,8 +27,8 @@ class PlayerStatsPlugins(Plugin):
self.template_service = template self.template_service = template
self.helper = helper self.helper = helper
@handler.command("stats", block=False) @handler.command("stats", player=True, block=False)
@handler.message(filters.Regex("^玩家统计查询(.*)"), block=False) @handler.message(filters.Regex("^玩家统计查询(.*)"), player=True, block=False)
async def command_start(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> Optional[int]: async def command_start(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> Optional[int]:
user_id = await self.get_real_user_id(update) user_id = await self.get_real_user_id(update)
message = update.effective_message message = update.effective_message

View File

@ -1,7 +1,7 @@
import asyncio import asyncio
import random import random
import time import time
from typing import Tuple, Union, Optional, TYPE_CHECKING, List from typing import Tuple, Union, Optional, TYPE_CHECKING
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, ChatPermissions, ChatMember, Message, User from telegram import InlineKeyboardButton, InlineKeyboardMarkup, ChatPermissions, ChatMember, Message, User
from telegram.constants import ParseMode from telegram.constants import ParseMode
@ -15,6 +15,7 @@ from core.dependence.redisdb import RedisDB
from core.handler.callbackqueryhandler import CallbackQueryHandler from core.handler.callbackqueryhandler import CallbackQueryHandler
from core.plugin import Plugin, handler from core.plugin import Plugin, handler
from core.services.quiz.services import QuizService from core.services.quiz.services import QuizService
from plugins.tools.chat_administrators import ChatAdministrators
from utils.chatmember import extract_status_change from utils.chatmember import extract_status_change
from utils.log import logger from utils.log import logger
@ -63,25 +64,6 @@ class GroupCaptcha(Plugin):
return f"[{user_id}]({tg_link})" return f"[{user_id}]({tg_link})"
return f"[{escape_markdown(user_id, version=version)}]({tg_link})" return f"[{escape_markdown(user_id, version=version)}]({tg_link})"
async def get_chat_administrators(
self, context: "ContextTypes.DEFAULT_TYPE", chat_id: Union[str, int]
) -> Tuple[ChatMember]:
qname = f"plugin:group_captcha:chat_administrators:{chat_id}"
result: "List[bytes]" = await self.cache.lrange(qname, 0, -1)
if len(result) > 0:
return ChatMember.de_list([jsonlib.loads(str(_data, encoding="utf-8")) for _data in result], context.bot)
chat_administrators = await context.bot.get_chat_administrators(chat_id)
async with self.cache.pipeline(transaction=True) as pipe:
for chat_administrator in chat_administrators:
await pipe.lpush(qname, chat_administrator.to_json())
await pipe.expire(qname, self.ttl)
await pipe.execute()
return chat_administrators
@staticmethod
def is_admin(chat_administrators: Tuple[ChatMember], user_id: int) -> bool:
return any(admin.user.id == user_id for admin in chat_administrators)
async def kick_member_job(self, context: "ContextTypes.DEFAULT_TYPE"): async def kick_member_job(self, context: "ContextTypes.DEFAULT_TYPE"):
job = context.job job = context.job
logger.info("踢出用户 user_id[%s] 在 chat_id[%s]", job.user_id, job.chat_id) logger.info("踢出用户 user_id[%s] 在 chat_id[%s]", job.user_id, job.chat_id)
@ -152,8 +134,8 @@ class GroupCaptcha(Plugin):
message = callback_query.message message = callback_query.message
chat = message.chat chat = message.chat
logger.info("用户 %s[%s] 在群 %s[%s] 点击Auth管理员命令", user.full_name, user.id, chat.title, chat.id) logger.info("用户 %s[%s] 在群 %s[%s] 点击Auth管理员命令", user.full_name, user.id, chat.title, chat.id)
chat_administrators = await self.get_chat_administrators(context, chat_id=chat.id) chat_administrators = await ChatAdministrators.get_chat_administrators(self.cache, context, chat_id=chat.id)
if not self.is_admin(chat_administrators, user.id): if not ChatAdministrators.is_admin(chat_administrators, user.id):
logger.debug("用户 %s[%s] 在群 %s[%s] 非群管理", user.full_name, user.id, chat.title, chat.id) logger.debug("用户 %s[%s] 在群 %s[%s] 非群管理", user.full_name, user.id, chat.title, chat.id)
await callback_query.answer(text="你不是管理!\n" + self.user_mismatch, show_alert=True) await callback_query.answer(text="你不是管理!\n" + self.user_mismatch, show_alert=True)
return return
@ -350,8 +332,8 @@ class GroupCaptcha(Plugin):
logger.info("用户 %s[%s] 尝试加入群 %s[%s]", user.full_name, user.id, chat.title, chat.id) logger.info("用户 %s[%s] 尝试加入群 %s[%s]", user.full_name, user.id, chat.title, chat.id)
if user.is_bot: if user.is_bot:
return return
chat_administrators = await self.get_chat_administrators(context, chat_id=chat.id) chat_administrators = await ChatAdministrators.get_chat_administrators(self.cache, context, chat_id=chat.id)
if self.is_admin(chat_administrators, from_user.id): if ChatAdministrators.is_admin(chat_administrators, from_user.id):
await chat.send_message("派蒙检测到管理员邀请,自动放行了!") await chat.send_message("派蒙检测到管理员邀请,自动放行了!")
return return
question_id_list = await self.quiz_service.get_question_id_list() question_id_list = await self.quiz_service.get_question_id_list()

View File

@ -0,0 +1,101 @@
from typing import TYPE_CHECKING
from telegram.constants import ChatType
from telegram.ext import ApplicationHandlerStop, filters
from gram_core.dependence.redisdb import RedisDB
from gram_core.plugin import Plugin, handler, HandlerData
from gram_core.services.groups.services import GroupService
from gram_core.services.players import PlayersService
from gram_core.services.users.services import UserAdminService
from plugins.tools.chat_administrators import ChatAdministrators
from utils.log import logger
if TYPE_CHECKING:
from telegram import Update, Message
from telegram.ext import ContextTypes
IGNORE_UNBOUND_USER_OPEN = """成功开启 忽略未绑定用户触发部分命令 功能,派蒙将不会响应未绑定用户的部分命令
- 此功能开启后将会导致新用户无法快速绑定账号请在群规则中注明 绑定链接 https://t.me/{}?start=setcookies 或者其他使用说明
"""
IGNORE_UNBOUND_USER_CLOSE = """成功关闭 忽略未绑定用户触发部分命令 功能,派蒙将开始响应未绑定用户的部分命令"""
class IgnoreUnboundUser(Plugin):
def __init__(
self,
players_service: PlayersService,
group_service: GroupService,
user_admin_service: UserAdminService,
redis: RedisDB,
):
self.players_service = players_service
self.group_service = group_service
self.user_admin_service = user_admin_service
self.cache = redis.client
async def initialize(self) -> None:
self.application.run_preprocessor(self.check_update)
async def check_account(self, user_id: int) -> bool:
return bool(await self.players_service.get_player(user_id))
async def check_group(self, group_id: int) -> bool:
return await self.group_service.is_ignore(group_id)
async def check_update(self, update: "Update", _, __, context: "ContextTypes.DEFAULT_TYPE", data: "HandlerData"):
if not isinstance(data, HandlerData):
return
if not data.player:
return
chat = update.effective_chat
if (not chat) or chat.type not in [ChatType.SUPERGROUP, ChatType.GROUP]:
return
if not await self.check_group(chat.id):
# 未开启此功能
return
message = update.effective_message
if message:
text = message.text or message.caption
if text and context.bot.username in text:
# 机器人被提及
return
uid = await self.get_real_user_id(update)
if await self.check_account(uid):
# 已绑定账号
return
self.log_user(update, logger.info, "群组 %s[%s] 拦截了未绑定用户触发命令", chat.title, chat.id)
raise ApplicationHandlerStop
async def check_permission(self, chat_id: int, user_id: int, context: "ContextTypes.DEFAULT_TYPE") -> bool:
if await self.user_admin_service.is_admin(user_id):
return True
admins = await ChatAdministrators.get_chat_administrators(self.cache, context, chat_id)
return ChatAdministrators.is_admin(admins, user_id)
async def reply_and_delete(self, message: "Message", text: str):
reply = await message.reply_text(text)
self.add_delete_message_job(message)
self.add_delete_message_job(reply)
@handler.command("ignore_unbound_user", filters=filters.ChatType.SUPERGROUP | filters.ChatType.GROUP, block=False)
async def ignore_unbound_user(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
user_id = await self.get_real_user_id(update)
message = update.effective_message
chat_id = update.effective_chat.id
if not await self.check_permission(chat_id, user_id, context):
await self.reply_and_delete(message, "您没有权限执行此操作")
return
self.log_user(update, logger.info, "更改群组 未绑定用户触发命令 功能状态")
group = await self.group_service.get_group_by_id(chat_id)
if not group:
await self.reply_and_delete(message, "群组信息出现错误,请尝试重新添加机器人到群组")
return
group.is_ignore = not group.is_ignore
await self.group_service.update_group(group)
if group.is_ignore:
text = IGNORE_UNBOUND_USER_OPEN.format(context.bot.username)
else:
text = IGNORE_UNBOUND_USER_CLOSE
await message.reply_text(text)

View File

@ -0,0 +1,41 @@
from typing import Union, Tuple, TYPE_CHECKING, List, Any
from telegram import ChatMember
try:
import ujson as jsonlib
except ImportError:
import json as jsonlib
if TYPE_CHECKING:
from redis import Redis
from telegram.ext import ContextTypes
class ChatAdministrators:
QNAME = "plugin:group_captcha:chat_administrators"
TTL = 1 * 60 * 60
@staticmethod
async def get_chat_administrators(
cache: "Redis",
context: "ContextTypes.DEFAULT_TYPE",
chat_id: Union[str, int],
) -> Union[Tuple[ChatMember, ...], Any]:
qname = f"{ChatAdministrators.QNAME}:{chat_id}"
result: "List[bytes]" = await cache.lrange(qname, 0, -1)
if len(result) > 0:
return ChatMember.de_list([jsonlib.loads(str(_data, encoding="utf-8")) for _data in result], context.bot)
chat_administrators = await context.bot.get_chat_administrators(chat_id)
async with cache.pipeline(transaction=True) as pipe:
for chat_administrator in chat_administrators:
await pipe.lpush(qname, chat_administrator.to_json())
await pipe.expire(qname, ChatAdministrators.TTL)
await pipe.execute()
return chat_administrators
@staticmethod
def is_admin(chat_administrators: Tuple[ChatMember], user_id: int) -> bool:
return any(admin.user.id == user_id for admin in chat_administrators)