支持生成5.0纳塔地图

This commit is contained in:
KimigaiiWuyi 2024-08-29 22:07:00 +08:00
parent 67186f8610
commit dbe40bdca0
3 changed files with 60 additions and 57 deletions

3
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,3 @@
{
"python.languageServer": "None"
}

View File

@ -12,7 +12,7 @@ slice_path.mkdir(parents=True, exist_ok=True)
BASE = 'https://act-webstatic.mihoyo.com/ys-map-op/map' BASE = 'https://act-webstatic.mihoyo.com/ys-map-op/map'
world = { world = {
2: '/2/253e4ea4c79eb920429e26720cebf6ef', 2: '/2/c64d14ffe710540c50df8df05c96f8b5',
7: '/7/2d0a83cf40ca8f5a2ef0b1a5199fc407', 7: '/7/2d0a83cf40ca8f5a2ef0b1a5199fc407',
9: '/9/96733f1194aed673f3cdafee4f56b2d2', 9: '/9/96733f1194aed673f3cdafee4f56b2d2',
34: '/34/9af6a4747bab91f96c598f8e8a9b7ce5', 34: '/34/9af6a4747bab91f96c598f8e8a9b7ce5',
@ -67,8 +67,8 @@ async def make_P0_map(map_id: int) -> Image.Image:
async with AsyncClient() as client: async with AsyncClient() as client:
TASK = [] TASK = []
for i in range(0, 72): for i in range(0, 99):
for j in range(0, 72): for j in range(0, 99):
if (slice_path / f'{map_id}_{i}_{j}.webp').exists(): if (slice_path / f'{map_id}_{i}_{j}.webp').exists():
logger.info(f'文件 {map_id}_{i}_{j}.webp 已存在!跳过下载..') logger.info(f'文件 {map_id}_{i}_{j}.webp 已存在!跳过下载..')
if x < i: if x < i:
@ -76,8 +76,8 @@ async def make_P0_map(map_id: int) -> Image.Image:
if y < j: if y < j:
y = j y = j
continue continue
else:
TASK.append(download_P0_img(client, map_id, i, j)) TASK.append(download_P0_img(client, map_id, i, j))
if len(TASK) >= 15: if len(TASK) >= 15:
await asyncio.gather(*TASK) await asyncio.gather(*TASK)
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
@ -87,7 +87,7 @@ async def make_P0_map(map_id: int) -> Image.Image:
TASK.clear() TASK.clear()
if map_id == 2: if map_id == 2:
ox, oy = 2048, 1024 ox, oy = -2048, -1024
else: else:
ox, oy = 0, 0 ox, oy = 0, 0
big_img = Image.new('RGBA', (x * 256 + ox, y * 256 + oy)) big_img = Image.new('RGBA', (x * 256 + ox, y * 256 + oy))

View File

@ -12,25 +12,25 @@ from .logger import logger
from .download import download_file, make_P0_map from .download import download_file, make_P0_map
Image.MAX_IMAGE_PIXELS = 603120000 Image.MAX_IMAGE_PIXELS = 603120000
router = APIRouter(prefix="/get_map") router = APIRouter(prefix='/get_map')
TEXT_PATH = Path(__file__).parent / "texture2d" TEXT_PATH = Path(__file__).parent / 'texture2d'
mark_quest = Image.open(TEXT_PATH / "mark_quest.png").resize((32, 32)) mark_quest = Image.open(TEXT_PATH / 'mark_quest.png').resize((32, 32))
MAP = Path(__file__).parent / "map_data" MAP = Path(__file__).parent / 'map_data'
RESOURCE_PATH = Path(__file__).parent / "resource_data" RESOURCE_PATH = Path(__file__).parent / 'resource_data'
ICON_PATH = Path(__file__).parent / "icon_data" ICON_PATH = Path(__file__).parent / 'icon_data'
CHASM_PATH = MAP / "chasm.png" CHASM_PATH = MAP / 'chasm.png'
ENKANOMIYA_PATH = MAP / "enkanomiya.png" ENKANOMIYA_PATH = MAP / 'enkanomiya.png'
TEYVAT_PATH = MAP / "teyvat.png" TEYVAT_PATH = MAP / 'teyvat.png'
_path = Path(__file__).parent / "map.yaml" _path = Path(__file__).parent / 'map.yaml'
with open(_path, "r", encoding="utf-8") as ymlfile: with open(_path, 'r', encoding='utf-8') as ymlfile:
resource_aliases = yaml.load(ymlfile, Loader=yaml.SafeLoader) resource_aliases = yaml.load(ymlfile, Loader=yaml.SafeLoader)
MAP_ID_DICT = { MAP_ID_DICT = {
"2": models.MapID.teyvat, # 提瓦特 '2': models.MapID.teyvat, # 提瓦特
"9": models.MapID.chasm, # 层岩巨渊 '9': models.MapID.chasm, # 层岩巨渊
"7": models.MapID.enkanomiya, # 渊下宫 '7': models.MapID.enkanomiya, # 渊下宫
"34": models.MapID.sea_of_bygone_eras, # 旧日之海 '34': models.MapID.sea_of_bygone_eras, # 旧日之海
# MapID.golden_apple_archipelago, # 金苹果群岛 # MapID.golden_apple_archipelago, # 金苹果群岛
} }
@ -38,7 +38,7 @@ if not ICON_PATH.exists():
ICON_PATH.mkdir(exist_ok=True) ICON_PATH.mkdir(exist_ok=True)
@router.on_event("startup") @router.on_event('startup')
async def create_genshin_map(): async def create_genshin_map():
if ( if (
CHASM_PATH.exists() CHASM_PATH.exists()
@ -46,11 +46,11 @@ async def create_genshin_map():
and TEYVAT_PATH.exists() and TEYVAT_PATH.exists()
and RESOURCE_PATH.exists() and RESOURCE_PATH.exists()
): ):
logger.info("****************** 开始地图API服务 *****************") logger.info('****************** 开始地图API服务 *****************')
return return
logger.info("****************** 地图API服务进行初始化 *****************") logger.info('****************** 地图API服务进行初始化 *****************')
mark_god_pic = Image.open(TEXT_PATH / "mark_god.png") mark_god_pic = Image.open(TEXT_PATH / 'mark_god.png')
mark_trans_pic = Image.open(TEXT_PATH / "mark_trans.png") mark_trans_pic = Image.open(TEXT_PATH / 'mark_trans.png')
for map_id in models.MapID: for map_id in models.MapID:
maps = await request.get_maps(map_id) maps = await request.get_maps(map_id)
points = await request.get_points(map_id) points = await request.get_points(map_id)
@ -81,14 +81,14 @@ async def create_genshin_map():
) )
if not MAP.exists(): if not MAP.exists():
MAP.mkdir() MAP.mkdir()
map_img.save(MAP / f"{map_id.name}.png") map_img.save(MAP / f'{map_id.name}.png')
logger.info("****************** 开始绘制 *****************") logger.info('****************** 开始绘制 *****************')
trees = await request.get_labels(map_id) trees = await request.get_labels(map_id)
''' '''
for tree in trees: for tree in trees:
for label in tree.children: for label in tree.children:
await get_map_response( await get_map_response(
"PRE-START", 'PRE-START',
label.name, label.name,
map_id, map_id,
False, False,
@ -102,14 +102,14 @@ async def create_genshin_map():
for label in tree.children: for label in tree.children:
tasks.append( tasks.append(
get_map_response( get_map_response(
"PRE-START", 'PRE-START',
label.name, label.name,
map_id, map_id,
False, False,
) )
) )
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
logger.info("****************** 开始地图API服务 *****************") logger.info('****************** 开始地图API服务 *****************')
async def get_map_response( async def get_map_response(
@ -119,21 +119,21 @@ async def get_map_response(
is_cluster: bool = False, is_cluster: bool = False,
) -> Optional[Path]: ) -> Optional[Path]:
# 寻找主地图的缓存 # 寻找主地图的缓存
map_path = MAP / f"{map_id.name}.png" map_path = MAP / f'{map_id.name}.png'
if "/" in resource_name: if '/' in resource_name:
resource_name = resource_name.replace("/", "_") resource_name = resource_name.replace('/', '_')
# 寻找保存点 # 寻找保存点
if not RESOURCE_PATH.exists(): if not RESOURCE_PATH.exists():
RESOURCE_PATH.mkdir() RESOURCE_PATH.mkdir()
if is_cluster: if is_cluster:
save_path = RESOURCE_PATH / f"{map_id.name}_{resource_name}_KMEANS.jpg" save_path = RESOURCE_PATH / f'{map_id.name}_{resource_name}_KMEANS.jpg'
else: else:
save_path = RESOURCE_PATH / f"{map_id.name}_{resource_name}.jpg" save_path = RESOURCE_PATH / f'{map_id.name}_{resource_name}.jpg'
# 如果存在缓存,直接回复 # 如果存在缓存,直接回复
if save_path.exists(): if save_path.exists():
logger.info(f"{prefix} [查询成功]:发送缓存 [{save_path.name}]") logger.info(f'{prefix} [查询成功]:发送缓存 [{save_path.name}]')
return save_path return save_path
maps = await request.get_maps(map_id) maps = await request.get_maps(map_id)
@ -193,7 +193,7 @@ async def get_map_response(
] ]
] ]
logger.info(f"{prefix} [新增缓存]:开始绘制 {save_path.name}...") logger.info(f'{prefix} [新增缓存]:开始绘制 {save_path.name}...')
# 打开主地图 # 打开主地图
genshin_map = Image.open(map_path) genshin_map = Image.open(map_path)
@ -220,7 +220,7 @@ async def get_map_response(
int(point.y) - int(lt_point.y), int(point.y) - int(lt_point.y),
) )
icon_path = ICON_PATH / f"{resource_name}.png" icon_path = ICON_PATH / f'{resource_name}.png'
while True: while True:
try: try:
if not icon_path.exists(): if not icon_path.exists():
@ -237,15 +237,15 @@ async def get_map_response(
z = point.z # type: ignore z = point.z # type: ignore
if z <= 3: if z <= 3:
mark = Image.open(TEXT_PATH / f"mark_{z}.png") mark = Image.open(TEXT_PATH / f'mark_{z}.png')
else: else:
mark = Image.open(TEXT_PATH / "mark_B.png") mark = Image.open(TEXT_PATH / 'mark_B.png')
_m = None _m = None
if point.s == 1: # type: ignore if point.s == 1: # type: ignore
_m = Image.open(TEXT_PATH / "B.png") _m = Image.open(TEXT_PATH / 'B.png')
elif point.s == 3: # type: ignore elif point.s == 3: # type: ignore
_m = Image.open(TEXT_PATH / "W.png") _m = Image.open(TEXT_PATH / 'W.png')
if _m is not None: if _m is not None:
mark.paste(_m, (13, 50), _m) mark.paste(_m, (13, 50), _m)
@ -264,21 +264,21 @@ async def get_map_response(
) )
# 转换RGB图 # 转换RGB图
genshin_map = genshin_map.convert("RGB") genshin_map = genshin_map.convert('RGB')
# 转为Bytes,暂时废弃 # 转为Bytes,暂时废弃
# result_buffer = BytesIO() # result_buffer = BytesIO()
# genshin_map.save(result_buffer, format='PNG', quality=80, subsampling=0) # genshin_map.save(result_buffer, format='PNG', quality=80, subsampling=0)
# 进行保存 # 进行保存
genshin_map.save(save_path, "JPEG", quality=95) genshin_map.save(save_path, 'JPEG', quality=95)
logger.info(f"{prefix} [查询成功]:新增缓存 [{save_path.name}]") logger.info(f'{prefix} [查询成功]:新增缓存 [{save_path.name}]')
return save_path return save_path
@router.get("") @router.get('')
async def get_map_by_point( async def get_map_by_point(
resource_name: str = "甜甜花", resource_name: str = '甜甜花',
map_id: Union[str, int] = 0, map_id: Union[str, int] = 0,
is_cluster: bool = False, is_cluster: bool = False,
): ):
@ -297,18 +297,18 @@ async def get_map_by_point(
# 判断别名 # 判断别名
resource_name = resource_aliases_to_name(resource_name) resource_name = resource_aliases_to_name(resource_name)
prefix = f">> [请求序列:{req_id}]" prefix = f'>> [请求序列:{req_id}]'
logger.info( logger.info(
f'{prefix} [查询请求]:在地图 ID {map_id or "auto"} 内查询 {resource_name}...' f'{prefix} [查询请求]:在地图 ID {map_id or 'auto'} 内查询 {resource_name}...'
) )
if map_id: if map_id:
# 校验 map_id 有效性 # 校验 map_id 有效性
if map_id not in MAP_ID_DICT: if map_id not in MAP_ID_DICT:
logger.warning(f"{prefix} [失败]:地图 ID - {map_id} 不存在!") logger.warning(f'{prefix} [失败]:地图 ID - {map_id} 不存在!')
return { return {
"retcode": -1, 'retcode': -1,
"message": f"地图 ID - {map_id} 不存在!", 'message': f'地图 ID - {map_id} 不存在!',
} }
maps = [MAP_ID_DICT[str(map_id)]] maps = [MAP_ID_DICT[str(map_id)]]
else: else:
@ -321,10 +321,10 @@ async def get_map_by_point(
return FileResponse(res) return FileResponse(res)
if len(maps) > 1: if len(maps) > 1:
logger.info( logger.info(
f"{prefix} [自动重试]:地图 ID {map._value_} 内不存在 {resource_name}..." f'{prefix} [自动重试]:地图 ID {map._value_} 内不存在 {resource_name}...'
) )
logger.warning(f"{prefix} [失败]:资源点 - {resource_name} 不存在!") logger.warning(f'{prefix} [失败]:资源点 - {resource_name} 不存在!')
return { return {
"retcode": -1, 'retcode': -1,
"message": f"资源点 - {resource_name} 不存在!", 'message': f'资源点 - {resource_name} 不存在!',
} }