PaiGram/plugins/genshin/map.py

196 lines
8.9 KiB
Python
Raw Normal View History

2023-02-19 10:12:47 +00:00
from io import BytesIO
from typing import Union, Optional, List, Tuple
from telegram import Update, Message, InputMediaDocument, InputMediaPhoto, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction
2023-03-16 15:00:04 +00:00
from telegram.ext import CommandHandler, MessageHandler, filters, CallbackContext
2023-02-19 10:12:47 +00:00
from core.config import config
from core.dependence.redisdb import RedisDB
2023-03-16 15:00:04 +00:00
from core.handler.callbackqueryhandler import CallbackQueryHandler
2023-02-19 10:12:47 +00:00
from core.plugin import handler, Plugin
from modules.apihelper.client.components.map import MapHelper, MapException
from utils.log import logger
class Map(Plugin):
2023-02-19 10:12:47 +00:00
"""资源点查询"""
def __init__(self, redis: RedisDB):
2023-02-19 10:12:47 +00:00
self.cache = redis.client
self.cache_photo_key = "plugin:map:photo:"
self.cache_doc_key = "plugin:map:doc:"
self.map_helper = MapHelper()
self.temp_photo_path = "resources/img/map.png"
self.temp_photo = None
async def get_photo_cache(self, map_id: Union[str, int], name: str) -> Optional[str]:
if file_id := await self.cache.get(f"{self.cache_photo_key}{map_id}:{name}"):
return file_id.decode("utf-8")
return None
async def get_doc_cache(self, map_id: Union[str, int], name: str) -> Optional[str]:
if file_id := await self.cache.get(f"{self.cache_doc_key}{map_id}:{name}"):
return file_id.decode("utf-8")
return None
async def set_photo_cache(self, map_id: Union[str, int], name: str, file_id: str) -> None:
await self.cache.set(f"{self.cache_photo_key}{map_id}:{name}", file_id)
async def set_doc_cache(self, map_id: Union[str, int], name: str, file_id: str) -> None:
await self.cache.set(f"{self.cache_doc_key}{map_id}:{name}", file_id)
async def clear_cache(self) -> None:
for i in await self.cache.keys(f"{self.cache_photo_key}*"):
await self.cache.delete(i)
for i in await self.cache.keys(f"{self.cache_doc_key}*"):
await self.cache.delete(i)
async def edit_media(self, message: Message, map_id: str, name: str) -> None:
caption = self.gen_caption(map_id, name)
if cache := await self.get_photo_cache(map_id, name):
media = InputMediaPhoto(media=cache, caption=caption)
await message.edit_media(media)
return
if cache := await self.get_doc_cache(map_id, name):
media = InputMediaDocument(media=cache, caption=caption)
await message.edit_media(media)
return
data = await self.map_helper.get_map(map_id, name)
if len(data) > (1024 * 1024):
data = BytesIO(data)
data.name = "map.jpg"
media = InputMediaDocument(media=data, caption=caption)
msg = await message.edit_media(media)
await self.set_doc_cache(map_id, name, msg.document.file_id)
else:
media = InputMediaPhoto(media=data, caption=caption)
msg = await message.edit_media(media)
await self.set_photo_cache(map_id, name, msg.photo[0].file_id)
def get_show_map(self, name: str) -> List[int]:
return [
idx
for idx, map_id in enumerate(self.map_helper.MAP_ID_LIST)
if self.map_helper.get_label_count(map_id, name) > 0
]
async def gen_map_button(
self, maps: List[int], user_id: Union[str, int], name: str
) -> List[List[InlineKeyboardButton]]:
return [
[
InlineKeyboardButton(
self.map_helper.MAP_NAME_LIST[idx],
callback_data=f"get_map|{user_id}|{self.map_helper.MAP_ID_LIST[idx]}|{name}",
)
for idx in maps
]
]
async def send_media(self, message: Message, map_id: Union[str, int], name: str) -> None:
caption = self.gen_caption(map_id, name)
if cache := await self.get_photo_cache(map_id, name):
await message.reply_photo(photo=cache, caption=caption)
return
if cache := await self.get_doc_cache(map_id, name):
await message.reply_document(document=cache, caption=caption)
return
try:
data = await self.map_helper.get_map(map_id, name)
except MapException as e:
await message.reply_text(e.message)
return
if len(data) > (1024 * 1024):
data = BytesIO(data)
data.name = "map.jpg"
msg = await message.reply_document(document=data, caption=caption)
await self.set_doc_cache(map_id, name, msg.document.file_id)
else:
msg = await message.reply_photo(photo=data, caption=caption)
await self.set_photo_cache(map_id, name, msg.photo[0].file_id)
def gen_caption(self, map_id: Union[int, str], name: str) -> str:
count = self.map_helper.get_label_count(map_id, name)
return f"派蒙一共找到了 {name}{count} 个位置点\n* 数据来源于米游社wiki"
@handler(CommandHandler, command="map", block=False)
@handler(MessageHandler, filters=filters.Regex("^(?P<name>.*)(在哪里|在哪|哪里有|哪儿有|哪有|在哪儿)$"), block=False)
@handler(MessageHandler, filters=filters.Regex("^(哪里有|哪儿有|哪有)(?P<name>.*)$"), block=False)
async def command_start(self, update: Update, context: CallbackContext):
message = update.effective_message
args = context.args
group_dict = context.match and context.match.groupdict()
user = update.effective_user
resource_name = None
await message.reply_chat_action(ChatAction.TYPING)
if args and len(args) >= 1:
resource_name = args[0]
elif group_dict:
resource_name = group_dict.get("name", None)
if not resource_name:
if group_dict:
return
await message.reply_text("请指定要查找的资源名称。", parse_mode="Markdown")
return
logger.info("用户: %s [%s] 使用 map 命令查询了 %s", user.username, user.id, resource_name)
if resource_name not in self.map_helper.query_map:
2023-02-22 06:38:28 +00:00
# 消息来源于群组中并且无法找到默认不回复即可
if filters.ChatType.GROUPS.filter(message) and group_dict is not None:
return
2023-02-19 10:12:47 +00:00
await message.reply_text("没有找到该资源。", parse_mode="Markdown")
return
maps = self.get_show_map(resource_name)
if len(maps) == 0:
2023-02-22 06:38:28 +00:00
if filters.ChatType.GROUPS.filter(message) and group_dict is not None:
return
2023-02-19 10:12:47 +00:00
await message.reply_text("没有找到该资源。", parse_mode="Markdown")
return
if len(maps) == 1:
map_id = self.map_helper.MAP_ID_LIST[maps[0]]
await self.send_media(message, map_id, resource_name)
return
buttons = await self.gen_map_button(maps, user.id, resource_name)
if isinstance(self.temp_photo, str):
photo = self.temp_photo
else:
photo = open(self.temp_photo_path, "rb")
reply_message = await message.reply_photo(
photo=photo, caption="请选择你要查询的地图", reply_markup=InlineKeyboardMarkup(buttons)
)
if reply_message.photo:
self.temp_photo = reply_message.photo[-1].file_id
@handler(CallbackQueryHandler, pattern=r"^get_map\|", block=False)
async def get_maps(self, update: Update, _: CallbackContext) -> None:
callback_query = update.callback_query
user = callback_query.from_user
message = callback_query.message
async def get_map_callback(callback_query_data: str) -> Tuple[int, str, str]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_map_id = _data[2]
_name = _data[3]
logger.debug("callback_query_data 函数返回 user_id[%s] map_id[%s] name[%s]", _user_id, _map_id, _name)
return _user_id, _map_id, _name
user_id, map_id, name = await get_map_callback(callback_query.data)
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
await callback_query.answer(text="正在渲染图片中 请稍等 请不要重复点击按钮", show_alert=False)
try:
await self.edit_media(message, map_id, name)
except MapException as e:
await message.reply_text(e.message)
@handler.command("refresh_map", admin=True)
2023-02-19 10:12:47 +00:00
async def refresh_map(self, update: Update, _: CallbackContext):
message = update.effective_message
msg = await message.reply_text("正在刷新地图数据,请耐心等待...")
await self.map_helper.refresh_query_map()
await self.map_helper.refresh_label_count()
await self.clear_cache()
await msg.edit_text("正在刷新地图数据,请耐心等待...\n刷新成功")