Support starrail relics

This commit is contained in:
xtaodada 2023-05-22 22:32:37 +08:00
parent d35a88ef11
commit f7bb19a646
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
9 changed files with 135 additions and 13 deletions

View File

@ -3,7 +3,7 @@ from typing import List
from core.base_service import BaseService from core.base_service import BaseService
from core.dependence.redisdb import RedisDB from core.dependence.redisdb import RedisDB
__all__ = ["GameCache", "GameCacheForStrategy", "GameCacheForMaterial", "GameCacheForLightCone"] __all__ = ["GameCache", "GameCacheForStrategy", "GameCacheForMaterial", "GameCacheForLightCone", "GameCacheForRelics"]
class GameCache: class GameCache:
@ -44,3 +44,7 @@ class GameCacheForMaterial(BaseService.Component, GameCache):
class GameCacheForLightCone(BaseService.Component, GameCache): class GameCacheForLightCone(BaseService.Component, GameCache):
qname = "game:lightcone" qname = "game:lightcone"
class GameCacheForRelics(BaseService.Component, GameCache):
qname = "game:relics"

View File

@ -1,5 +1,10 @@
from core.base_service import BaseService from core.base_service import BaseService
from core.services.game.cache import GameCacheForStrategy, GameCacheForMaterial, GameCacheForLightCone from core.services.game.cache import (
GameCacheForStrategy,
GameCacheForMaterial,
GameCacheForLightCone,
GameCacheForRelics,
)
__all__ = "GameCacheService" __all__ = "GameCacheService"
@ -10,10 +15,12 @@ class GameCacheService(BaseService):
strategy_cache: GameCacheForStrategy, strategy_cache: GameCacheForStrategy,
material_cache: GameCacheForMaterial, material_cache: GameCacheForMaterial,
light_cone_cache: GameCacheForLightCone, light_cone_cache: GameCacheForLightCone,
relics_cache: GameCacheForRelics,
): ):
self.strategy_cache = strategy_cache self.strategy_cache = strategy_cache
self.material_cache = material_cache self.material_cache = material_cache
self.light_cone_cache = light_cone_cache self.light_cone_cache = light_cone_cache
self.relics_cache = relics_cache
async def get_strategy_cache(self, character_name: str) -> str: async def get_strategy_cache(self, character_name: str) -> str:
cache = await self.strategy_cache.get_file(character_name) cache = await self.strategy_cache.get_file(character_name)
@ -38,3 +45,11 @@ class GameCacheService(BaseService):
async def set_light_cone_cache(self, light_cone_name: str, file: str) -> None: async def set_light_cone_cache(self, light_cone_name: str, file: str) -> None:
await self.light_cone_cache.set_file(light_cone_name, file) await self.light_cone_cache.set_file(light_cone_name, file)
async def get_relics_cache(self, relics_name: str) -> str:
cache = await self.relics_cache.get_file(relics_name)
if cache is not None:
return cache.decode("utf-8")
async def set_relics_cache(self, relics_name: str, file: str) -> None:
await self.relics_cache.set_file(relics_name, file)

View File

@ -13,6 +13,7 @@ __all__ = [
"lightConeToId", "lightConeToId",
"not_real_roles", "not_real_roles",
"roleToTag", "roleToTag",
"lightConeToTag",
] ]
# noinspection SpellCheckingInspection # noinspection SpellCheckingInspection

View File

@ -127,7 +127,7 @@ class PlayerCards:
try: try:
user = await self.client.get(self.url2 + uid, timeout=30, headers=self.headers) user = await self.client.get(self.url2 + uid, timeout=30, headers=self.headers)
if user.status_code != 200: if user.status_code != 200:
raise PlayerCardsError("请求异常,错误代码 %s" % user.status_code) raise PlayerCardsError("请求异常")
data = ujson.loads(user.text) data = ujson.loads(user.text)
characters = data.get("characters", []) characters = data.get("characters", [])
for character in characters: for character in characters:

View File

@ -9,22 +9,26 @@ class Raider(WikiModel):
raider_role_path = WikiModel.BASE_PATH / "raiders" / "role" raider_role_path = WikiModel.BASE_PATH / "raiders" / "role"
raider_light_cone_path = WikiModel.BASE_PATH / "raiders" / "light_cone" raider_light_cone_path = WikiModel.BASE_PATH / "raiders" / "light_cone"
raider_role_material_path = WikiModel.BASE_PATH / "raiders" / "role_material" raider_role_material_path = WikiModel.BASE_PATH / "raiders" / "role_material"
raider_relic_path = WikiModel.BASE_PATH / "raiders" / "relic"
raider_info_path = WikiModel.BASE_PATH / "raiders" / "path.json" raider_info_path = WikiModel.BASE_PATH / "raiders" / "path.json"
raider_role_path.mkdir(parents=True, exist_ok=True) raider_role_path.mkdir(parents=True, exist_ok=True)
raider_light_cone_path.mkdir(parents=True, exist_ok=True) raider_light_cone_path.mkdir(parents=True, exist_ok=True)
raider_role_material_path.mkdir(parents=True, exist_ok=True) raider_role_material_path.mkdir(parents=True, exist_ok=True)
name_map = {"role": "role", "lightcone": "light_cone", "material for role": "role_material"} raider_relic_path.mkdir(parents=True, exist_ok=True)
name_map = {"role": "role", "lightcone": "light_cone", "material for role": "role_material", "relic": "relic"}
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.all_role_raiders: List[str] = [] self.all_role_raiders: List[str] = []
self.all_light_cone_raiders: List[str] = [] self.all_light_cone_raiders: List[str] = []
self.all_role_material_raiders: List[str] = [] self.all_role_material_raiders: List[str] = []
self.all_relic_raiders: List[str] = []
def clear_class_data(self) -> None: def clear_class_data(self) -> None:
self.all_role_raiders.clear() self.all_role_raiders.clear()
self.all_light_cone_raiders.clear() self.all_light_cone_raiders.clear()
self.all_role_material_raiders.clear() self.all_role_material_raiders.clear()
self.all_relic_raiders.clear()
async def refresh_task(self, name: str, path: str = "", start: str = ""): async def refresh_task(self, name: str, path: str = "", start: str = ""):
photo = await self.remote_get(f"{self.raider_url}{path}") photo = await self.remote_get(f"{self.raider_url}{path}")
@ -47,8 +51,9 @@ class Raider(WikiModel):
if not self.raider_info_path.exists(): if not self.raider_info_path.exists():
await self.refresh() await self.refresh()
return return
datas: Dict[str, List] = await WikiModel.read(self.raider_info_path) datas: Dict[str, List] = await WikiModel.read(self.raider_info_path) # noqa
self.clear_class_data() self.clear_class_data()
self.all_role_raiders.extend(datas["role"]) self.all_role_raiders.extend(datas["role"])
self.all_light_cone_raiders.extend(datas["light_cone"]) self.all_light_cone_raiders.extend(datas["light_cone"])
self.all_role_material_raiders.extend(datas["role_material"]) self.all_role_material_raiders.extend(datas["role_material"])
self.all_relic_raiders.extend(datas["relic"])

View File

@ -36,6 +36,7 @@ class Inline(Plugin):
self.characters_list: List[Dict[str, str]] = [] self.characters_list: List[Dict[str, str]] = []
self.characters_material_list: List[Dict[str, str]] = [] self.characters_material_list: List[Dict[str, str]] = []
self.light_cone_list: List[Dict[str, str]] = [] self.light_cone_list: List[Dict[str, str]] = []
self.relics_list: List[Dict[str, str]] = []
self.refresh_task: List[Awaitable] = [] self.refresh_task: List[Awaitable] = []
self.search_service = search_service self.search_service = search_service
@ -53,6 +54,18 @@ class Inline(Plugin):
logger.warning(f"未找到光锥 {light_cone} 的图标inline 不显示此光锥") logger.warning(f"未找到光锥 {light_cone} 的图标inline 不显示此光锥")
logger.success("Inline 模块获取光锥列表完成") logger.success("Inline 模块获取光锥列表完成")
async def task_relics():
logger.info("Inline 模块正在获取遗器列表")
relics_datas: Dict[str, str] = {}
for relics in self.wiki_service.relic.all_relics:
relics_datas[relics.name] = relics.icon
for relics in self.wiki_service.raider.all_relic_raiders:
if relics in relics_datas:
self.relics_list.append({"name": relics, "icon": relics_datas[relics]})
else:
logger.warning(f"未找到遗器 {relics} 的图标inline 不显示此遗器")
logger.success("Inline 模块获取遗器列表完成")
async def task_characters(): async def task_characters():
logger.info("Inline 模块正在获取角色列表") logger.info("Inline 模块正在获取角色列表")
datas: Dict[str, str] = {} datas: Dict[str, str] = {}
@ -80,6 +93,7 @@ class Inline(Plugin):
self.refresh_task.append(asyncio.create_task(task_characters())) self.refresh_task.append(asyncio.create_task(task_characters()))
self.refresh_task.append(asyncio.create_task(task_light_cone())) self.refresh_task.append(asyncio.create_task(task_light_cone()))
self.refresh_task.append(asyncio.create_task(task_relics()))
@handler.inline_query(block=False) @handler.inline_query(block=False)
async def inline_query(self, update: Update, _: CallbackContext) -> None: async def inline_query(self, update: Update, _: CallbackContext) -> None:
@ -91,7 +105,11 @@ class Inline(Plugin):
results_list = [] results_list = []
args = query.split(" ") args = query.split(" ")
if args[0] == "": if args[0] == "":
temp_data = [("光锥图鉴查询", "输入光锥名称即可查询光锥图鉴"), ("角色攻略查询", "输入角色名即可查询角色攻略图鉴")] temp_data = [
("光锥图鉴查询", "输入光锥名称即可查询光锥图鉴"),
("角色攻略查询", "输入角色名即可查询角色攻略图鉴"),
("遗器套装查询", "输入遗器套装名称即可查询遗器套装图鉴"),
]
for i in temp_data: for i in temp_data:
results_list.append( results_list.append(
InlineQueryResultArticle( InlineQueryResultArticle(
@ -102,11 +120,12 @@ class Inline(Plugin):
) )
) )
else: else:
if args[0] in ["查看角色攻略列表并查询", "查看角色培养素材列表并查询", "查看光锥列表并查询"]: if args[0] in ["查看角色攻略列表并查询", "查看角色培养素材列表并查询", "查看光锥列表并查询", "查看遗器套装列表并查询"]:
temp_data = { temp_data = {
"查看角色攻略列表并查询": (self.characters_list, "角色攻略查询"), "查看角色攻略列表并查询": (self.characters_list, "角色攻略查询"),
"查看角色培养素材列表并查询": (self.characters_material_list, "角色培养素材查询"), "查看角色培养素材列表并查询": (self.characters_material_list, "角色培养素材查询"),
"查看光锥列表并查询": (self.light_cone_list, "光锥查询"), "查看光锥列表并查询": (self.light_cone_list, "光锥查询"),
"查看遗器套装列表并查询": (self.relics_list, "遗器套装查询"),
}[args[0]] }[args[0]]
for character in temp_data[0]: for character in temp_data[0]:
name = character["name"] name = character["name"]

View File

@ -111,10 +111,6 @@ class PlayerCards(Plugin):
else: else:
await message.reply_text("未查询到您所绑定的账号信息,请先绑定账号", reply_markup=InlineKeyboardMarkup(buttons)) await message.reply_text("未查询到您所绑定的账号信息,请先绑定账号", reply_markup=InlineKeyboardMarkup(buttons))
return return
# 暂时只支持国服
if not (100000000 < uid < 200000000):
await message.reply_text("此功能暂时只支持国服")
return
data = await self._load_history(uid) data = await self._load_history(uid)
if data is None or len(data.AvatarList) == 0: if data is None or len(data.AvatarList) == 0:
if isinstance(self.kitsune, str): if isinstance(self.kitsune, str):
@ -382,7 +378,7 @@ class PlayerCards(Plugin):
) )
except AssetsCouldNotFound: except AssetsCouldNotFound:
logger.warning("角色 %s 的头像资源获取失败", cid) logger.warning("角色 %s 的头像资源获取失败", cid)
if idx > 6: if idx > 3:
break break
return { return {
"uid": data.UID, "uid": data.UID,

View File

@ -0,0 +1,83 @@
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.constants import ChatAction, ParseMode
from telegram.ext import CallbackContext, filters
from core.plugin import Plugin, handler
from core.services.game.services import GameCacheService
from core.services.search.models import StrategyEntry
from core.services.search.services import SearchServices
from core.services.wiki.services import WikiService
from utils.log import logger
class RelicsPlugin(Plugin):
"""遗器图鉴查询"""
KEYBOARD = [[InlineKeyboardButton(text="查看遗器套装列表并查询", switch_inline_query_current_chat="查看遗器套装列表并查询")]]
def __init__(
self,
cache_service: GameCacheService = None,
wiki_service: WikiService = None,
search_service: SearchServices = None,
):
self.cache_service = cache_service
self.wiki_service = wiki_service
self.search_service = search_service
@handler.command(command="relics", block=False)
@handler.message(filters=filters.Regex("^遗器套装查询(.*)"), block=False)
async def command_start(self, update: Update, context: CallbackContext) -> None:
message = update.effective_message
user = update.effective_user
args = self.get_args(context)
if len(args) >= 1:
relics_name = args[0]
else:
reply_message = await message.reply_text("请回复你要查询的遗器名称", reply_markup=InlineKeyboardMarkup(self.KEYBOARD))
if filters.ChatType.GROUPS.filter(reply_message):
self.add_delete_message_job(message)
self.add_delete_message_job(reply_message)
return
file_path = self.wiki_service.raider.raider_relic_path / f"{relics_name}.png"
if not file_path.exists():
reply_message = await message.reply_text(
f"没有找到 {relics_name} 的遗器图鉴", reply_markup=InlineKeyboardMarkup(self.KEYBOARD)
)
if filters.ChatType.GROUPS.filter(reply_message):
self.add_delete_message_job(message)
self.add_delete_message_job(reply_message)
return
logger.info("用户 %s[%s] 查询遗器图鉴命令请求 || 参数 %s", user.full_name, user.id, relics_name)
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
caption = "From 米游社@听语惊花"
if file_id := await self.cache_service.get_relics_cache(relics_name):
await message.reply_photo(
photo=file_id,
caption=caption,
filename=f"{relics_name}.png",
allow_sending_without_reply=True,
parse_mode=ParseMode.HTML,
)
else:
reply_photo = await message.reply_photo(
photo=open(file_path, "rb"),
caption=caption,
filename=f"{relics_name}.png",
allow_sending_without_reply=True,
parse_mode=ParseMode.HTML,
)
if reply_photo.photo:
tags = [relics_name]
photo_file_id = reply_photo.photo[0].file_id
await self.cache_service.set_relics_cache(relics_name, photo_file_id)
entry = StrategyEntry(
key=f"plugin:relics:{relics_name}",
title=relics_name,
description=f"{relics_name} 遗器图鉴",
tags=tags,
caption=caption,
parse_mode="HTML",
photo_file_id=photo_file_id,
)
await self.search_service.add_entry(entry)

View File

@ -191,7 +191,6 @@ class BaseClient:
headers = dict(headers or {}) headers = dict(headers or {})
headers.setdefault("User-Agent", self.USER_AGENT) headers.setdefault("User-Agent", self.USER_AGENT)
update_device_headers(self.hoyolab_id, headers) update_device_headers(self.hoyolab_id, headers)
logger.debug("Account ID: %s Header: %s" % (self.hoyolab_id, headers))
if method is None: if method is None:
method = "POST" if data else "GET" method = "POST" if data else "GET"