import re
from datetime import datetime, timedelta
from typing import List, Tuple, Optional, Dict, Union, TYPE_CHECKING
from httpx import AsyncClient
from core.dependence.assets import AssetsCouldNotFound
from metadata.genshin import AVATAR_DATA
from metadata.shortname import roleToId
from modules.apihelper.client.components.remote import Remote
from modules.apihelper.models.genshin.calendar import Date, FinalAct, ActEnum, ActDetail, ActTime, BirthChar
from modules.wiki.character import Character
from utils.log import logger
from core.dependence.assets import AssetsService
# Calendar groups the announcement endpoints, request parameters and filter
# rules used by the methods that follow.
# NOTE(review): indentation is not preserved in this view and several lines
# (opening/closing brackets, a regex pattern) appear to be missing below.
class Calendar:
# Endpoint requested by req_cal_data() to list announcements.
ANNOUNCEMENT_LIST = "https://hk4e-api.mihoyo.com/common/hk4e_cn/announcement/api/getAnnList"
# Endpoint requested by parse_official_content_date() to read announcement bodies.
ANNOUNCEMENT_CONTENT = "https://hk4e-api.mihoyo.com/common/hk4e_cn/announcement/api/getAnnContent"
# NOTE(review): the key/value pairs below have no enclosing assignment in view.
# Presumably they are the body of ANNOUNCEMENT_PARAMS, the name passed as
# `params=` alongside both URLs above — confirm against the original file.
"game": "hk4e",
"game_biz": "hk4e_cn",
"lang": "zh-cn",
"bundle_id": "hk4e_cn",
"platform": "pc",
"region": "cn_gf01",
"level": "55",
"uid": "100000000",
# Not referenced by any code visible in this section.
MIAO_API = "http://miaoapi.cn/api/calendar"
# NOTE(review): the integers below have no opening assignment in view.
# Presumably they are IGNORE_IDS, the collection tested with
# `in self.IGNORE_IDS` to skip announcements — confirm. 762 is listed twice.
495, # Prize survey now open!
1263, # Overview of Genshin Impact tools on Miyoushe
423, # Overview of Genshin Impact player communities
422, # Genshin Impact anti-addiction system notice
762, # Genshin Impact fair-operation statement
762, # Genshin Impact fair-operation statement
# Compiled pattern applied to announcement titles (via findall) to skip them.
# NOTE(review): the pattern argument and the closing parenthesis are not visible here.
IGNORE_RE = re.compile(
# Titles matching this pattern are labelled as permanently valid by parse_label().
FULL_TIME_RE = re.compile(r"(魔神任务)")
def __init__(self):
    """Create the HTTP client stored on the instance as ``self.client``."""
    # The same AsyncClient is reused by the methods that call self.client.get(...).
    self.client = AsyncClient()
# Return the birthday table built by Calendar.gen_birthday_list(), after also
# awaiting Remote.get_remote_birthday().
# NOTE(review): this function declares no `self` yet is called as
# self.async_gen_birthday_list() elsewhere in the class; presumably a
# @staticmethod decorator was lost — confirm.
async def async_gen_birthday_list() -> Dict[str, List[str]]:
# Start from the locally generated table.
birthday_list = Calendar.gen_birthday_list()
remote_data = await Remote.get_remote_birthday()
# NOTE(review): no statement is visible under this `if`; the code that would
# combine remote_data with birthday_list appears to be missing from this view.
if remote_data:
return birthday_list
# Build a dict keyed by birthday, one key per distinct value["birthday"] found
# in AVATAR_DATA. It declares no `self` and is invoked on the class as
# Calendar.gen_birthday_list().
def gen_birthday_list() -> Dict[str, List[str]]:
birthday_list = {}
for value in AVATAR_DATA.values():
# The key is the elements of value["birthday"] joined with "_"; lookups
# elsewhere in the class use the form f"{month}_{day}".
key = "_".join([str(i) for i in value["birthday"]])
data = birthday_list.get(key, [])
# NOTE(review): nothing is appended to `data` in the visible code, so every
# list stays empty; a line adding the character to `data` appears to be
# missing here — confirm against the original file.
birthday_list[key] = data
return birthday_list
@staticmethod
def get_now_hour() -> datetime:
    """Return the current local time truncated to the start of the hour.

    The result is a naive ``datetime`` (taken from ``datetime.now()``) with
    minute, second and microsecond set to zero.
    """
    # Declared static because it takes no instance state and is called as
    # ``self.get_now_hour()`` by other methods of the class.
    current = datetime.now()
    return current.replace(minute=0, second=0, microsecond=0)
# Request ANNOUNCEMENT_CONTENT and build a mapping from announcement id (as a
# string) to an ActTime parsed out of the announcement body text.
# Returns the (possibly empty) mapping; a non-200 response yields an empty dict.
# NOTE(review): indentation is lost and several lines are missing in this view
# (see the notes below), so the exact control flow cannot be confirmed here.
async def parse_official_content_date(self) -> Dict[str, ActTime]:
time_map = {}
req = await self.client.get(self.ANNOUNCEMENT_CONTENT, params=self.ANNOUNCEMENT_PARAMS)
if req.status_code != 200:
return time_map
detail_data = req.json()
# Missing "data" / "list" keys are treated as an empty announcement list.
for data in detail_data.get("data", {}).get("list", []):
ann_id = data.get("ann_id", 0)
title = data.get("title", "")
content = data.get("content", "")
# NOTE(review): no statement is visible under this `if`; presumably ignored
# announcements are skipped here — confirm.
if ann_id in self.IGNORE_IDS or self.IGNORE_RE.findall(title):
# Remove tag-like spans from the content before searching it for the time section.
# NOTE(review): both alternatives in `(<|<)` and `(>|>)` are identical as shown;
# an escaped form may have been altered during extraction — confirm.
content = re.sub(r'(<|<)[\w "%:;=\-\\/\\(\\),\\.]+(>|>)', "", content)
# Locate the section that follows one of the listed headings and a "〓" marker.
# NOTE(review): the closing `):` of this re.search call is not visible.
if reg_ret := re.search(
r"(?:活动时间|祈愿介绍|任务开放时间|冒险....包|折扣时间)\s*〓([^〓]+)(〓|$)", content
# Capture a run of at least six digit / slash / colon / space / "~" characters.
if time_ret := re.search(r"(?:活动时间)?(?:〓|\s)*([0-9\\/\\: ~]{6,})", reg_ret[1]):
# The captured text is split on "~" into a start and an end part.
start_time, end_time = time_ret[1].split("~")
# Slashes are replaced by dashes and surrounding whitespace is stripped.
start_time = start_time.replace("/", "-").strip()
end_time = end_time.replace("/", "-").strip()
# NOTE(review): the arguments and closing parenthesis of ActTime(...) are not visible.
time_map[str(ann_id)] = ActTime(
# NOTE(review): no matching `try:` is visible for this handler; its body is also missing.
except (IndexError, ValueError):
return time_map
# Request ANNOUNCEMENT_LIST and return a pair: a two-element list of
# announcement groups, and a time map (announcement id -> ActTime) that
# combines parse_official_content_date() with Remote.get_remote_calendar().
async def req_cal_data(self) -> Tuple[List[List[ActDetail]], Dict[str, ActTime]]:
list_data = await self.client.get(self.ANNOUNCEMENT_LIST, params=self.ANNOUNCEMENT_PARAMS)
list_data = list_data.json()
# Two buckets; get_photo_data() reads them as list_data[0] and list_data[1].
new_list_data = [[], []]
for idx, data in enumerate(list_data.get("data", {}).get("list", [])):
# NOTE(review): the body of this inner loop is not visible; presumably each
# item is converted and stored into new_list_data[idx] — confirm.
for item in data.get("list", []):
time_map = {}
# Times parsed from the official announcement contents are added first.
time_map.update(await self.parse_official_content_date())
remote_data = await Remote.get_remote_calendar()
if remote_data:
# Remote entries are applied afterwards, so they replace parsed entries with the same key.
time_map.update({key: ActTime(**value) for key, value in remote_data.get("data", {}).items()})
return new_list_data, time_map
@staticmethod
def date_to_weekday(date_: datetime) -> str:
    """Return the Chinese weekday character for ``date_``.

    ``datetime.weekday()`` is 0 for Monday and 6 for Sunday, which maps onto
    "一" (Monday) through "六" (Saturday) and "日" (Sunday).
    """
    # Declared static because it takes no instance state and is called as
    # ``self.date_to_weekday(...)`` by get_date_list.
    weekday_names = ("一", "二", "三", "四", "五", "六", "日")
    return weekday_names[date_.weekday()]
# Build the per-month Date entries for a 13-day window and compute the window
# bounds, its total length and the position of "now" inside it as a percentage.
# The window runs from 6 days before the current hour to 6 days after it:
# `temp` starts 7 days back and is advanced by one day before each use.
# NOTE(review): indentation is lost and some lines are missing (see notes below).
async def get_date_list(self) -> Tuple[List[Date], datetime, datetime, timedelta, float]:
data_list: List[Date] = []
today = self.get_now_hour()
temp = today - timedelta(days=7)
# month == 0 marks "no day processed yet".
month = 0
date, week, is_today = [], [], []
start_date, end_date = None, None
for i in range(13):
temp += timedelta(days=1)
m, d, w = temp.month, temp.day, self.date_to_weekday(temp)
if month == 0:
# The first processed day becomes the start of the window.
start_date = temp
month = m
# On a month change, the days collected so far are emitted as one Date entry.
if month != m and len(date) > 0:
data_list.append(Date(month=month, date=date, week=week, is_today=is_today))
date, week, is_today = [], [], []
month = m
# NOTE(review): `d` and `w` are computed above but no append to `date` or `week`
# is visible; those lines appear to be missing from this view.
is_today.append(temp == today)
# The last iteration flushes the remaining days and fixes the end of the window.
if i == 12:
data_list.append(Date(month=month, date=date, week=week, is_today=is_today))
end_date = temp
# The window is widened to whole days: 00:00:00.000000 through 23:59:59.999999.
start_time = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
end_time = end_date.replace(hour=23, minute=59, second=59, microsecond=999999)
total_range: timedelta = end_time - start_time
# Offset of the current hour from the window start, as a percentage of the window.
now_left: float = (self.get_now_hour() - start_time) / total_range * 100
# NOTE(review): the returned tuple's contents are not visible; get_photo_data() reads
# date_list, start_time, end_time, total_range and now_left after this call — confirm order.
return (
@staticmethod
def human_read(d: timedelta) -> str:
    """Format a duration as "<days>天<hours>小时", omitting zero parts.

    A remainder of 59 minutes is rounded up to the next full hour. When that
    rounding reaches 24 hours the extra day is carried into the day count, so
    the result never reads "24小时". Durations that round to less than one
    hour produce an empty string.
    """
    # Declared static because it takes no instance state and is called as
    # ``self.human_read(...)`` by parse_label.
    days = d.days
    hours, leftover_seconds = divmod(d.seconds, 3600)
    if leftover_seconds // 60 >= 59:
        hours += 1
    # Carry a rounded-up 24th hour into days instead of emitting "24小时".
    if hours >= 24:
        days += 1
        hours -= 24
    parts = []
    if days:
        parts.append(f"{days}天")
    if hours:
        parts.append(f"{hours}小时")
    return "".join(parts)
@staticmethod
def count_width(
    act: "FinalAct",
    detail: "Optional[ActTime]",
    ds: "ActDetail",
    start_time: datetime,
    end_time: datetime,
    total_range: timedelta,
) -> Tuple[datetime, datetime]:
    """Position ``act`` on the calendar window and return its real start/end.

    The start and end come from ``detail.start`` / ``detail.end`` when those
    are present and longer than six characters, otherwise from
    ``ds.start_time`` / ``ds.end_time``. The dates are clamped to
    ``[start_time, end_time]`` to compute ``act.left`` and ``act.width`` as
    percentages of ``total_range``, and ``act.duration`` in seconds.
    ``act.start`` and ``act.end`` receive the unclamped dates formatted as
    "%m-%d %H:%M".

    Returns:
        The unclamped ``(start, end)`` datetimes.

    Raises:
        ValueError: if a date string matches none of the accepted formats.
    """

    def get_date(d1: Optional[str], d2: str) -> datetime:
        # Prefer the override value; it may come with or without seconds.
        if d1 and len(d1) > 6:
            try:
                return datetime.strptime(d1, "%Y-%m-%d %H:%M:%S")
            except ValueError:
                return datetime.strptime(d1, "%Y-%m-%d %H:%M")
        return datetime.strptime(d2, "%Y-%m-%d %H:%M:%S")

    # `detail and detail.start` yields None when no detail entry exists.
    s_date = get_date(detail and detail.start, ds.start_time)
    e_date = get_date(detail and detail.end, ds.end_time)

    # Clamp to the visible window before converting to percentages.
    visible_start = max(s_date, start_time)
    visible_end = min(e_date, end_time)

    act.left = (visible_start - start_time) / total_range * 100
    act.width = (visible_end - start_time) / total_range * 100 - act.left
    act.duration = (visible_end - visible_start).total_seconds()
    act.start = s_date.strftime("%m-%d %H:%M")
    act.end = e_date.strftime("%m-%d %H:%M")
    return s_date, e_date
def parse_label(self, act: "FinalAct", is_act: bool, s_date: datetime, e_date: datetime) -> None:
    """Set ``act.label`` to the text describing when the activity runs.

    The branches are checked in this order:

    * title matches ``FULL_TIME_RE`` or the span exceeds 365 days: a
      "permanent" label, prefixed with the start date when it is in the past;
    * currently running: the end date plus the remaining time, prefixed with
      the start date when ``act.width`` exceeds 38 (activities) or 55 (others);
    * not started yet: the start date plus the time until it starts;
    * otherwise: "start ~ end" for activities, an empty label for the rest.
    """
    now = self.get_now_hour()
    # Both dates are rendered in the same "%m-%d %H:%M" form in every branch.
    s_text = s_date.strftime("%m-%d %H:%M")
    e_text = e_date.strftime("%m-%d %H:%M")

    label = ""
    if self.FULL_TIME_RE.search(act.title) or e_date - s_date > timedelta(days=365):
        label = f"{s_text} 后永久有效" if s_date < now else "永久有效"
    elif s_date < now < e_date:
        label = f"{e_text} ({self.human_read(e_date - now)}后结束)"
        # Only wide enough bars also show the start date.
        if act.width > (38 if is_act else 55):
            label = f"{s_text} ~ {label}"
    elif s_date > now:
        label = f"{s_text} ({self.human_read(s_date - now)}后开始)"
    elif is_act:
        label = f"{s_text} ~ {e_text}"
    act.label = label
# Classify `act` from its title by assigning act.type, and for some titles also
# act.title, act.sort, act.banner and act.face.
# NOTE(review): this function declares no `self` yet is called as
# self.parse_type(act, assets); presumably a @staticmethod decorator was lost — confirm.
async def parse_type(act: FinalAct, assets: "AssetsService") -> None:
# Titles containing "神铸赋形" are marked as weapon entries.
if "神铸赋形" in act.title:
act.type = ActEnum.weapon
# The listed weapon-category words and the "·" separator are stripped from the title.
act.title = re.sub(r"(单手剑|双手剑|长柄武器|弓|法器|·)", "", act.title)
act.sort = 2
# Titles containing "祈愿" are marked as character entries.
elif "祈愿" in act.title:
act.type = ActEnum.character
# The text between "·" and "(" in the title is taken as the character name.
if reg_ret := re.search(r"·(.*)\(", act.title):
char_name = reg_ret[1]
char = assets.avatar(roleToId(char_name))
# Banner and face are stored as URIs of the assets returned by navbar() / icon().
act.banner = (await assets.namecard(char.id).navbar()).as_uri()
act.face = (await char.icon()).as_uri()
# NOTE(review): indentation is not preserved in this view, so it cannot be determined
# here whether the next line belongs to the `if reg_ret` body or to the whole
# "祈愿" branch — confirm against the original file.
act.sort = 1
elif "纪行" in act.title:
act.type = ActEnum.no_display
elif act.title == "深渊":
act.type = ActEnum.abyss
elif act.title == "幻想真境剧诗":
act.type = ActEnum.img_theater
# Turn one ActDetail into a FinalAct placed on the calendar window, or return
# None when the announcement is ignored or hidden.
# NOTE(review): the body reads self.IGNORE_IDS and calls self.* methods, but no
# `self` parameter is visible in the signature below; callers pass seven
# arguments (ds, start_time, end_time, total_range, time_map, is_act, assets).
# A `self,` line appears to be missing — confirm.
async def get_list(
ds: ActDetail,
start_time: datetime,
end_time: datetime,
total_range: timedelta,
time_map: Dict[str, ActTime],
is_act: bool,
# NOTE(review): the bare timestamp line below is not Python; it looks like
# version-control residue left in this copy of the file.
2023-03-14 01:27:22 +00:00
assets: "AssetsService",
) -> Optional[FinalAct]:
# Defaults depend on is_act: type activity/normal, banner from ds or empty, sort 5 or 10.
# NOTE(review): further FinalAct arguments and the closing parenthesis are not visible.
act = FinalAct(
type=ActEnum.activity if is_act else ActEnum.normal,
banner=ds.banner if is_act else "",
sort=5 if is_act else 10,
# Optional time override for this announcement, looked up by its id as a string.
detail: Optional[ActTime] = time_map.get(str(act.id))
# Skip ignored ids, ignored titles, and entries whose override has a falsy `display`.
if act.id in self.IGNORE_IDS or self.IGNORE_RE.findall(act.title) or (detail and not detail.display):
return None
await self.parse_type(act, assets)
s_date, e_date = self.count_width(act, detail, ds, start_time, end_time, total_range)
self.parse_label(act, is_act, s_date, e_date)
# True when the activity's real date range overlaps the calendar window.
if s_date <= end_time and e_date >= start_time:
# Only activity/normal entries get mergeStatus 1; merge_list() tests for that value.
act.mergeStatus = 1 if act.type in {ActEnum.activity, ActEnum.normal} else 0
# NOTE(review): with indentation lost it is unclear whether this return sits inside
# the `if` above; the Optional return type suggests it might — confirm.
return act
# Build three consecutive monthly periods that each begin on day `day` of a
# month at 04:00:00 and end on day `day` of the following entry's month at
# 03:59:59.999999, then compare them with the [start_time, end_time] window.
# It declares no `self` and is invoked on the class as Calendar._get_abyss_cal(...).
# NOTE(review): the bare timestamp lines inside this function are not Python;
# they look like version-control residue left in this copy of the file.
def _get_abyss_cal(start_time: datetime, end_time: datetime, day: int) -> List[List[Union[datetime, str]]]:
# Two days before the 1st of the current month, i.e. a date inside the previous month.
last = datetime.now().replace(day=1) - timedelta(days=2)
last_month = last.month
curr = datetime.now()
curr_month = curr.month
# 40 days after `last`; used as the "next month" anchor for the second and third periods.
next_date = last + timedelta(days=40)
next_month = next_date.month
2024-07-29 12:32:34 +00:00
# A further 40 days on; only used as the end anchor of the third period.
next_next_date = next_date + timedelta(days=40)
2024-07-29 12:32:34 +00:00
# Period start: day `day` of the given date's month at 04:00:00.
def start(date: datetime):
return date.replace(day=day, hour=4, minute=0, second=0, microsecond=0)
2023-02-16 08:47:48 +00:00
# Period end: day `day` of the given date's month at 03:59:59.999999.
def end(date: datetime):
return date.replace(day=day, hour=3, minute=59, second=59, microsecond=999999)
2023-02-16 08:47:48 +00:00
# Each entry is [period start, period end, "<month>月" label].
# NOTE(review): the closing bracket of this list is not visible.
check = [
[start(last), end(curr), f"{last_month}月"],
[start(curr), end(next_date), f"{curr_month}月"],
[start(next_date), end(next_next_date), f"{next_month}月"],
ret = []
for ds in check:
s, e, _ = ds
# True when either edge of the window falls inside the period.
# NOTE(review): no statement is visible under this `if`; presumably the matching
# period is added to `ret` — confirm.
if (s <= start_time <= e) or (s <= end_time <= e):
return ret
@staticmethod
def get_abyss_cal(start_time: datetime, end_time: datetime) -> List[List[Union[datetime, str]]]:
    """Return the periods from ``_get_abyss_cal`` using day 16 as the boundary.

    Declared static because it takes no instance state and is called as
    ``self.get_abyss_cal(...)`` by get_photo_data.
    """
    return Calendar._get_abyss_cal(start_time, end_time, 16)
@staticmethod
def get_img_theater_cal(start_time: datetime, end_time: datetime) -> List[List[Union[datetime, str]]]:
    """Return the periods from ``_get_abyss_cal`` using day 1 as the boundary.

    Declared static because it takes no instance state and is called as
    ``self.get_img_theater_cal(...)`` by get_photo_data.
    """
    return Calendar._get_abyss_cal(start_time, end_time, 1)
# For every day in date_list, look up the characters whose birthday falls on
# that day and collect them per month and per day.
# Returns (largest number of characters sharing one day, nested dict keyed by
# str(month) then str(day)).
# NOTE(review): the bare timestamp lines inside this function are not Python;
# they look like version-control residue left in this copy of the file.
async def get_birthday_char(
self, date_list: List[Date], assets: "AssetsService"
) -> Tuple[int, Dict[str, Dict[str, List[BirthChar]]]]:
2023-02-28 14:15:48 +00:00
birthday_list = await self.async_gen_birthday_list()
birthday_char_line = 0
birthday_chars = {}
for date in date_list:
birthday_chars[str(date.month)] = {}
for d in date.date:
# Same "<month>_<day>" key form that gen_birthday_list() produces.
key = f"{date.month}_{d}"
2023-02-28 14:15:48 +00:00
if char := birthday_list.get(key):
# Track the longest per-day list seen so far.
birthday_char_line = max(len(char), birthday_char_line)
birthday_chars[str(date.month)][str(d)] = []
for c in char:
character = await Character.get_by_name(c)
# NOTE(review): the line below is a keyword argument of a call whose opening
# (presumably a BirthChar(...) appended to the list above) is not visible — confirm.
icon=(await assets.avatar(roleToId(c)).icon()).as_uri(),
2023-02-16 08:47:48 +00:00
2024-01-07 13:40:50 +00:00
# NOTE(review): no matching `try:` is visible for this handler.
except AssetsCouldNotFound:
# Log message (Chinese): image assets for character %s were not found.
logger.warning("角色 %s 图片素材未找到", c)
return birthday_char_line, birthday_chars
@staticmethod
def get_merge_next(target: "List[FinalAct]", li: "FinalAct") -> "Optional[FinalAct]":
    """Return the first entry of ``target`` that can share a row with ``li``.

    An entry qualifies when its ``mergeStatus`` is 1 and it starts at or after
    the right edge of ``li`` (``li.left + li.width``). Returns ``None`` when no
    entry qualifies, which is what the caller's ``if li2 := ...`` test expects.
    """
    # Declared static because it takes no instance state and is called as
    # ``self.get_merge_next(...)`` by merge_list.
    return next(
        (li2 for li2 in target if (li2.mergeStatus == 1) and (li.left + li.width <= li2.left)),
        None,
    )
# Group the activities into rows and count character entries.
# Returns (rows, number of character entries, number of character entries with left == 0).
# NOTE(review): indentation is lost in this view, so the nesting of the
# statements below cannot be fully confirmed here.
def merge_list(self, target: List[FinalAct]) -> Tuple[List[List[FinalAct]], int, int]:
char_count = 0
char_old = 0
ret: List[List[FinalAct]] = []
for idx, li in enumerate(target):
if li.type == ActEnum.character:
char_count += 1
# left == 0 means the bar begins at the left edge of the window.
if li.left == 0:
char_old += 1
# The running character count is stored on the entry as idx.
li.idx = char_count
# mergeStatus 1 marks an entry that may still be paired with a later one.
if li.mergeStatus == 1:
# Only entries after the current one are searched for a partner.
if li2 := self.get_merge_next(target[idx + 1 :], li):
# Both entries are set to status 2 and placed together in one row.
li.mergeStatus = 2
li2.mergeStatus = 2
ret.append([li, li2])
# Anything not yet at status 2 is set to status 2 here.
# NOTE(review): no append of a single-entry row is visible after this assignment,
# so unpaired entries never reach `ret` in the visible code; a line appears to be
# missing — confirm.
if li.mergeStatus != 2:
li.mergeStatus = 2
return ret, char_count, char_old
# Assemble the dict of values returned for the calendar image: date columns,
# activity rows, abyss / theater entries and birthday characters.
# NOTE(review): this function is incomplete in this view — indentation is
# lost, several statements are missing (see notes below) and the returned dict
# is not closed before the end of the visible source.
async def get_photo_data(self, assets: "AssetsService") -> Dict:
now = self.get_now_hour()
list_data, time_map = await self.req_cal_data()
# NOTE(review): the opening of this tuple unpacking is not visible; the names used
# below are date_list, start_time, end_time, total_range and now_left.
) = await self.get_date_list()
birthday_char_line, birthday_chars = await self.get_birthday_char(date_list, assets)
target: List[FinalAct] = []
abyss: List[FinalAct] = []
img_theater: List[FinalAct] = []
# NOTE(review): the bare timestamp line below is not Python; it looks like
# version-control residue left in this copy of the file.
2023-02-16 08:47:48 +00:00
# list_data[1] is processed with is_act=True, list_data[0] with is_act=False.
# NOTE(review): the bodies of the `if act := ...` statements in this function are not
# visible; presumably each act is stored into target / abyss / img_theater — confirm.
for ds in list_data[1]:
if act := await self.get_list(ds, start_time, end_time, total_range, time_map, True, assets):
for ds in list_data[0]:
if act := await self.get_list(ds, start_time, end_time, total_range, time_map, False, assets):
# Abyss
abyss_cal = self.get_abyss_cal(start_time, end_time)
for t in abyss_cal:
# Each t is [start datetime, end datetime, month label].
# NOTE(review): the closing parenthesis of ActDetail(...) is not visible.
ds = ActDetail(
title=f"「深境螺旋」· {t[2]}",
start_time=t[0].strftime("%Y-%m-%d %H:%M:%S"),
end_time=t[1].strftime("%Y-%m-%d %H:%M:%S"),
# An empty time map is passed, so no time override applies to these entries.
if act := await self.get_list(ds, start_time, end_time, total_range, {}, True, assets):
# Imaginarium Theater
img_theater_cal = self.get_img_theater_cal(start_time, end_time)
for t in img_theater_cal:
ds = ActDetail(
title=f"「幻想真境剧诗」· {t[2]}",
start_time=t[0].strftime("%Y-%m-%d %H:%M:%S"),
end_time=t[1].strftime("%Y-%m-%d %H:%M:%S"),
if act := await self.get_list(ds, start_time, end_time, total_range, {}, True, assets):
# Order by sort rank, then start, then duration.
# NOTE(review): count_width() stores act.start as a "%m-%d %H:%M" string, so this
# comparison is textual and carries no year component.
target.sort(key=lambda x: (x.sort, x.start, x.duration))
# After this call `target` holds rows (lists of FinalAct) rather than single entries.
target, char_count, char_old = self.merge_list(target)
return {
"date_list": date_list,
"now_left": now_left,
"list": target,
"abyss": abyss,
"img_theater": img_theater,
"char_mode": f"char-{char_count}-{char_old}",
"now_time": now.strftime("%Y-%m-%d %H 时"),
"birthday_char_line": birthday_char_line,
"birthday_chars": birthday_chars,