StarRailCopilot/dev_tools/route_extract.py

491 lines
19 KiB
Python
Raw Normal View History

2023-09-23 22:52:52 +00:00
import os
import re
2023-10-01 15:03:57 +00:00
from typing import Iterator
2023-09-23 22:52:52 +00:00
2023-10-01 15:03:57 +00:00
import numpy as np
from tqdm import tqdm
2023-09-23 22:52:52 +00:00
2023-10-01 15:03:57 +00:00
from module.base.code_generator import CodeGenerator, MarkdownGenerator
from module.base.decorator import cached_property
from module.base.utils import SelectedGrids, load_image
from module.config.utils import iter_folder
from tasks.map.route.model import RouteModel
from tasks.rogue.route.model import RogueRouteListModel, RogueRouteModel, RogueWaypointListModel, RogueWaypointModel
2023-09-23 22:52:52 +00:00
class RouteExtract:
def __init__(self, folder):
self.folder = folder
2023-10-01 15:03:57 +00:00
def iter_files(self) -> Iterator[str]:
2023-09-23 22:52:52 +00:00
for path, folders, files in os.walk(self.folder):
path = path.replace('\\', '/')
for file in files:
if file.endswith('.py'):
yield f'{path}/{file}'
2023-10-01 15:03:57 +00:00
def extract_route(self, file) -> Iterator[RouteModel]:
2023-09-23 22:52:52 +00:00
print(f'Extract {file}')
with open(file, 'r', encoding='utf-8') as f:
content = f.read()
"""
def route_item_enemy(self):
self.enter_himeko_trial()
self.map_init(plane=Jarilo_BackwaterPass, position=(519.9, 361.5))
"""
regex = re.compile(
r'def (?P<func>[a-zA-Z0-9_]*?)\(self\):.*?'
2023-10-01 15:03:57 +00:00
r'self\.map_init\((.*?)\)',
re.DOTALL)
2023-09-23 22:52:52 +00:00
file = file.replace(self.folder, '').replace('.py', '').replace('/', '_').strip('_')
2023-10-01 15:03:57 +00:00
module = f"{self.folder.strip('./').replace('/', '.')}.{file}"
2023-09-23 22:52:52 +00:00
for result in regex.findall(content):
func, data = result
res = re.search(r'plane=([a-zA-Z_]*)', data)
if res:
plane = res.group(1)
else:
# Must contain plane
continue
res = re.search(r'floor=([\'"a-zA-Z0-9_]*)', data)
if res:
floor = res.group(1).strip('"\'')
else:
floor = 'F1'
res = re.search(r'position=\(([0-9.]*)[, ]+([0-9.]*)', data)
if res:
position = (float(res.group(1)), float(res.group(2)))
else:
position = None
2023-10-01 15:03:57 +00:00
name = f'{file}__{func}'
yield RouteModel(
name=name,
2023-09-23 22:52:52 +00:00
route=f'{module}:{func}',
plane=plane,
floor=floor,
position=position,
)
2023-10-01 15:03:57 +00:00
def iter_route(self):
"""
Yields:
RouteData
"""
for f in self.iter_files():
for row in self.extract_route(f):
yield row
2023-09-23 22:52:52 +00:00
def write(self, file):
gen = CodeGenerator()
gen.Import("""
2023-10-01 15:03:57 +00:00
from tasks.map.route.model import RouteModel
2023-09-23 22:52:52 +00:00
""")
gen.CommentAutoGenerage('dev_tools.route_extract')
2023-10-01 15:03:57 +00:00
for row in self.iter_route():
with gen.Object(key=row.name, object_class='RouteModel'):
for key, value in row.__iter__():
gen.ObjectAttr(key, value)
2023-09-23 22:52:52 +00:00
gen.write(file)
2023-10-01 15:03:57 +00:00
def model_to_json(model, file):
content = model.model_dump_json(indent=2)
2024-01-06 10:40:27 +00:00
with open(file, 'w', encoding='utf-8', newline='') as f:
2023-10-01 15:03:57 +00:00
f.write(content)
regex_posi = re.compile(r'_?X(\d+)Y(\d+)')
def get_position_from_name(name):
res = regex_posi.search(name)
if res:
position = int(res.group(1)), int(res.group(2))
else:
position = (0, 0)
return position
def position2direction(target, origin):
"""
Args:
target: Target position (x, y)
origin: Origin position (x, y)
Returns:
float: Direction from current position to target position (0~360)
"""
diff = np.subtract(target, origin)
distance = np.linalg.norm(diff)
if distance < 0.05:
return 0
theta = np.rad2deg(np.arccos(-diff[1] / distance))
if diff[0] < 0:
theta = 360 - theta
theta = round(theta, 3)
return theta
def swap_exit(exit_, exit1, exit2):
diff = position2direction(exit1.position, exit_.position) - position2direction(exit2.position, exit_.position)
diff = diff % 360
if diff > 180:
diff -= 360
if diff < 0:
return exit1, exit2
else:
return exit2, exit1
2023-10-01 15:03:57 +00:00
class RouteDetect:
GEN_END = '===== End of generated waypoints ====='
def __init__(self, folder):
self.folder = os.path.abspath(folder)
print(self.folder)
self.waypoints = SelectedGrids(list(self.iter_image()))
@cached_property
def detector(self):
2023-10-02 09:14:33 +00:00
from tasks.rogue.route.loader import MinimapWrapper
2023-10-01 15:03:57 +00:00
return MinimapWrapper()
def get_minimap(self, route: RogueWaypointModel):
return self.detector.all_minimap[route.plane_floor]
def iter_image(self) -> Iterator[RogueWaypointModel]:
for domain_folder in iter_folder(self.folder, is_dir=True):
domain = os.path.basename(domain_folder)
for route_folder in iter_folder(domain_folder, is_dir=True):
route = os.path.basename(route_folder)
try:
for image_file in iter_folder(os.path.join(route_folder, 'route'), ext='.png'):
waypoint = os.path.basename(image_file[:-4])
parts = route.split('_', maxsplit=3)
if len(parts) == 4:
world, plane, floor, position = parts
position = get_position_from_name(position)
2023-10-01 15:03:57 +00:00
elif len(parts) == 3:
world, plane, floor = parts
position = (0, 0)
elif len(parts) == 2:
world, plane = parts
floor = 'F1'
position = (0, 0)
else:
continue
file = f'{self.folder}/{domain}/{route}/route/{waypoint}.png'
file_position = get_position_from_name(waypoint)
if file_position != (0, 0):
position = file_position
2023-10-01 15:03:57 +00:00
# waypoint = regex_posi.sub('', waypoint)
elif waypoint != 'spawn':
position = (0, 0)
model = RogueWaypointModel(
domain=domain,
route=route,
waypoint=waypoint,
index=0,
file=file,
plane=f'{world}_{plane}',
floor=floor,
position=position,
direction=0.,
rotation=0,
)
yield model
# deep_set(out, keys=[image.route, image.waypoint], value=image)
except FileNotFoundError:
pass
def predict(self):
for waypoint in tqdm(self.waypoints.grids):
waypoint: RogueWaypointModel = waypoint
minimap = self.get_minimap(waypoint)
im = load_image(waypoint.file)
prev = waypoint.position
minimap.init_position(waypoint.position, show_log=False)
minimap.update(im, show_log=False)
waypoint.position = minimap.position
waypoint.direction = minimap.direction
waypoint.rotation = minimap.rotation
if prev != (0, 0) and np.linalg.norm(np.subtract(waypoint.position, prev)) > 1.5:
if waypoint.is_spawn:
print(f'Position changed: {self.folder}/{waypoint.domain}/{waypoint.route}'
f' -> {waypoint.plane}_{waypoint.floor}_{waypoint.positionXY}')
else:
name = regex_posi.sub('', waypoint.waypoint)
print(f'Position changed: {waypoint.file}'
f' -> {name}_{waypoint.positionXY}')
2023-11-14 17:37:18 +00:00
self.waypoints.create_index('domain', 'route')
2023-10-01 15:03:57 +00:00
# Sort by distance
2023-11-14 17:37:18 +00:00
total = self.waypoints.filter(lambda x: (x.is_DomainCombat or x.is_DomainOccurrence) and x.is_spawn).count
migrated = 0
2023-10-01 15:03:57 +00:00
for waypoints in self.waypoints.indexes.values():
2023-11-14 17:37:18 +00:00
if waypoints.select(is_exit_door=True).count == 2:
migrated += 1
2023-10-01 15:03:57 +00:00
waypoints = self.sort_waypoints(waypoints.grids)
for index, waypoint in enumerate(waypoints):
waypoint.index = index
2023-11-14 17:37:18 +00:00
# Waypoints too far from each other, probably wrong position
diff = SelectedGrids(waypoints).get('position')
diff = np.linalg.norm(np.diff(diff, axis=0), axis=1)
for index in np.where(diff > 120)[0]:
w1, w2 = waypoints[index], waypoints[index + 1]
print(f'WARNING | Waypoint too far away in {w1.route}: {w1.position} -> {w2.position}')
print(f'INFO | Domain exit migrated: {migrated}/{total}')
self.waypoints = self.waypoints.sort('domain', 'route', 'index')
2023-10-01 15:03:57 +00:00
@staticmethod
def sort_waypoints(waypoints: list[RogueWaypointModel]) -> list[RogueWaypointModel]:
waypoints = sorted(waypoints, key=lambda point: point.waypoint)
middle = [point for point in waypoints if point.is_middle]
2023-10-01 15:03:57 +00:00
if not middle:
return waypoints
try:
spawn: RogueWaypointModel = [point for point in waypoints if point.is_spawn][0]
except IndexError:
return waypoints
prev = spawn.position
if prev == (0, 0):
return waypoints
sorted_middle = []
while len(middle):
distance = np.array([point.position for point in middle]) - prev
distance = np.linalg.norm(distance, axis=1)
index = np.argmin(distance)
sorted_middle.append(middle[index])
middle.pop(index)
end = [point for point in waypoints if point.is_exit]
door = [point for point in waypoints if point.is_exit_door]
waypoints = [spawn] + sorted_middle + end + door
2023-10-01 15:03:57 +00:00
return waypoints
def write(self):
waypoints = RogueWaypointListModel(self.waypoints.grids)
model_to_json(waypoints, f'{self.folder}/data.json')
def gen_route(self, waypoints: SelectedGrids):
gen = CodeGenerator()
spawn: RogueWaypointModel = waypoints.select(is_spawn=True).first_or_none()
exit_: RogueWaypointModel = waypoints.select(is_exit=True).first_or_none()
exit1: RogueWaypointModel = waypoints.select(is_exit1=True).first_or_none()
exit2: RogueWaypointModel = waypoints.select(is_exit2=True).first_or_none()
2023-10-01 15:03:57 +00:00
if spawn is None or exit_ is None:
2023-11-16 17:32:32 +00:00
print(f'WARNING | No spawn point or no exit: {waypoints}')
return ''
2023-10-01 15:03:57 +00:00
class WaypointRepr:
def __init__(self, position):
if isinstance(position, RogueWaypointModel):
position = position.position
self.position = tuple(position)
def __repr__(self):
return f'Waypoint({self.position})'
__str__ = __repr__
2023-10-01 15:20:42 +00:00
def call(func, name):
ws = waypoints.filter(lambda x: x.waypoint.startswith(name)).get('waypoint')
2023-10-02 09:19:21 +00:00
if ws:
ws = ', '.join(ws)
gen.add(f'self.{func}({ws})')
2023-10-01 15:03:57 +00:00
with gen.tab():
with gen.Def(name=spawn.route, args='self'):
table = MarkdownGenerator(['Waypoint', 'Position', 'Direction', 'Rotation'])
for waypoint in waypoints:
table.add_row([
waypoint.waypoint,
f'{WaypointRepr(waypoint)},',
waypoint.direction,
waypoint.rotation
])
gen.add('"""')
for row in table.generate():
gen.add(row)
gen.add('"""')
position = tuple(spawn.position)
gen.add(f'self.map_init(plane={spawn.plane}, floor="{spawn.floor}", position={position})')
if spawn.is_DomainBoss or spawn.is_DomainElite or spawn.is_DomainRespite:
# Domain has only 1 exit
pass
elif exit1 and exit2:
exit1, exit2 = swap_exit(exit_, exit1, exit2)
gen.add(f'self.register_domain_exit(')
gen.add(f' {WaypointRepr(exit_)}, end_rotation={exit_.rotation},')
gen.add(f' left_door={WaypointRepr(exit1)}, right_door={WaypointRepr(exit2)})')
2023-10-01 15:03:57 +00:00
else:
gen.add(f'self.register_domain_exit({WaypointRepr(exit_)}, end_rotation={exit_.rotation})')
2023-10-01 15:20:42 +00:00
# Waypoint attributes
for waypoint in waypoints:
if waypoint.is_spawn:
continue
if (waypoint.is_exit or waypoint.is_exit_door) \
and (spawn.is_DomainCombat or spawn.is_DomainOccurrence):
2023-10-01 15:20:42 +00:00
continue
gen.Value(key=waypoint.waypoint, value=WaypointRepr(waypoint))
2023-10-01 15:03:57 +00:00
# Domain specific
if spawn.is_DomainBoss or spawn.is_DomainElite:
gen.Empty()
2023-10-01 15:20:42 +00:00
call('clear_elite', 'enemy')
call('domain_reward', 'reward')
2023-10-01 15:03:57 +00:00
if spawn.is_DomainRespite:
gen.Empty()
2023-10-01 15:20:42 +00:00
call('clear_item', 'item')
call('domain_herta', 'herta')
2023-10-01 15:03:57 +00:00
if spawn.is_DomainOccurrence or spawn.is_DomainTransaction:
gen.Empty()
2023-10-01 15:20:42 +00:00
call('clear_item', 'item')
call('clear_event', 'event')
2023-10-01 15:03:57 +00:00
if spawn.is_DomainBoss or spawn.is_DomainElite or spawn.is_DomainRespite:
# Domain has only 1 exit
2023-10-01 15:20:42 +00:00
call('domain_single_exit', 'exit')
2023-10-01 15:03:57 +00:00
gen.Comment(self.GEN_END)
return gen.generate()
def insert(self, folder, base='tasks.map.route.base'):
# Create folder
self.waypoints.create_index('domain')
for index, waypoints in self.waypoints.indexes.items():
domain = index[0]
os.makedirs(f'{folder}/{domain}', exist_ok=True)
# Create file
self.waypoints.create_index('domain', 'plane', 'floor')
for index, waypoints in self.waypoints.indexes.items():
domain, plane, floor = index
file = f'{folder}/{domain}/{plane}_{floor}.py'
if not os.path.exists(file):
gen = CodeGenerator()
gen.Import(f"""
from {base} import RouteBase
""")
with gen.Class('Route', inherit='RouteBase'):
pass
# gen.Pass()
gen.write(file)
for index, routes in self.waypoints.indexes.items():
domain, plane, floor = index
file = f'{folder}/{domain}/{plane}_{floor}.py'
with open(file, 'r', encoding='utf-8') as f:
content = f.read()
# Add import
if base != 'tasks.map.route.base':
content = content.replace('tasks.map.route.base', base)
p = '_'.join(plane.split('_', maxsplit=2)[:2])
imp = [
'from tasks.map.control.waypoint import Waypoint',
f'from tasks.map.keywords.plane import {p}',
f'from {base} import RouteBase',
][::-1]
res = re.search(r'^(.*?)class Route', content, re.DOTALL)
if res:
head = res.group(1)
for row in imp:
if row not in head:
content = row + '\n' + content
2023-10-23 15:48:23 +00:00
content = content.replace(
'from tasks.rogue.route.base import locked',
'from tasks.map.route.base import locked')
2023-10-01 15:03:57 +00:00
# Replace or add routes
routes.create_index('route')
for waypoints in routes.indexes.values():
spawn = waypoints.select(is_spawn=True).first_or_none()
if spawn is None:
continue
regex = re.compile(rf'def {spawn.route}.*?{self.GEN_END}', re.DOTALL)
res = regex.search(content)
if res:
before = res.group(0).strip()
after = self.gen_route(waypoints).strip()
content = content.replace(before, after)
else:
content += '\n' + self.gen_route(waypoints)
# Sort routes
regex = re.compile(
r'(?=(\n def ([a-zA-Z0-9_]+)\(.*?\n def|\n def ([a-zA-Z0-9_]+)\(.*?$))', re.DOTALL)
funcs = regex.findall(content)
known_routes = [route[0] for route in routes.indexes.keys()]
routes = []
for code, route1, route2 in funcs:
if route1:
route = route1
elif route2:
route = route2
else:
continue
if route not in known_routes:
continue
code = code.removesuffix('\n def').removeprefix('\n')
routes.append((route, code))
sorted_routes = sorted(routes, key=lambda x: get_position_from_name(x[0]))
2023-10-01 15:03:57 +00:00
routes = [route[1] for route in routes]
sorted_routes = [route[1] for route in sorted_routes]
new = ''
for before, after in zip(routes, sorted_routes):
left = content.index(before)
right = left + len(before)
new += content[:left]
new += after
content = content[right:]
new += content
content = new
# Format
2023-10-23 15:48:23 +00:00
content = re.sub(r'[\n ]+ def', '\n\n def', content)
2023-10-01 15:03:57 +00:00
content = content.rstrip('\n') + '\n'
2023-10-23 15:48:23 +00:00
content = re.sub(r' (@[a-zA-Z0-9_().]+)[\n ]+ def', r' \1\n def', content)
2023-10-01 15:03:57 +00:00
# Write
with open(file, 'w', encoding='utf-8', newline='') as f:
f.write(content)
def rogue_extract(folder):
print('rogue_extract')
def iter_route():
for row in RouteExtract(f'{folder}').iter_route():
domain = row.name.split('_', maxsplit=1)[0]
row = RogueRouteModel(domain=domain, **row.model_dump())
row.name = f'{row.domain}_{row.route.split(":")[1]}'
row.route = row.route.replace('_', '.', 1)
yield row
routes = RogueRouteListModel(list(iter_route()))
model_to_json(routes, f'{folder}/route.json')
2023-09-23 22:52:52 +00:00
if __name__ == '__main__':
os.chdir(os.path.join(os.path.dirname(__file__), '../'))
RouteExtract('./route/daily').write('./tasks/map/route/route/daily.py')
2023-10-01 15:03:57 +00:00
self = RouteDetect('../SrcRoute/rogue')
self.predict()
self.write()
self.insert('./route/rogue', base='tasks.rogue.route.base')
2023-10-01 15:20:42 +00:00
rogue_extract('./route/rogue')