Support inline use data

This commit is contained in:
xtaodada 2024-06-18 23:59:34 +08:00
parent c478924e06
commit 9bce0f921f
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
5 changed files with 126 additions and 14 deletions

View File

@ -145,6 +145,8 @@ class ApplicationConfig(Settings):
channels: List[int] = [] channels: List[int] = []
"""文章推送群组""" """文章推送群组"""
channels_helper: Optional[int] = None
"""消息帮助频道"""
verify_groups: Set[int] = set() verify_groups: Set[int] = set()
"""启用群验证功能的群组""" """启用群验证功能的群组"""

View File

@ -6,6 +6,7 @@ from .get_chat import GetChat
from .get_real_uid_or_offset import GetRealUidOrOffset from .get_real_uid_or_offset import GetRealUidOrOffset
from .get_real_user_id import GetRealUserId from .get_real_user_id import GetRealUserId
from .get_real_user_name import GetRealUserName from .get_real_user_name import GetRealUserName
from .inline_use_data import InlineUseData
from .log_user import LogUser from .log_user import LogUser
from .migrate_data import MigrateData from .migrate_data import MigrateData
@ -19,6 +20,7 @@ class PluginFuncMethods(
GetRealUidOrOffset, GetRealUidOrOffset,
GetRealUserId, GetRealUserId,
GetRealUserName, GetRealUserName,
InlineUseData,
LogUser, LogUser,
MigrateData, MigrateData,
): ):

View File

@ -7,20 +7,30 @@ if TYPE_CHECKING:
REGEX = r"@(\d{10})|@(\d{9})|@(\d)" REGEX = r"@(\d{10})|@(\d{9})|@(\d)"
def get_real_uid_or_offset_by_text(text: str) -> Tuple[Optional[int], Optional[int]]:
if matches := re.findall(REGEX, text):
if numbers := [int(num) for match in matches for num in match if num != ""]:
if 1 < numbers[0] < 10:
return None, numbers[0] - 1
elif numbers[0] in [0, 1]:
return None, None
return numbers[0], None
return None, None
class GetRealUidOrOffset: class GetRealUidOrOffset:
@staticmethod @staticmethod
def get_real_uid_or_offset(update: "Update") -> Tuple[Optional[int], Optional[int]]: def get_real_uid_or_offset(update: "Update") -> Tuple[Optional[int], Optional[int]]:
message = update.effective_message message = update.effective_message
ilq = update.inline_query
text = None
if not message: if not message:
return None, None if ilq:
text = message.text or message.caption text = ilq.query
else:
return None, None
else:
text = message.text or message.caption
if not text: if not text:
return None, None return None, None
if matches := re.findall(REGEX, text): return get_real_uid_or_offset_by_text(text)
if numbers := [int(num) for match in matches for num in match if num != ""]:
if 1 < numbers[0] < 10:
return None, numbers[0] - 1
elif numbers[0] in [0, 1]:
return None, None
return numbers[0], None
return None, None

View File

@ -0,0 +1,49 @@
from dataclasses import dataclass
from typing import Optional, List
from telegram.ext import ContextTypes
from telegram.ext._utils.types import HandlerCallback, UT, CCT, RT
@dataclass
class IInlineUseData:
"""Inline 使用数据"""
text: str
hash: str
callback: HandlerCallback[UT, CCT, RT]
cookie: bool = False
player: bool = False
UID_KEY: str = "inline_uid"
def is_show(self, has_cookie: bool, has_player: bool) -> bool:
"""是否显示"""
if self.cookie and not has_cookie:
return False
if self.player and not has_player:
return False
return True
def get_button_callback_data(self, start: str) -> str:
"""获取按钮数据"""
return f"{start}|{self.hash}"
@staticmethod
def set_uid_to_context(context: "ContextTypes.DEFAULT_TYPE", uid: int) -> None:
"""设置 UID 到 Update"""
user_data = context.user_data
user_data[IInlineUseData.UID_KEY] = uid
@staticmethod
def get_uid_from_context(context: "ContextTypes.DEFAULT_TYPE") -> int:
"""从 Update 中获取 UID"""
user_data = context.user_data
if not user_data:
return 0
return user_data.get(IInlineUseData.UID_KEY, 0)
class InlineUseData:
async def get_inline_use_data(self) -> List[Optional[IInlineUseData]]:
"""获取 Inline 使用数据"""
return []

View File

@ -1,9 +1,10 @@
from enum import Enum from enum import Enum
from typing import List, Optional, Union from typing import List, Optional, Union
from telegram import InputMediaDocument, InputMediaPhoto, Message from telegram import InputMediaDocument, InputMediaPhoto, Message, CallbackQuery, Bot
from telegram.error import BadRequest from telegram.error import BadRequest
from gram_core.config import config
from gram_core.services.template.cache import HtmlToFileIdCache from gram_core.services.template.cache import HtmlToFileIdCache
from gram_core.services.template.error import ErrorFileType, FileIdNotFound from gram_core.services.template.error import ErrorFileType, FileIdNotFound
@ -106,10 +107,51 @@ class RenderResult:
return edit_media return edit_media
async def cache_file_id(self, reply: Message): async def send_photo_to_helper_channel(self, bot: "Bot", filename: Optional[str] = None):
"""缓存 telegram 返回的 file_id""" try:
if self.file_type == FileType.PHOTO:
reply = await bot.send_photo(config.channels_helper, photo=self.photo)
elif self.file_type == FileType.DOCUMENT:
reply = await bot.send_document(config.channels_helper, document=self.photo, filename=filename)
else:
return
except BadRequest as exc:
if "Wrong file identifier" in exc.message and isinstance(self.photo, str):
await self._cache.delete_data(self.html, self.file_type.name)
raise BadRequest(message="Wrong file identifier specified")
raise exc
await self.cache_file_id(reply)
return reply
async def edit_inline_media(self, callback_query: CallbackQuery, filename: str = None, *args, **kwargs) -> bool:
"""是 `message.edit_media` 的封装,上传成功后,缓存 telegram 返回的 file_id方便重复使用"""
bot = callback_query.get_bot()
reply = await self.send_photo_to_helper_channel(bot, filename)
file_id = self.get_file_id(reply)
if self.file_type == FileType.DOCUMENT:
media = InputMediaDocument(
media=file_id, caption=self.caption, parse_mode=self.parse_mode, filename=self.filename
)
else:
media = InputMediaPhoto(
media=file_id, caption=self.caption, parse_mode=self.parse_mode, filename=self.filename
)
try:
return await callback_query.edit_message_media(media, *args, **kwargs)
except BadRequest as exc:
if "Wrong file identifier" in exc.message and isinstance(self.photo, str):
await self._cache.delete_data(self.html, self.file_type.name)
raise BadRequest(message="Wrong file identifier specified")
raise exc
def get_file_id(self, reply: Message) -> str:
if self.is_file_id(): if self.is_file_id():
return return self.photo
if self.file_type == FileType.PHOTO and reply.photo: if self.file_type == FileType.PHOTO and reply.photo:
file_id = reply.photo[0].file_id file_id = reply.photo[0].file_id
@ -117,6 +159,13 @@ class RenderResult:
file_id = reply.document.file_id file_id = reply.document.file_id
else: else:
raise FileIdNotFound raise FileIdNotFound
return file_id
async def cache_file_id(self, reply: Message):
"""缓存 telegram 返回的 file_id"""
if self.is_file_id():
return
file_id = self.get_file_id(reply)
await self._cache.set_data(self.html, self.file_type.name, file_id, self.ttl) await self._cache.set_data(self.html, self.file_type.name, file_id, self.ttl)
def is_file_id(self) -> bool: def is_file_id(self) -> bool: