添加用于装饰在指定函数防止过度调用的类装饰器

This commit is contained in:
洛水.山岭居室 2022-05-20 18:23:15 +08:00
parent 3871831881
commit 4ae2cccfec
3 changed files with 63 additions and 38 deletions

View File

@ -1,6 +1,9 @@
import datetime
import time
from typing import Callable, Optional
from telegram import Update
from telegram.constants import ChatType
from telegram.error import BadRequest
from telegram.ext import CallbackContext, ConversationHandler, filters
from telegram.ext._utils.types import HandlerCallback, CCT, RT
@ -9,6 +12,62 @@ from logger import Log
from service import BaseService
class RestrictsCalls(object):
"""
用于装饰在指定函数防止恶意调用的类装饰器
"""
_user_time = {}
_filters_chat = {}
_return_data = {}
_try_delete_message = {}
def __init__(self, filters_chat: ChatType = filters.ChatType.GROUPS, return_data=None,
try_delete_message: bool = False):
self.filters_chat = filters_chat
self.return_data = return_data
self.try_delete_message = try_delete_message
def __call__(self, func: Callable):
self._filters_chat[func] = self.filters_chat
self._return_data[func] = self.return_data
self._try_delete_message[func] = self.try_delete_message
async def decorator(*args, **kwargs):
update: Optional[Update] = None
for arg in args:
if isinstance(arg, Update):
update = arg
if update is None:
return await func(*args, **kwargs)
message = update.message
user = update.effective_user
_filters = self._filters_chat.get(func)
if _filters is None:
raise KeyError("RestrictsCalls: filters not find")
_return_data = self._return_data.get(func)
_try_delete_message = self._return_data.get(func, False)
if _filters.filter(message):
try:
command_time = self._user_time.get(f"{user.id}")
if command_time is None:
self._user_time[f"{user.id}"] = time.time()
else:
if time.time() - command_time <= 10:
if _try_delete_message:
try:
await message.delete()
except BadRequest as error:
Log.warning("删除消息失败", error)
return _return_data
else:
self._user_time[f"{user.id}"] = time.time()
except (ValueError, KeyError) as error:
Log.error("操作失败", error)
return await func(*args, **kwargs)
return decorator
class BasePlugins:
def __init__(self, service: BaseService):
self.service = service

View File

@ -7,7 +7,7 @@ from telegram.error import BadRequest
from telegram.ext import CallbackContext, ConversationHandler, filters
from logger import Log
from plugins.base import BasePlugins
from plugins.base import BasePlugins, RestrictsCalls
from service import BaseService
from pyppeteer import launch
from metadata.metadata import metadat
@ -28,28 +28,11 @@ class Gacha(BasePlugins):
CHECK_SERVER, COMMAND_RESULT = range(10600, 10602)
@RestrictsCalls(return_data=ConversationHandler.END, try_delete_message=True)
async def command_start(self, update: Update, context: CallbackContext) -> None:
message = update.message
user = update.effective_user
Log.info(f"用户 {user.full_name}[{user.id}] 抽卡模拟器命令请求")
if filters.ChatType.GROUPS.filter(message):
try:
command_time = self.user_time.get(f"{user.id}")
if command_time is None:
self.user_time[f"{user.id}"] = time.time()
else:
if time.time() - command_time <= 10:
try:
await message.delete()
except BadRequest as error:
Log.warning("删除消息失败", error)
pass
return
else:
self.user_time[f"{user.id}"] = time.time()
except (ValueError, KeyError) as error:
Log.error("quiz模块 user_time 操作失败", error)
pass
args = message.text.split(" ")
gacha_name = "角色活动"
if len(args) > 1:

View File

@ -7,12 +7,11 @@ from numpy.random import MT19937, Generator
from redis import DataError, ResponseError
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardMarkup, Poll, \
ReplyKeyboardRemove, Message
from telegram.error import BadRequest
from telegram.ext import CallbackContext, filters, ConversationHandler
from telegram.helpers import escape_markdown
from logger import Log
from plugins.base import BasePlugins
from plugins.base import BasePlugins, RestrictsCalls
from service import BaseService
from service.base import QuestionData, AnswerData
@ -69,6 +68,7 @@ class Quiz(BasePlugins):
correct_option_id=index, is_anonymous=False,
open_period=self.time_out, type=Poll.QUIZ)
@RestrictsCalls(return_data=ConversationHandler.END, try_delete_message=True)
async def command_start(self, update: Update, context: CallbackContext) -> int:
user = update.effective_user
message = update.message
@ -92,23 +92,6 @@ class Quiz(BasePlugins):
else:
await self.send_poll(update)
elif filters.ChatType.GROUPS.filter(update.message):
try:
command_time = self.user_time.get(f"{user.id}")
if command_time is None:
self.user_time[f"{user.id}"] = time.time()
else:
if time.time() - command_time <= 10:
try:
await message.delete()
except BadRequest as error:
Log.warning("删除消息失败", error)
pass
return ConversationHandler.END
else:
self.user_time[f"{user.id}"] = time.time()
except (ValueError, KeyError) as error:
Log.error("quiz模块 user_time 操作失败", error)
pass
poll_message = await self.send_poll(update)
if poll_message is None:
return ConversationHandler.END