mirror of
https://github.com/PaiGramTeam/PaiGram.git
synced 2024-11-27 02:26:08 +00:00
401 lines
16 KiB
Python
401 lines
16 KiB
Python
import re
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Tuple, Optional, Dict, Union, TYPE_CHECKING
|
|
|
|
from httpx import AsyncClient
|
|
|
|
from core.dependence.assets import AssetsCouldNotFound
|
|
from metadata.genshin import AVATAR_DATA
|
|
from metadata.shortname import roleToId
|
|
from modules.apihelper.client.components.remote import Remote
|
|
from modules.apihelper.models.genshin.calendar import Date, FinalAct, ActEnum, ActDetail, ActTime, BirthChar
|
|
from modules.wiki.character import Character
|
|
from utils.log import logger
|
|
|
|
if TYPE_CHECKING:
|
|
from core.dependence.assets import AssetsService
|
|
|
|
|
|
class Calendar:
|
|
"""原神活动日历"""
|
|
|
|
ANNOUNCEMENT_LIST = "https://hk4e-api.mihoyo.com/common/hk4e_cn/announcement/api/getAnnList"
|
|
ANNOUNCEMENT_CONTENT = "https://hk4e-api.mihoyo.com/common/hk4e_cn/announcement/api/getAnnContent"
|
|
ANNOUNCEMENT_PARAMS = {
|
|
"game": "hk4e",
|
|
"game_biz": "hk4e_cn",
|
|
"lang": "zh-cn",
|
|
"bundle_id": "hk4e_cn",
|
|
"platform": "pc",
|
|
"region": "cn_gf01",
|
|
"level": "55",
|
|
"uid": "100000000",
|
|
}
|
|
MIAO_API = "http://miaoapi.cn/api/calendar"
|
|
IGNORE_IDS = [
|
|
495, # 有奖问卷调查开启!
|
|
1263, # 米游社《原神》专属工具一览
|
|
423, # 《原神》玩家社区一览
|
|
422, # 《原神》防沉迷系统说明
|
|
762, # 《原神》公平运营声明
|
|
762, # 《原神》公平运营声明
|
|
]
|
|
IGNORE_RE = re.compile(r"(内容专题页|版本更新说明|调研|防沉迷|米游社|专项意见|更新修复与优化|问卷调查|版本更新通知|更新时间说明|预下载功能|周边限时|周边上新|角色演示)")
|
|
FULL_TIME_RE = re.compile(r"(魔神任务)")
|
|
|
|
def __init__(self):
|
|
self.client = AsyncClient()
|
|
|
|
@staticmethod
|
|
async def async_gen_birthday_list() -> Dict[str, List[str]]:
|
|
"""生成生日列表并且合并云端生日列表"""
|
|
birthday_list = Calendar.gen_birthday_list()
|
|
remote_data = await Remote.get_remote_birthday()
|
|
if remote_data:
|
|
birthday_list.update(remote_data)
|
|
return birthday_list
|
|
|
|
@staticmethod
|
|
def gen_birthday_list() -> Dict[str, List[str]]:
|
|
"""生成生日列表"""
|
|
birthday_list = {}
|
|
for value in AVATAR_DATA.values():
|
|
key = "_".join([str(i) for i in value["birthday"]])
|
|
data = birthday_list.get(key, [])
|
|
data.append(value["name"])
|
|
birthday_list[key] = data
|
|
return birthday_list
|
|
|
|
@staticmethod
|
|
def get_now_hour() -> datetime:
|
|
"""获取当前时间"""
|
|
return datetime.now().replace(minute=0, second=0, microsecond=0)
|
|
|
|
async def parse_official_content_date(self) -> Dict[str, ActTime]:
|
|
"""解析官方内容时间"""
|
|
time_map = {}
|
|
req = await self.client.get(self.ANNOUNCEMENT_CONTENT, params=self.ANNOUNCEMENT_PARAMS)
|
|
if req.status_code != 200:
|
|
return time_map
|
|
detail_data = req.json()
|
|
for data in detail_data.get("data", {}).get("list", []):
|
|
ann_id = data.get("ann_id", 0)
|
|
title = data.get("title", "")
|
|
content = data.get("content", "")
|
|
if ann_id in self.IGNORE_IDS or self.IGNORE_RE.findall(title):
|
|
continue
|
|
content = re.sub(r'(<|<)[\w "%:;=\-\\/\\(\\),\\.]+(>|>)', "", content)
|
|
try:
|
|
if reg_ret := re.search(r"(?:活动时间|祈愿介绍|任务开放时间|冒险....包|折扣时间)\s*〓([^〓]+)(〓|$)", content):
|
|
if time_ret := re.search(r"(?:活动时间)?(?:〓|\s)*([0-9\\/\\: ~]{6,})", reg_ret[1]):
|
|
start_time, end_time = time_ret[1].split("~")
|
|
start_time = start_time.replace("/", "-").strip()
|
|
end_time = end_time.replace("/", "-").strip()
|
|
time_map[str(ann_id)] = ActTime(
|
|
title=title,
|
|
start=start_time,
|
|
end=end_time,
|
|
)
|
|
except (IndexError, ValueError):
|
|
continue
|
|
return time_map
|
|
|
|
async def req_cal_data(self) -> Tuple[List[List[ActDetail]], Dict[str, ActTime]]:
|
|
"""请求日历数据"""
|
|
list_data = await self.client.get(self.ANNOUNCEMENT_LIST, params=self.ANNOUNCEMENT_PARAMS)
|
|
list_data = list_data.json()
|
|
|
|
new_list_data = [[], []]
|
|
for idx, data in enumerate(list_data.get("data", {}).get("list", [])):
|
|
for item in data.get("list", []):
|
|
new_list_data[idx].append(ActDetail(**item))
|
|
time_map = {}
|
|
time_map.update(await self.parse_official_content_date())
|
|
remote_data = await Remote.get_remote_calendar()
|
|
if remote_data:
|
|
time_map.update({key: ActTime(**value) for key, value in remote_data.get("data", {}).items()})
|
|
return new_list_data, time_map
|
|
|
|
@staticmethod
|
|
def date_to_weekday(date_: datetime) -> str:
|
|
"""日期转换为星期"""
|
|
time = ["一", "二", "三", "四", "五", "六", "日"]
|
|
return time[date_.weekday()]
|
|
|
|
async def get_date_list(self) -> Tuple[List[Date], datetime, datetime, timedelta, float]:
|
|
"""获取日历数据"""
|
|
data_list: List[Date] = []
|
|
today = self.get_now_hour()
|
|
temp = today - timedelta(days=7)
|
|
month = 0
|
|
date, week, is_today = [], [], []
|
|
start_date, end_date = None, None
|
|
for i in range(13):
|
|
temp += timedelta(days=1)
|
|
m, d, w = temp.month, temp.day, self.date_to_weekday(temp)
|
|
if month == 0:
|
|
start_date = temp
|
|
month = m
|
|
if month != m and len(date) > 0:
|
|
data_list.append(Date(month=month, date=date, week=week, is_today=is_today))
|
|
date, week, is_today = [], [], []
|
|
month = m
|
|
date.append(d)
|
|
week.append(w)
|
|
is_today.append(temp == today)
|
|
if i == 12:
|
|
data_list.append(Date(month=month, date=date, week=week, is_today=is_today))
|
|
end_date = temp
|
|
start_time = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
end_time = end_date.replace(hour=23, minute=59, second=59, microsecond=999999)
|
|
total_range: timedelta = end_time - start_time
|
|
now_left: float = (self.get_now_hour() - start_time) / total_range * 100
|
|
return (
|
|
data_list,
|
|
start_time,
|
|
end_time,
|
|
total_range,
|
|
now_left,
|
|
)
|
|
|
|
@staticmethod
|
|
def human_read(d: timedelta) -> str:
|
|
"""将日期转换为人类可读"""
|
|
hour = d.seconds // 3600
|
|
minute = d.seconds // 60 % 60
|
|
if minute >= 59:
|
|
hour += 1
|
|
text = ""
|
|
if d.days:
|
|
text += f"{d.days}天"
|
|
if hour:
|
|
text += f"{hour}小时"
|
|
return text
|
|
|
|
@staticmethod
|
|
def count_width(
|
|
act: FinalAct,
|
|
detail: Optional[ActTime],
|
|
ds: ActDetail,
|
|
start_time: datetime,
|
|
end_time: datetime,
|
|
total_range: timedelta,
|
|
) -> Tuple[datetime, datetime]:
|
|
"""计算宽度"""
|
|
|
|
def get_date(d1: str, d2: str) -> datetime:
|
|
if d1 and len(d1) > 6:
|
|
return datetime.strptime(d1, "%Y-%m-%d %H:%M:%S")
|
|
return datetime.strptime(d2, "%Y-%m-%d %H:%M:%S")
|
|
|
|
s_date = get_date(detail and detail.start, ds.start_time)
|
|
e_date = get_date(detail and detail.end, ds.end_time)
|
|
s_time = max(s_date, start_time)
|
|
e_time = min(e_date, end_time)
|
|
|
|
s_range = s_time - start_time
|
|
e_range = e_time - start_time
|
|
|
|
act.left = s_range / total_range * 100
|
|
act.width = e_range / total_range * 100 - act.left
|
|
act.duration = (e_time - s_time).total_seconds()
|
|
act.start = s_date.strftime("%m-%d %H:%M")
|
|
act.end = e_date.strftime("%m-%d %H:%M")
|
|
return s_date, e_date
|
|
|
|
def parse_label(self, act: FinalAct, is_act: bool, s_date: datetime, e_date: datetime) -> None:
|
|
"""解析活动标签"""
|
|
now = self.get_now_hour()
|
|
label = ""
|
|
if self.FULL_TIME_RE.findall(act.title) or e_date - s_date > timedelta(days=365):
|
|
label = f"{s_date.strftime('%m-%d %H:%M')} 后永久有效" if s_date < now else "永久有效"
|
|
elif s_date < now < e_date:
|
|
label = f'{e_date.strftime("%m-%d %H:%M")} ({self.human_read(e_date - now)}后结束)'
|
|
if act.width > (38 if is_act else 55):
|
|
label = f"{s_date.strftime('%m-%d %H:%M')} ~ {label}"
|
|
elif s_date > now:
|
|
label = f'{s_date.strftime("%m-%d %H:%M")} ({self.human_read(s_date - now)}后开始)'
|
|
elif is_act:
|
|
label = f"{s_date.strftime('%m-%d %H:%M')} ~ {e_date.strftime('%m-%d %H:%M')}"
|
|
act.label = label
|
|
|
|
@staticmethod
|
|
async def parse_type(act: FinalAct, assets: "AssetsService") -> None:
|
|
"""解析活动类型"""
|
|
if "神铸赋形" in act.title:
|
|
act.type = ActEnum.weapon
|
|
act.title = re.sub(r"(单手剑|双手剑|长柄武器|弓|法器|·)", "", act.title)
|
|
act.sort = 2
|
|
elif "祈愿" in act.title:
|
|
act.type = ActEnum.character
|
|
if reg_ret := re.search(r"·(.*)\(", act.title):
|
|
char_name = reg_ret[1]
|
|
char = assets.avatar(roleToId(char_name))
|
|
act.banner = (await assets.namecard(char.id).navbar()).as_uri()
|
|
act.face = (await char.icon()).as_uri()
|
|
act.sort = 1
|
|
elif "纪行" in act.title:
|
|
act.type = ActEnum.no_display
|
|
elif act.title == "深渊":
|
|
act.type = ActEnum.abyss
|
|
|
|
async def get_list(
|
|
self,
|
|
ds: ActDetail,
|
|
start_time: datetime,
|
|
end_time: datetime,
|
|
total_range: timedelta,
|
|
time_map: Dict[str, ActTime],
|
|
is_act: bool,
|
|
assets: "AssetsService",
|
|
) -> Optional[FinalAct]:
|
|
"""获取活动列表"""
|
|
act = FinalAct(
|
|
id=ds.ann_id,
|
|
type=ActEnum.activity if is_act else ActEnum.normal,
|
|
title=ds.title,
|
|
banner=ds.banner if is_act else "",
|
|
sort=5 if is_act else 10,
|
|
icon=ds.tag_icon,
|
|
)
|
|
detail: Optional[ActTime] = time_map.get(str(act.id))
|
|
|
|
if act.id in self.IGNORE_IDS or self.IGNORE_RE.findall(act.title) or (detail and not detail.display):
|
|
return None
|
|
await self.parse_type(act, assets)
|
|
s_date, e_date = self.count_width(act, detail, ds, start_time, end_time, total_range)
|
|
self.parse_label(act, is_act, s_date, e_date)
|
|
|
|
if s_date <= end_time and e_date >= start_time:
|
|
act.mergeStatus = 1 if act.type in {ActEnum.activity, ActEnum.normal} else 0
|
|
return act
|
|
|
|
@staticmethod
|
|
def get_abyss_cal(start_time: datetime, end_time: datetime) -> List[List[Union[datetime, str]]]:
|
|
"""获取深渊日历"""
|
|
last = datetime.now().replace(day=1) - timedelta(days=2)
|
|
last_month = last.month
|
|
curr = datetime.now()
|
|
curr_month = curr.month
|
|
next_date = last + timedelta(days=40)
|
|
next_month = next_date.month
|
|
|
|
def start(date: datetime, up: bool = False):
|
|
return date.replace(day=1 if up else 16, hour=4, minute=0, second=0, microsecond=0)
|
|
|
|
def end(date: datetime, up: bool = False):
|
|
return date.replace(day=1 if up else 16, hour=3, minute=59, second=59, microsecond=999999)
|
|
|
|
check = [
|
|
[start(last, False), end(last, True), f"{last_month}月下半"],
|
|
[start(curr, True), end(curr, False), f"{curr_month}月上半"],
|
|
[start(curr, False), end(next_date, True), f"{curr_month}月下半"],
|
|
[start(next_date, True), end(next_date, False), f"{next_month}月上半"],
|
|
]
|
|
ret = []
|
|
for ds in check:
|
|
s, e, _ = ds
|
|
if (s <= start_time <= e) or (s <= end_time <= e):
|
|
ret.append(ds)
|
|
return ret
|
|
|
|
async def get_birthday_char(
|
|
self, date_list: List[Date], assets: "AssetsService"
|
|
) -> Tuple[int, Dict[str, Dict[str, List[BirthChar]]]]:
|
|
"""获取生日角色"""
|
|
birthday_list = await self.async_gen_birthday_list()
|
|
birthday_char_line = 0
|
|
birthday_chars = {}
|
|
for date in date_list:
|
|
birthday_chars[str(date.month)] = {}
|
|
for d in date.date:
|
|
key = f"{date.month}_{d}"
|
|
if char := birthday_list.get(key):
|
|
birthday_char_line = max(len(char), birthday_char_line)
|
|
birthday_chars[str(date.month)][str(d)] = []
|
|
for c in char:
|
|
character = await Character.get_by_name(c)
|
|
try:
|
|
birthday_chars[str(date.month)][str(d)].append(
|
|
BirthChar(
|
|
name=c,
|
|
star=character.rarity,
|
|
icon=(await assets.avatar(roleToId(c)).icon()).as_uri(),
|
|
)
|
|
)
|
|
except AssetsCouldNotFound:
|
|
logger.warning("角色 %s 图片素材未找到", c)
|
|
return birthday_char_line, birthday_chars
|
|
|
|
@staticmethod
|
|
def get_merge_next(target: List[FinalAct], li: FinalAct) -> Optional[FinalAct]:
|
|
"""获取下一个可以合并的活动"""
|
|
return next(
|
|
(li2 for li2 in target if (li2.mergeStatus == 1) and (li.left + li.width <= li2.left)),
|
|
None,
|
|
)
|
|
|
|
def merge_list(self, target: List[FinalAct]) -> Tuple[List[List[FinalAct]], int, int]:
|
|
"""将两个活动合并为一行"""
|
|
char_count = 0
|
|
char_old = 0
|
|
ret: List[List[FinalAct]] = []
|
|
for idx, li in enumerate(target):
|
|
if li.type == ActEnum.character:
|
|
char_count += 1
|
|
if li.left == 0:
|
|
char_old += 1
|
|
li.idx = char_count
|
|
if li.mergeStatus == 1:
|
|
if li2 := self.get_merge_next(target[idx + 1 :], li):
|
|
li.mergeStatus = 2
|
|
li2.mergeStatus = 2
|
|
ret.append([li, li2])
|
|
if li.mergeStatus != 2:
|
|
li.mergeStatus = 2
|
|
ret.append([li])
|
|
return ret, char_count, char_old
|
|
|
|
async def get_photo_data(self, assets: "AssetsService") -> Dict:
|
|
"""获取数据"""
|
|
now = self.get_now_hour()
|
|
list_data, time_map = await self.req_cal_data()
|
|
(
|
|
date_list,
|
|
start_time,
|
|
end_time,
|
|
total_range,
|
|
now_left,
|
|
) = await self.get_date_list()
|
|
birthday_char_line, birthday_chars = await self.get_birthday_char(date_list, assets)
|
|
target: List[FinalAct] = []
|
|
abyss: List[FinalAct] = []
|
|
|
|
for ds in list_data[1]:
|
|
if act := await self.get_list(ds, start_time, end_time, total_range, time_map, True, assets):
|
|
target.append(act)
|
|
for ds in list_data[0]:
|
|
if act := await self.get_list(ds, start_time, end_time, total_range, time_map, False, assets):
|
|
target.append(act)
|
|
abyss_cal = self.get_abyss_cal(start_time, end_time)
|
|
for t in abyss_cal:
|
|
ds = ActDetail(
|
|
title=f"「深境螺旋」· {t[2]}",
|
|
start_time=t[0].strftime("%Y-%m-%d %H:%M:%S"),
|
|
end_time=t[1].strftime("%Y-%m-%d %H:%M:%S"),
|
|
)
|
|
if act := await self.get_list(ds, start_time, end_time, total_range, {}, True, assets):
|
|
abyss.append(act)
|
|
target.sort(key=lambda x: (x.sort, x.start, x.duration))
|
|
target, char_count, char_old = self.merge_list(target)
|
|
return {
|
|
"date_list": date_list,
|
|
"now_left": now_left,
|
|
"list": target,
|
|
"abyss": abyss,
|
|
"char_mode": f"char-{char_count}-{char_old}",
|
|
"now_time": now.strftime("%Y-%m-%d %H 时"),
|
|
"birthday_char_line": birthday_char_line,
|
|
"birthday_chars": birthday_chars,
|
|
}
|