支持新地图

This commit is contained in:
KimigaiiWuyi 2024-05-15 04:19:31 +08:00
parent 3238d51b17
commit e74960040f
6 changed files with 152 additions and 32 deletions

3
.gitignore vendored
View File

@ -132,4 +132,5 @@ dmypy.json
fastapi_genshin_map/GetMapImage/map_data fastapi_genshin_map/GetMapImage/map_data
fastapi_genshin_map/GetMapImage/resource_data fastapi_genshin_map/GetMapImage/resource_data
fastapi_genshin_map/GetMapImage/icon_data fastapi_genshin_map/GetMapImage/icon_data
fastapi_genshin_map/GetMapImage/genshinmap.log fastapi_genshin_map/GetMapImage/genshinmap.log
fastapi_genshin_map/GetMapImage/slice_data

View File

@ -17,6 +17,8 @@ class MapID(IntEnum):
"""层岩巨渊·地下矿区""" """层岩巨渊·地下矿区"""
# golden_apple_archipelago = 12 # golden_apple_archipelago = 12
"""金苹果群岛""" """金苹果群岛"""
sea_of_bygone_eras = 34
"""旧日之海"""
class Label(BaseModel): class Label(BaseModel):
@ -190,7 +192,7 @@ class PageLabel(BaseModel):
@validator("center", pre=True) @validator("center", pre=True)
def center_str_to_tuple(cls, v: str) -> Optional[Tuple[float, float]]: def center_str_to_tuple(cls, v: str) -> Optional[Tuple[float, float]]:
if v and (splitted := v.split(",")): if v and (splitted := v.split(",")):
return tuple(map(float, splitted)) return tuple(map(float, splitted)) # type: ignore
@validator("zoom", pre=True) @validator("zoom", pre=True)
def zoom_str_to_float(cls, v: str): def zoom_str_to_float(cls, v: str):

View File

@ -3,7 +3,7 @@ from __future__ import annotations
from typing import Any, Dict, List, Tuple from typing import Any, Dict, List, Tuple
from httpx import Response, AsyncClient from httpx import Response, AsyncClient
from ...logger import logger
from .exc import StatusError from .exc import StatusError
from .models import ( from .models import (
Spot, Spot,
@ -17,7 +17,7 @@ from .models import (
) )
API_CLIENT = AsyncClient( API_CLIENT = AsyncClient(
base_url="https://api-takumi.mihoyo.com/common/map_user/ys_obc/v1/map" base_url="https://waf-api-takumi.mihoyo.com/common/map_user/ys_obc"
) )
Spots = Dict[int, List[Spot]] Spots = Dict[int, List[Spot]]
@ -25,16 +25,19 @@ Spots = Dict[int, List[Spot]]
async def _request( async def _request(
endpoint: str, client: AsyncClient = API_CLIENT endpoint: str, client: AsyncClient = API_CLIENT
) -> Dict[str, Any]: ) -> Dict[str, Any]:
logger.info(f"[API] 正在访问 {endpoint}")
while True: while True:
try: try:
resp = await client.get(endpoint) resp = await client.get(endpoint)
resp.raise_for_status() resp.raise_for_status()
data: Dict[str, Any] = resp.json() data: Dict[str, Any] = resp.json()
logger.info(f"[API] {data}")
if data["retcode"] != 0: if data["retcode"] != 0:
raise StatusError(data["retcode"], data["message"]) raise StatusError(data["retcode"], data["message"])
return data["data"] return data["data"]
except Exception as e: except Exception as e:
if "Timeout" in str(e): if "Timeout" in str(e):
logger.warning(f"[API] {e}")
continue continue
@ -49,7 +52,7 @@ async def get_labels(map_id: MapID) -> List[Tree]:
返回 返回
`list[Tree]` `list[Tree]`
""" """
data = await _request(f"/label/tree?map_id={map_id}&app_sn=ys_obc") data = await _request(f"/v2/map/label/tree?map_id={map_id}&app_sn=ys_obc")
return [Tree.parse_obj(i) for i in data["tree"]] return [Tree.parse_obj(i) for i in data["tree"]]
@ -64,7 +67,7 @@ async def get_points(map_id: MapID) -> List[Point]:
返回 返回
`list[Point]` `list[Point]`
""" """
data = await _request(f"/point/list?map_id={map_id}&app_sn=ys_obc") data = await _request(f"/v3/map/point/list?map_id={map_id}&app_sn=ys_obc")
return [Point.parse_obj(i) for i in data["point_list"]] return [Point.parse_obj(i) for i in data["point_list"]]
@ -79,7 +82,9 @@ async def get_maps(map_id: MapID) -> MapInfo:
返回 返回
`MapInfo` `MapInfo`
""" """
data = await _request(f"/info?map_id={map_id}&app_sn=ys_obc&lang=zh-cn") data = await _request(
f"/v3/map/info?map_id={map_id}&app_sn=ys_obc&lang=zh-cn"
)
return MapInfo.parse_obj(data["info"]) return MapInfo.parse_obj(data["info"])
@ -111,7 +116,7 @@ async def get_spot_from_game(
# 1. 申请刷新 # 1. 申请刷新
resp = await API_CLIENT.post( resp = await API_CLIENT.post(
"/spot_kind/sync_game_spot", "/v1/map/spot_kind/sync_game_spot",
json={ json={
"map_id": str(map_id.value), "map_id": str(map_id.value),
"app_sn": "ys_obc", "app_sn": "ys_obc",
@ -123,7 +128,7 @@ async def get_spot_from_game(
# 2. 获取类别 # 2. 获取类别
resp = await API_CLIENT.get( resp = await API_CLIENT.get(
"/spot_kind/get_spot_kinds?map_id=2&app_sn=ys_obc&lang=zh-cn", "/v1/map/spot_kind/get_spot_kinds?map_id=2&app_sn=ys_obc&lang=zh-cn",
headers={"Cookie": cookie}, headers={"Cookie": cookie},
) )
data = _raise_for_retcode(resp) data = _raise_for_retcode(resp)
@ -132,7 +137,7 @@ async def get_spot_from_game(
# 3.获取坐标 # 3.获取坐标
resp = await API_CLIENT.post( resp = await API_CLIENT.post(
"/spot/get_map_spots_by_kinds", "/v1/map/spot/get_map_spots_by_kinds",
json={ json={
"map_id": str(map_id.value), "map_id": str(map_id.value),
"app_sn": "ys_obc", "app_sn": "ys_obc",
@ -159,7 +164,7 @@ async def get_page_label(map_id: MapID) -> List[PageLabel]:
`list[PageLabel]` `list[PageLabel]`
""" """
data = await _request( data = await _request(
f"/get_map_pageLabel?map_id={map_id}&app_sn=ys_obc&lang=zh-cn", f"/v1/map/get_map_pageLabel?map_id={map_id}&app_sn=ys_obc&lang=zh-cn",
) )
return [PageLabel.parse_obj(i) for i in data["list"]] return [PageLabel.parse_obj(i) for i in data["list"]]

View File

@ -4,7 +4,7 @@ from math import ceil
from io import BytesIO from io import BytesIO
from typing import List, Tuple, Union from typing import List, Tuple, Union
from asyncio import gather, create_task from asyncio import gather, create_task
from ...logger import logger
from PIL import Image from PIL import Image
from httpx import AsyncClient from httpx import AsyncClient
@ -14,7 +14,8 @@ CLIENT = AsyncClient()
async def get_img(url: str) -> Image.Image: async def get_img(url: str) -> Image.Image:
resp = await CLIENT.get(url) logger.info(f"[API] 正在下载 {url}")
resp = await CLIENT.get(url, timeout=600)
resp.raise_for_status() resp.raise_for_status()
return Image.open(BytesIO(resp.read())) return Image.open(BytesIO(resp.read()))
@ -39,7 +40,7 @@ async def make_map(map: Maps) -> Image.Image:
另见 另见
`get_map_by_pos` `get_map_by_pos`
""" """
img = Image.new("RGBA", tuple(map.total_size)) img = Image.new("RGBA", tuple(map.total_size)) # type: ignore
x = 0 x = 0
y = 0 y = 0
maps: List[Image.Image] = await gather( maps: List[Image.Image] = await gather(

View File

@ -1,5 +1,24 @@
import aiofiles import aiofiles
import aiohttp import aiohttp
from httpx import AsyncClient
from PIL import Image
import asyncio
from .logger import logger
from pathlib import Path
slice_path = Path(__file__).parent / 'slice_data'
slice_path.mkdir(parents=True, exist_ok=True)
BASE = 'https://act-webstatic.mihoyo.com/ys-map-op/map'
world = {
2: '/2/253e4ea4c79eb920429e26720cebf6ef',
7: '/7/2d0a83cf40ca8f5a2ef0b1a5199fc407',
9: '/9/96733f1194aed673f3cdafee4f56b2d2',
34: '/34/9af6a4747bab91f96c598f8e8a9b7ce5',
}
x, y = 0, 0
async def download_file(url, save_path): async def download_file(url, save_path):
@ -8,3 +27,68 @@ async def download_file(url, save_path):
if response.status == 200: if response.status == 200:
async with aiofiles.open(save_path, "wb") as f: async with aiofiles.open(save_path, "wb") as f:
await f.write(await response.read()) await f.write(await response.read())
async def download_P0_img(
client: AsyncClient,
map_id: int,
i: int,
j: int,
):
logger.info(f'当前尝试请求:[{map_id}] | {i} {j}')
global x, y
if map_id not in world:
logger.warning(f'地图 {map_id} 不存在!')
return
URL = BASE + world[map_id] + '/{}_P0.webp'
resp = await client.get(URL.format(f'{i}_{j}'))
if resp.status_code != 200:
return
if x < i:
x = i
if y < j:
y = j
async with aiofiles.open(slice_path / f'{map_id}_{i}_{j}.webp', 'wb') as f:
await f.write(resp.read())
logger.info(f'请求成功,文件 [{map_id}] | {i}_{j}.webp 已保存!')
async def make_P0_map(map_id: int) -> Image.Image:
global x, y
async with AsyncClient() as client:
TASK = []
for i in range(0, 72):
for j in range(0, 72):
if (slice_path / f'{map_id}_{i}_{j}.webp').exists():
logger.info(f'文件 {map_id}_{i}_{j}.webp 已存在!跳过下载..')
if x < i:
x = i
if y < j:
y = j
continue
TASK.append(download_P0_img(client, map_id, i, j))
if len(TASK) >= 15:
await asyncio.gather(*TASK)
await asyncio.sleep(0.5)
TASK.clear()
if TASK:
await asyncio.gather(*TASK)
TASK.clear()
big_img = Image.new('RGBA', (x * 256 + 2048, y * 256 + 1024))
logger.info(f'{map_id}切片下载完成, 开始合并】x: {x}, y: {y}')
for i in range(x):
for j in range(y):
logger.info(f'合并: {i} {j}')
img = Image.open(slice_path / f'{map_id}_{i}_{j}.webp')
img = img.convert('RGBA')
big_img.paste(img, (i * 256 + 2048, j * 256 + 1024), img)
return big_img

View File

@ -9,7 +9,7 @@ from PIL import Image
from .GenshinMap.genshinmap import img, models, request, utils from .GenshinMap.genshinmap import img, models, request, utils
from .logger import logger from .logger import logger
from .download import download_file from .download import download_file, make_P0_map
Image.MAX_IMAGE_PIXELS = 603120000 Image.MAX_IMAGE_PIXELS = 603120000
router = APIRouter(prefix="/get_map") router = APIRouter(prefix="/get_map")
@ -22,7 +22,8 @@ CHASM_PATH = MAP / "chasm.png"
ENKANOMIYA_PATH = MAP / "enkanomiya.png" ENKANOMIYA_PATH = MAP / "enkanomiya.png"
TEYVAT_PATH = MAP / "teyvat.png" TEYVAT_PATH = MAP / "teyvat.png"
with open(Path(__file__).parent / "map.yaml", "r", encoding="utf-8") as ymlfile: _path = Path(__file__).parent / "map.yaml"
with open(_path, "r", encoding="utf-8") as ymlfile:
resource_aliases = yaml.load(ymlfile, Loader=yaml.SafeLoader) resource_aliases = yaml.load(ymlfile, Loader=yaml.SafeLoader)
MAP_ID_DICT = { MAP_ID_DICT = {
@ -58,9 +59,13 @@ async def create_genshin_map():
mark_trans = utils.get_points_by_id(3, points) mark_trans = utils.get_points_by_id(3, points)
# 转换两个锚点为标准坐标 # 转换两个锚点为标准坐标
mark_god_converted = utils.convert_pos(mark_god, maps.detail.origin) mark_god_converted = utils.convert_pos(mark_god, maps.detail.origin)
mark_trans_converted = utils.convert_pos(mark_trans, maps.detail.origin) mark_trans_converted = utils.convert_pos(
mark_trans,
maps.detail.origin,
)
maps = await request.get_maps(map_id) maps = await request.get_maps(map_id)
map_img = await utils.make_map(maps.detail) # map_img = await utils.make_map(maps.detail)
map_img = await make_P0_map(maps.id)
for mark_god_point in mark_god_converted: for mark_god_point in mark_god_converted:
map_img.paste( map_img.paste(
mark_god_pic, mark_god_pic,
@ -78,22 +83,39 @@ async def create_genshin_map():
map_img.save(MAP / f"{map_id.name}.png") map_img.save(MAP / f"{map_id.name}.png")
logger.info("****************** 开始绘制 *****************") logger.info("****************** 开始绘制 *****************")
trees = await request.get_labels(map_id) trees = await request.get_labels(map_id)
# for tree in trees: '''
# for label in tree.children: for tree in trees:
# await get_map_response("PRE-START", label.name, map_id, False) for label in tree.children:
await get_map_response(
"PRE-START",
label.name,
map_id,
False,
)
'''
# 改成并发 # 改成并发
import asyncio import asyncio
tasks = [] tasks = []
for tree in trees: for tree in trees:
for label in tree.children: for label in tree.children:
tasks.append(get_map_response("PRE-START", label.name, map_id, False)) tasks.append(
get_map_response(
"PRE-START",
label.name,
map_id,
False,
)
)
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
logger.info("****************** 开始地图API服务 *****************") logger.info("****************** 开始地图API服务 *****************")
async def get_map_response( async def get_map_response(
prefix: str, resource_name: str, map_id: models.MapID, is_cluster: bool = False prefix: str,
resource_name: str,
map_id: models.MapID,
is_cluster: bool = False,
) -> Optional[Path]: ) -> Optional[Path]:
# 寻找主地图的缓存 # 寻找主地图的缓存
map_path = MAP / f"{map_id.name}.png" map_path = MAP / f"{map_id.name}.png"
@ -134,7 +156,10 @@ async def get_map_response(
transmittable = utils.get_points_by_id(resource_id, points) transmittable = utils.get_points_by_id(resource_id, points)
# 转换坐标 # 转换坐标
transmittable_converted = utils.convert_pos(transmittable, maps.detail.origin) transmittable_converted = utils.convert_pos(
transmittable,
maps.detail.origin,
)
# 进行最密点获取 # 进行最密点获取
if is_cluster: if is_cluster:
@ -200,15 +225,15 @@ async def get_map_response(
if not icon_path.exists(): if not icon_path.exists():
await download_file(icon, icon_path) await download_file(icon, icon_path)
icon_pic = Image.open(icon_path).resize((52, 52)) icon_pic = Image.open(icon_path).resize((52, 52))
except: except: # noqa: E722
await download_file(icon, icon_path) await download_file(icon, icon_path)
continue continue
break break
if point.s == 1: if point.s == 1: # type: ignore
z = 1 z = 1
else: else:
z = point.z z = point.z # type: ignore
if z <= 3: if z <= 3:
mark = Image.open(TEXT_PATH / f"mark_{z}.png") mark = Image.open(TEXT_PATH / f"mark_{z}.png")
@ -216,16 +241,16 @@ async def get_map_response(
mark = Image.open(TEXT_PATH / "mark_B.png") mark = Image.open(TEXT_PATH / "mark_B.png")
_m = None _m = None
if point.s == 1: if point.s == 1: # type: ignore
_m = Image.open(TEXT_PATH / "B.png") _m = Image.open(TEXT_PATH / "B.png")
elif point.s == 3: elif point.s == 3: # type: ignore
_m = Image.open(TEXT_PATH / "W.png") _m = Image.open(TEXT_PATH / "W.png")
if _m is not None: if _m is not None:
mark.paste(_m, (13, 50), _m) mark.paste(_m, (13, 50), _m)
mark.paste(icon_pic, (25, 17), icon_pic) mark.paste(icon_pic, (25, 17), icon_pic)
mark_size = (40, 40) mark_size = (70, 70)
mark = mark.resize(mark_size) mark = mark.resize(mark_size)
genshin_map.paste( genshin_map.paste(
@ -245,7 +270,7 @@ async def get_map_response(
# genshin_map.save(result_buffer, format='PNG', quality=80, subsampling=0) # genshin_map.save(result_buffer, format='PNG', quality=80, subsampling=0)
# 进行保存 # 进行保存
genshin_map.save(save_path, "JPEG", quality=90) genshin_map.save(save_path, "JPEG", quality=95)
logger.info(f"{prefix} [查询成功]:新增缓存 [{save_path.name}]") logger.info(f"{prefix} [查询成功]:新增缓存 [{save_path.name}]")
return save_path return save_path
@ -262,7 +287,9 @@ async def get_map_by_point(
resource_name = resource_name.lower() resource_name = resource_name.lower()
for m in resource_aliases: for m in resource_aliases:
for a in resource_aliases[m]: for a in resource_aliases[m]:
if resource_name == a or resource_name in resource_aliases[m][a]: if resource_name == a:
return a
if resource_name in resource_aliases[m][a]:
return a return a
return resource_name return resource_name