2023-08-29 05:15:55 +00:00
|
|
|
from typing import List
|
|
|
|
|
|
|
|
from pydantic import ValidationError
|
|
|
|
|
|
|
|
from func.client import client, retry
|
|
|
|
from func.data import all_light_cones, all_light_cones_name, dump_light_cones
|
|
|
|
from models.light_cone import YattaLightCone
|
|
|
|
from res_func.url import light_cone_yatta_url
|
|
|
|
|
|
|
|
|
|
|
|
@retry
async def get_single_light_cone(url: str) -> None:
    """Fetch one light cone from ``url`` and record it in the shared stores.

    The ``data`` field of the JSON response is unpacked as keyword
    arguments into ``YattaLightCone``. On success the resulting model is
    appended to ``all_light_cones`` and stored in ``all_light_cones_name``
    under its ``name`` attribute.

    Args:
        url: Address of the single light cone resource to request.

    Raises:
        Exception: Whatever is raised while decoding the response or
            building the model; a failure message is printed first and
            the same exception is then re-raised.

    NOTE(review): how ``@retry`` reacts to the re-raised exception is
    defined in ``func.client`` and is not visible from this file.
    """
    response = await client.get(url)
    try:
        payload = response.json()["data"]
        cone = YattaLightCone(**payload)
    except Exception as e:
        # Report which URL failed before handing the error back up.
        print(f"{url} 获取光锥数据失败")
        raise e
    else:
        # Only register the light cone once it has been built successfully.
        all_light_cones.append(cone)
        all_light_cones_name[cone.name] = cone
|
|
|
|
|
|
|
|
|
|
|
|
@retry
async def get_all_light_cones() -> List[str]:
    """Return the light cone identifiers listed by the index endpoint.

    Requests ``light_cone_yatta_url`` and returns the keys found under
    ``data`` -> ``items`` in the JSON response.

    Returns:
        A list holding every key of the ``items`` mapping.
    """
    response = await client.get(light_cone_yatta_url)
    items = response.json()["data"]["items"]
    return [*items.keys()]
|
|
|
|
|
|
|
|
|
|
|
|
async def fetch_light_cones():
    """Fetch every listed light cone, then dump the collected data.

    Obtains the identifiers from ``get_all_light_cones`` and requests each
    one through ``get_single_light_cone``. A ``ValidationError`` for an
    individual light cone is reported on stdout and that entry is skipped;
    any other exception propagates. After the loop, a completion message
    is printed and ``dump_light_cones`` is awaited.
    """
    print("获取光锥数据")
    for light_cone_id in await get_all_light_cones():
        # Build the detail URL once; it is used for the request and the message.
        detail_url = f"{light_cone_yatta_url}/{light_cone_id}"
        try:
            await get_single_light_cone(detail_url)
        except ValidationError:
            # Malformed data for one entry must not abort the whole run.
            print(f"{detail_url} 获取光锥数据失败,数据格式异常")
    print("获取光锥数据完成")
    await dump_light_cones()
|