♻️ Refactor code

Plugin name changed to `daily_farming`
Change `honey impact` source to `ambr` source
This commit is contained in:
Karako 2024-03-17 06:02:56 +08:00
parent 62891670c0
commit c6be4e467a
No known key found for this signature in database
18 changed files with 588 additions and 5 deletions

@ -1 +1 @@
Subproject commit 968b3fd52d699c65648ecab22b3c7ab1f2d32d52 Subproject commit 3057fba7a6e84be60ece1ab98e6cd012aba37e78

View File

View File

@ -0,0 +1,28 @@
INTERVAL = 1
RETRY_TIMES = 5
WEEK_MAP = ["", "", "", "", "", "", ""]
AREAS = ["蒙德", "璃月", "稻妻", "须弥", "枫丹", "纳塔", "至冬", "坎瑞亚"]
# fmt: off
# 章节顺序、国家(区域)名是从《足迹》 PV 中取的
DOMAINS = [
"忘却之峡", # 蒙德精通秘境
"太山府", # 璃月精通秘境
"菫色之庭", # 稻妻精通秘境
"昏识塔", # 须弥精通秘境
"苍白的遗荣", # 枫丹精通秘境
"", # 纳塔精通秘境
"", # 至东精通秘境
"", # 坎瑞亚精通秘境
"塞西莉亚苗圃", # 蒙德炼武秘境
"震雷连山密宫", # 璃月炼武秘境
"砂流之庭", # 稻妻炼武秘境
"有顶塔", # 须弥炼武秘境
"深潮的余响", # 枫丹炼武秘境
"", # 纳塔炼武秘境
"", # 至东炼武秘境
"", # 坎瑞亚炼武秘境
]
# fmt: on
DOMAIN_AREA_MAP = dict(zip(DOMAINS, AREAS * 2))

View File

@ -0,0 +1,85 @@
from collections.abc import ItemsView
from pydantic import BaseModel as PydanticBaseModel
try:
import ujson as json
except ImportError:
import json
class BaseModel(PydanticBaseModel):
class Config:
json_loads = json.loads
class ItemData(BaseModel):
id: str # ID
name: str # 名称
rarity: int # 星级
icon: str # 图标
level: int | None = None # 等级
class MaterialData(BaseModel):
icon: str
rarity: int
class AvatarData(ItemData):
constellation: int | None = None # 命座
skills: list[int] | None = None # 天赋等级
class WeaponData(ItemData):
refinement: int | None = None # 精炼度
avatar_icon: str | None = None # 武器使用者图标
class AreaData(BaseModel):
name: str # 区域名
material_name: str # 区域的材料系列名
materials: list[MaterialData] = [] # 区域材料
avatars: list[AvatarData] = []
weapons: list[WeaponData] = []
@property
def items(self) -> list[AvatarData | WeaponData]:
"""可培养的角色或武器"""
return self.avatars or WeaponData
class RenderData(BaseModel):
title: str # 页面标题,主要用于显示星期几
time: str # 页面时间
uid: str | None = None # 用户UID
character: list[AreaData] = [] # 角色数据
weapon: list[AreaData] = [] # 武器数据
def __getitem__(self, item):
return self.__getattribute__(item)
class UserOwned(BaseModel):
avatars: dict[str, AvatarData] = {}
"""角色 ID 到角色对象的映射"""
weapons: dict[str, list[WeaponData]] = {}
"""用户同时可以拥有多把同名武器,因此是 ID 到 list 的映射"""
class FarmingData(BaseModel):
weekday: str
areas: list[AreaData]
def items(self) -> ItemsView:
return self.dict().items()
class FullFarmingData(BaseModel):
__root__: dict[int, FarmingData] = {}
def __bool__(self) -> bool:
return bool(self.__root__)
def weekday(self, weekday: int) -> FarmingData | dict:
return self.__root__.get(weekday, {})

View File

@ -0,0 +1,150 @@
import asyncio
import sys
from abc import ABC, abstractmethod
from collections import Counter
from multiprocessing import RLock
from ssl import SSLZeroReturnError
from typing import TypeVar, ParamSpec, final, Iterable
from httpx import AsyncClient, HTTPError
from core.dependence.assets import AssetsService
from plugins.genshin.farming._const import AREAS, INTERVAL, RETRY_TIMES, WEEK_MAP
from plugins.genshin.farming._model import (
FarmingData,
MaterialData,
AreaData,
AvatarData,
WeaponData,
)
from utils.log import logger
__all__ = ("Spider",)
R = TypeVar("R")
P = ParamSpec("P")
def get_material_serial_name(names: Iterable[str]) -> str:
counter = None
for name in names:
if counter is None:
counter = Counter(name)
continue
counter = counter & Counter(name)
return "".join(counter.keys()).replace("", "").replace("", "")
class Spider(ABC):
_lock = RLock()
_client: AsyncClient | None = None
priority: int = sys.maxsize
def __init__(self, assets: AssetsService):
self.assets = assets
@property
def client(self) -> AsyncClient:
with self._lock:
if self._client is None or self._client.is_closed:
self._client = AsyncClient()
return self._client
@abstractmethod
async def __call__(self) -> dict[int, FarmingData]:
pass
@classmethod
@final
async def execute(cls, assets: AssetsService) -> dict[int, FarmingData]:
result = None
for spider in sorted(
map(lambda x: x(assets), cls.__subclasses__()), key=lambda x: (x.priority, x.__class__.__name__)
):
if (result := await spider()) is not None:
return result
if result is None:
logger.error("每日素材刷新失败,请稍后重试")
class Ambr(Spider):
farming_url = "https://api.ambr.top/v2/chs/dailyDungeon"
async def _request(self, url: str) -> dict | None:
response = None
for attempts in range(RETRY_TIMES):
try:
response = await self.client.get(url)
response.raise_for_status()
break
except (HTTPError, SSLZeroReturnError):
await asyncio.sleep(INTERVAL)
if attempts + 1 == RETRY_TIMES:
return None
else:
logger.warning("每日素材刷新失败, 正在重试第 %d", attempts)
continue
if response is not None:
return response.json()["data"]
async def _parse_item_data(self, material_json_data: dict):
items = []
for key in ["avatar", "weapon"]:
cls = AvatarData if key == "avatar" else WeaponData
if chunk_data := material_json_data["additions"]["requiredBy"].get(key):
for data in filter(lambda x: x["rank"] > 3, chunk_data):
item_id = data["id"]
if isinstance(item_id, str):
continue # 旅行者
item_icon = await getattr(self.assets, key)(item_id).icon()
items.append(
cls(
id=item_id,
name=data["name"],
rarity=data["rank"],
icon=item_icon.as_uri(),
)
)
return items
async def _parse_weekday_data(self, weekday_data: dict) -> list[AreaData]:
area_data_list = []
for domain in weekday_data.values():
area_name = AREAS[int(domain["city"]) - 1]
material_name_list = []
materials = []
items = []
for material_id in domain["reward"][3:]:
material_json_data = await self._request(f"https://api.ambr.top/v2/CHS/material/{material_id}")
material_name_list.append(material_json_data["name"])
material_icon = await self.assets.material(material_id).icon()
materials.append(MaterialData(icon=material_icon.as_uri(), rarity=material_json_data["rank"]))
items = await self._parse_item_data(material_json_data)
area_data_list.append(
AreaData(
name=area_name,
material_name=get_material_serial_name(material_name_list),
materials=materials,
**{"avatars" if isinstance(items[0], AvatarData) else "weapons": items},
)
)
return area_data_list
async def __call__(self) -> dict[int, FarmingData]:
week_map = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
if (full_farming_json_data := await self._request(self.farming_url)) is None:
logger.error("从 Ambr 上爬取每日素材失败,请稍后重试")
return {}
farming_data_list = []
for weekday_name, weekday_data in full_farming_json_data.items():
if (weekday := week_map.index(weekday_name) + 1) == 7:
continue # 跳过星期天
area_data_list = await self._parse_weekday_data(weekday_data)
farming_data_list.append(FarmingData(weekday=WEEK_MAP[weekday - 1], areas=area_data_list))
return {k + 1: v for k, v in enumerate(farming_data_list)}

View File

@ -0,0 +1,320 @@
import asyncio
from asyncio import Lock
from copy import deepcopy
from datetime import datetime
from functools import partial
from typing import ParamSpec, TYPE_CHECKING, TypeVar
import aiofiles
import pydantic
from simnet import GenshinClient
from simnet.errors import BadRequest as SimnetBadRequest
from simnet.errors import InvalidCookies
from telegram.constants import ChatAction, ParseMode
from core.dependence.assets import AssetsService
from gram_core.plugin import Plugin, handler
from gram_core.services.template.models import FileType, RenderGroupResult
from gram_core.services.template.services import TemplateService
from plugins.genshin.farming._const import AREAS, INTERVAL
from plugins.genshin.farming._model import AreaData, AvatarData, FullFarmingData, RenderData, UserOwned, WeaponData
from plugins.genshin.farming._spider import Spider
from plugins.tools.genshin import CharacterDetails, CookiesNotFoundError, GenshinHelper, PlayerNotFoundError
from utils.const import DATA_DIR
from utils.log import logger
from utils.uid import mask_number
if TYPE_CHECKING:
from telegram import Message, Update
from telegram.ext import ContextTypes
R = TypeVar("R")
P = ParamSpec("P")
_RETRY_TIMES = 5
_WEEK_MAP = ["", "", "", "", "", "", ""]
DATA_FILE_PATH = DATA_DIR.joinpath("daily_material.json").resolve()
def sort_item(item: AvatarData | WeaponData) -> tuple:
rarity = item.rarity
level = item.level
if isinstance(item, AvatarData):
owned = item.constellation is not None
strengthening = item.constellation or 0
else:
owned = item.refinement is not None
strengthening = item.refinement or 0
return owned, rarity, level, strengthening
class DailyFarming(Plugin):
lock: Lock = Lock()
full_farming_data = FullFarmingData()
def __init__(
self,
assets: AssetsService,
template: TemplateService,
genshin_helper: GenshinHelper,
character_details: CharacterDetails,
) -> None:
self.assets_service = assets
self.template_service = template
self.helper = genshin_helper
self.character_details = character_details
async def _refresh_farming_data(self) -> bool:
async with self.lock:
if (result := await Spider.execute(self.assets_service)) is not None:
self.full_farming_data.__root__ = result
else:
return False
if self.full_farming_data:
async with aiofiles.open(DATA_FILE_PATH, "w", encoding="utf-8") as file:
await file.write(self.full_farming_data.json(ensure_ascii=False))
return True
# noinspection PyUnresolvedReferences
async def initialize(self) -> None:
"""插件在初始化时,会检查一下本地是否缓存了每日素材的数据"""
async def refresh_task():
"""构建每日素材文件的后台任务"""
logger.info("开始获取并缓存每日素材表")
if await self._refresh_farming_data():
logger.success("每日素材表缓存成功")
else:
logger.error("每日素材表缓存失败,请稍后重试")
# 当缓存不存在或已过期(默认 1 天)则重新下载
if not await aiofiles.os.path.exists(DATA_FILE_PATH):
# 由于构建后台任务的话错误不会输出并堵塞主线程,所以用前台任务
await refresh_task()
else:
mtime = await aiofiles.os.path.getmtime(DATA_FILE_PATH)
mtime = datetime.fromtimestamp(mtime)
elapsed = datetime.now() - mtime
if elapsed.days > 1:
await refresh_task()
# 若存在则直接使用
if await aiofiles.os.path.exists(DATA_FILE_PATH):
try:
async with aiofiles.open(DATA_FILE_PATH, "rb") as file:
self.full_farming_data = FullFarmingData.parse_raw(await file.read())
except pydantic.ValidationError:
await aiofiles.os.remove(DATA_FILE_PATH)
await refresh_task()
async def _get_character_skill(self, client, character) -> list[int]:
if getattr(client, "damaged", False):
return []
try:
detail = await self.character_details.get_character_details(client, character)
return [t.level for t in detail.talents if t.type in ["attack", "skill", "burst"]]
except InvalidCookies:
setattr(client, "damaged", True)
except SimnetBadRequest as e:
if e.ret_code == -502002:
client.damaged = True
setattr(client, "damaged", True)
return []
async def _get_user_items(self, user_id: int) -> tuple[GenshinClient | None, UserOwned]:
"""获取已经绑定的账号的角色、武器信息"""
user_data = UserOwned()
try:
logger.debug("尝试获取已绑定的原神账号")
client = await self.helper.get_genshin_client(user_id)
logger.debug("获取账号数据成功: UID=%s", client.player_id)
characters = await client.get_genshin_characters(client.player_id)
for character in filter(lambda x: x.name != "旅行者", characters):
character_id = str(character.id)
character_assets = self.assets_service.avatar(character_id)
character_icon = await character_assets.icon(False)
character_side = await character_assets.side(False)
user_data.avatars[character_id] = AvatarData(
id=character_id,
name=character.name,
rarity=character.rarity,
icon=character_icon.as_uri(),
level=character.level,
constellation=character.constellation,
skills=await self._get_character_skill(client, character),
)
# 判定武器的突破次数是否大于 2, 若是, 则将图标替换为 awakened (觉醒) 的图标
weapon = character.weapon
if weapon.rarity < 4:
continue # 忽略 4 星以下的武器
weapon_id = str(weapon.id)
weapon_icon_type = "icon" if weapon.ascension < 2 else "awaken"
weapon_icon = await getattr(self.assets_service.weapon(weapon_id), weapon_icon_type)()
if weapon_id not in user_data.weapons:
# 由于用户可能持有多把同一种武器
# 这里需要使用 List 来储存所有不同角色持有的同名武器
user_data.weapons[weapon_id] = []
user_data.weapons[weapon_id].append(
WeaponData(
id=weapon_id,
name=weapon.name,
rarity=weapon.rarity,
icon=weapon_icon.as_uri(),
level=weapon.level,
refinement=weapon.refinement,
avatar_icon=character_side.as_uri(),
)
)
except (PlayerNotFoundError, CookiesNotFoundError):
self.log_user(user_id, logger.info, "未查询到绑定的账号信息")
except InvalidCookies:
self.log_user(user_id, logger.info, "所绑定的账号信息已失效")
else:
# 没有异常返回数据
return client, user_data
# 有上述异常的, client 会返回 None
return None, user_data
async def _get_render_data(self, user_id, weekday, title, time_text):
# 尝试获取用户已绑定的原神账号信息
client, user_owned = await self._get_user_items(user_id)
today_farming = self.full_farming_data.weekday(weekday)
area_avatars: list[AreaData] = []
area_weapons: list[AreaData] = []
for area_data in today_farming.areas:
items = []
new_area_data = deepcopy(area_data)
for avatar in area_data.avatars:
items.append(user_owned.avatars.get(str(avatar.id), avatar))
new_area_data.avatars = list(sorted(items, key=sort_item, reverse=True))
for weapon in area_data.weapons:
if weapons := user_owned.weapons.get(str(weapon.id), []):
items.extend(weapons)
else:
items.append(weapon)
new_area_data.weapons = list(sorted(items, key=sort_item, reverse=True))
[area_weapons, area_avatars][bool(area_data.avatars)].append(new_area_data)
return RenderData(
title=title,
time=time_text,
uid=mask_number(client.player_id) if client else client,
character=list(sorted(area_avatars, key=lambda x: AREAS.index(x.name))),
weapon=list(sorted(area_weapons, key=lambda x: AREAS.index(x.name))),
)
@handler.command("daily_farming", block=False)
async def daily_farming(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
"""每日素材表
使用方式 /daily_farming (星期)
"""
user_id = await self.get_real_user_id(update)
message: "Message" = update.effective_message
args = self.get_args(context)
weekday, title, time_text, full = _parse_time(args)
self.log_user(update, logger.info, "每日素材命令请求 || 参数 weekday=%s full=%s", _WEEK_MAP[weekday - 1], full)
if weekday == 7:
from telegram.constants import ParseMode
the_day = "今天" if title == "今日" else "这天"
await message.reply_text(f"{the_day}是星期天, <b>全部素材都可以</b>刷哦~", parse_mode=ParseMode.HTML)
return
if self.lock.locked(): # 若检测到了第一个锁:正在下载每日素材表的数据
loading_prompt = await message.reply_text("派蒙正在摘抄每日素材表,以后再来探索吧~")
self.add_delete_message_job(loading_prompt, delay=5)
return
loading_prompt = await message.reply_text("派蒙可能需要找找图标素材,还请耐心等待哦~")
await message.reply_chat_action(ChatAction.TYPING)
# 获取已经缓存的秘境素材信息
if not self.full_farming_data: # 若没有缓存每日素材表的数据
logger.info("正在获取每日素材缓存")
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
await self._refresh_farming_data()
render_data = await self._get_render_data(user_id, weekday, title, time_text)
await message.reply_chat_action(ChatAction.TYPING)
# 是否发送原图
file_type = FileType.DOCUMENT if full else FileType.PHOTO
character_img_data, weapon_img_data = await asyncio.gather(
self.template_service.render( # 渲染角色素材页
"genshin/daily_farming/character.jinja2",
{"data": render_data},
{"width": 1338, "height": 500},
file_type=file_type,
ttl=30 * 24 * 60 * 60,
),
self.template_service.render( # 渲染武器素材页
"genshin/daily_farming/weapon.jinja2",
{"data": render_data},
{"width": 1338, "height": 500},
file_type=file_type,
ttl=30 * 24 * 60 * 60,
),
)
self.add_delete_message_job(loading_prompt, delay=5)
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
character_img_data.filename = f"{title}可培养角色.png"
weapon_img_data.filename = f"{title}可培养武器.png"
await RenderGroupResult([character_img_data, weapon_img_data]).reply_media_group(message)
logger.debug("角色、武器培养素材图发送成功")
@handler.command("refresh_farming_data", admin=True, block=False)
async def refresh_farming_data(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
user = update.effective_user
message = update.effective_message
logger.info("用户 {%s}[%s] 刷新[bold]每日素材[/]缓存命令", user.full_name, user.id, extra={"markup": True})
if self.lock.locked():
notice = await message.reply_text("派蒙还在抄每日素材表呢,我有在好好工作哦~")
self.add_delete_message_job(notice, delay=10)
return
notice = await message.reply_text("派蒙正在重新摘抄每日素材表,请稍等~", parse_mode=ParseMode.HTML)
async with self.lock: # 锁住第一把锁
await self._refresh_farming_data()
await notice.edit_text(
"每日素材表"
+ ("摘抄<b>完成!</b>" if self.full_farming_data else "坏掉了!等会它再长好了之后我再抄。。。"),
parse_mode=ParseMode.HTML,
)
def _parse_time(args: list[str]) -> tuple[int, str, str, bool]:
now = datetime.now()
try:
weekday = (_ := int(args[0])) - (_ > 0)
weekday = (weekday % 7 + 7) % 7
time_text = title = f"星期{_WEEK_MAP[weekday]}"
except (ValueError, IndexError):
title = "今日"
weekday = now.weekday() - (1 if now.hour < 4 else 0)
weekday = 6 if weekday < 0 else weekday
time_text = f"星期{_WEEK_MAP[weekday]}"
full = bool(args and args[-1] == "full")
return weekday + 1, title, time_text, full

View File

Before

Width:  |  Height:  |  Size: 18 KiB

After

Width:  |  Height:  |  Size: 18 KiB

View File

Before

Width:  |  Height:  |  Size: 15 KiB

After

Width:  |  Height:  |  Size: 15 KiB

View File

Before

Width:  |  Height:  |  Size: 16 KiB

After

Width:  |  Height:  |  Size: 16 KiB

View File

Before

Width:  |  Height:  |  Size: 9.0 KiB

After

Width:  |  Height:  |  Size: 9.0 KiB

View File

Before

Width:  |  Height:  |  Size: 8.7 KiB

After

Width:  |  Height:  |  Size: 8.7 KiB

View File

Before

Width:  |  Height:  |  Size: 65 KiB

After

Width:  |  Height:  |  Size: 65 KiB

View File

Before

Width:  |  Height:  |  Size: 47 KiB

After

Width:  |  Height:  |  Size: 47 KiB

View File

@ -44,7 +44,7 @@
</span> </span>
</div> </div>
<div class="area-content"> <div class="area-content">
{% for item in area.items %} {% for item in area.avatars %}
<div <div
{% if data.uid != none and item.level == none %} {% if data.uid != none and item.level == none %}
class="item character item-not-owned" class="item character item-not-owned"

View File

@ -44,7 +44,7 @@
</span> </span>
</div> </div>
<div class="area-content"> <div class="area-content">
{% for item in area.items %} {% for item in area.weapons %}
<div <div
{% if data.uid != none and (item.level == none or item.level >= 81) %} {% if data.uid != none and (item.level == none or item.level >= 81) %}
class="item weapon item-not-owned" class="item weapon item-not-owned"
@ -52,9 +52,9 @@
class="item weapon" class="item weapon"
{% endif %} {% endif %}
> >
{% if item.c_path != none %} {% if item.avatar_icon != none %}
<div class="role"> <div class="role">
<img src="{{ item.c_path }}" alt=""/> <img src="{{ item.avatar_icon }}" alt=""/>
</div> </div>
{% endif %} {% endif %}
<div class="item-icon" <div class="item-icon"