import os import re from typing import Iterator import numpy as np from tqdm import tqdm from module.base.code_generator import CodeGenerator, MarkdownGenerator from module.base.decorator import cached_property from module.base.utils import SelectedGrids, load_image from module.config.utils import iter_folder from tasks.map.route.model import RouteModel from tasks.rogue.route.model import RogueRouteListModel, RogueRouteModel, RogueWaypointListModel, RogueWaypointModel class RouteExtract: def __init__(self, folder): self.folder = folder def iter_files(self) -> Iterator[str]: for path, folders, files in os.walk(self.folder): path = path.replace('\\', '/') for file in files: if file.endswith('.py'): yield f'{path}/{file}' def extract_route(self, file) -> Iterator[RouteModel]: print(f'Extract {file}') with open(file, 'r', encoding='utf-8') as f: content = f.read() """ def route_item_enemy(self): self.enter_himeko_trial() self.map_init(plane=Jarilo_BackwaterPass, position=(519.9, 361.5)) """ regex = re.compile( r'def (?P[a-zA-Z0-9_]*?)\(self\):.*?' r'self\.map_init\((.*?)\)', re.DOTALL) file = file.replace(self.folder, '').replace('.py', '').replace('/', '_').strip('_') module = f"{self.folder.strip('./').replace('/', '.')}.{file}" for result in regex.findall(content): func, data = result res = re.search(r'plane=([a-zA-Z_]*)', data) if res: plane = res.group(1) else: # Must contain plane continue res = re.search(r'floor=([\'"a-zA-Z0-9_]*)', data) if res: floor = res.group(1).strip('"\'') else: floor = 'F1' res = re.search(r'position=\(([0-9.]*)[, ]+([0-9.]*)', data) if res: position = (float(res.group(1)), float(res.group(2))) else: position = None name = f'{file}__{func}' yield RouteModel( name=name, route=f'{module}:{func}', plane=plane, floor=floor, position=position, ) def iter_route(self): """ Yields: RouteData """ for f in self.iter_files(): for row in self.extract_route(f): yield row def write(self, file): gen = CodeGenerator() gen.Import(""" from tasks.map.route.model import RouteModel """) gen.CommentAutoGenerage('dev_tools.route_extract') for row in self.iter_route(): with gen.Object(key=row.name, object_class='RouteModel'): for key, value in row.__iter__(): gen.ObjectAttr(key, value) gen.write(file) def model_to_json(model, file): content = model.model_dump_json(indent=2) with open(file, 'w', encoding='utf-8') as f: f.write(content) class RouteDetect: GEN_END = '===== End of generated waypoints =====' def __init__(self, folder): self.folder = os.path.abspath(folder) print(self.folder) self.waypoints = SelectedGrids(list(self.iter_image())) @cached_property def detector(self): from tasks.map.minimap.brute_force import MinimapWrapper return MinimapWrapper() def get_minimap(self, route: RogueWaypointModel): return self.detector.all_minimap[route.plane_floor] def iter_image(self) -> Iterator[RogueWaypointModel]: regex_posi = re.compile(r'_?X(\d+)Y(\d+)') for domain_folder in iter_folder(self.folder, is_dir=True): domain = os.path.basename(domain_folder) for route_folder in iter_folder(domain_folder, is_dir=True): route = os.path.basename(route_folder) try: for image_file in iter_folder(os.path.join(route_folder, 'route'), ext='.png'): waypoint = os.path.basename(image_file[:-4]) parts = route.split('_', maxsplit=3) if len(parts) == 4: world, plane, floor, position = parts res = regex_posi.search(position) if res: position = int(res.group(1)), int(res.group(2)) else: position = (0, 0) elif len(parts) == 3: world, plane, floor = parts position = (0, 0) elif len(parts) == 2: world, plane = parts floor = 'F1' position = (0, 0) else: continue file = f'{self.folder}/{domain}/{route}/route/{waypoint}.png' res = regex_posi.search(waypoint) if res: position = int(res.group(1)), int(res.group(2)) # waypoint = regex_posi.sub('', waypoint) elif waypoint != 'spawn': position = (0, 0) model = RogueWaypointModel( domain=domain, route=route, waypoint=waypoint, index=0, file=file, plane=f'{world}_{plane}', floor=floor, position=position, direction=0., rotation=0, ) yield model # deep_set(out, keys=[image.route, image.waypoint], value=image) except FileNotFoundError: pass def predict(self): regex_posi = re.compile(r'_?X(\d+)Y(\d+)') for waypoint in tqdm(self.waypoints.grids): waypoint: RogueWaypointModel = waypoint minimap = self.get_minimap(waypoint) im = load_image(waypoint.file) prev = waypoint.position minimap.init_position(waypoint.position, show_log=False) minimap.update(im, show_log=False) waypoint.position = minimap.position waypoint.direction = minimap.direction waypoint.rotation = minimap.rotation if prev != (0, 0) and np.linalg.norm(np.subtract(waypoint.position, prev)) > 1.5: if waypoint.is_spawn: print(f'Position changed: {self.folder}/{waypoint.domain}/{waypoint.route}' f' -> {waypoint.plane}_{waypoint.floor}_{waypoint.positionXY}') else: name = regex_posi.sub('', waypoint.waypoint) print(f'Position changed: {waypoint.file}' f' -> {name}_{waypoint.positionXY}') self.waypoints.create_index('route') # Sort by distance for waypoints in self.waypoints.indexes.values(): waypoints = self.sort_waypoints(waypoints.grids) for index, waypoint in enumerate(waypoints): waypoint.index = index self.waypoints = self.waypoints.sort('route', 'index') @staticmethod def sort_waypoints(waypoints: list[RogueWaypointModel]) -> list[RogueWaypointModel]: waypoints = sorted(waypoints, key=lambda point: point.waypoint, reverse=True) middle = [point for point in waypoints if not point.is_spawn and not point.is_exit] if not middle: return waypoints try: spawn: RogueWaypointModel = [point for point in waypoints if point.is_spawn][0] except IndexError: return waypoints prev = spawn.position if prev == (0, 0): return waypoints sorted_middle = [] while len(middle): distance = np.array([point.position for point in middle]) - prev distance = np.linalg.norm(distance, axis=1) index = np.argmin(distance) sorted_middle.append(middle[index]) middle.pop(index) end = [point for point in waypoints if point.is_exit] waypoints = [spawn] + sorted_middle + end return waypoints def write(self): waypoints = RogueWaypointListModel(self.waypoints.grids) model_to_json(waypoints, f'{self.folder}/data.json') def gen_route(self, waypoints: SelectedGrids): gen = CodeGenerator() spawn: RogueWaypointModel = waypoints.select(is_spawn=True).first_or_none() exit_: RogueWaypointModel = waypoints.select(is_exit=True).first_or_none() if spawn is None or exit_ is None: return class WaypointRepr: def __init__(self, position): if isinstance(position, RogueWaypointModel): position = position.position self.position = tuple(position) def __repr__(self): return f'Waypoint({self.position})' __str__ = __repr__ def filter_waypoints(name: str): return lambda x: x.waypoint.startswith(name) def clear_elite(): with gen.Object('self.clear_elite'): for w in waypoints.filter(filter_waypoints('enemy')): gen.ObjectAttr(value=WaypointRepr(w)) def clear_item(): with gen.Object('self.clear_item'): for w in waypoints.filter(filter_waypoints('item')): gen.ObjectAttr(value=WaypointRepr(w)) def clear_event(): with gen.Object('self.clear_event'): for w in waypoints.filter(filter_waypoints('event')): gen.ObjectAttr(value=WaypointRepr(w)) def domain_reward(): with gen.Object('self.domain_reward'): for w in waypoints.filter(filter_waypoints('reward')): gen.ObjectAttr(value=WaypointRepr(w)) def domain_herta(): with gen.Object('self.domain_herta'): for w in waypoints.filter(filter_waypoints('herta')): gen.ObjectAttr(value=WaypointRepr(w)) with gen.tab(): with gen.Def(name=spawn.route, args='self'): table = MarkdownGenerator(['Waypoint', 'Position', 'Direction', 'Rotation']) for waypoint in waypoints: table.add_row([ waypoint.waypoint, f'{WaypointRepr(waypoint)},', waypoint.direction, waypoint.rotation ]) gen.add('"""') for row in table.generate(): gen.add(row) gen.add('"""') position = tuple(spawn.position) gen.add(f'self.map_init(plane={spawn.plane}, floor="{spawn.floor}", position={position})') if spawn.is_DomainBoss or spawn.is_DomainElite or spawn.is_DomainRespite: # Domain has only 1 exit pass else: gen.add(f'self.register_domain_exit({WaypointRepr(exit_)}, end_rotation={exit_.rotation})') # Domain specific if spawn.is_DomainBoss or spawn.is_DomainElite: gen.Empty() clear_elite() domain_reward() if spawn.is_DomainRespite: gen.Empty() clear_item() domain_herta() if spawn.is_DomainOccurrence or spawn.is_DomainTransaction: gen.Empty() clear_item() clear_event() if spawn.is_DomainBoss or spawn.is_DomainElite or spawn.is_DomainRespite: # Domain has only 1 exit with gen.Object('self.domain_single_exit'): gen.ObjectAttr(value=WaypointRepr(exit_)) # Single exit does not need end_rotation # gen.ObjectAttr(key='end_rotation', value=exit_.rotation) gen.Comment(self.GEN_END) return gen.generate() def insert(self, folder, base='tasks.map.route.base'): # Create folder self.waypoints.create_index('domain') for index, waypoints in self.waypoints.indexes.items(): domain = index[0] os.makedirs(f'{folder}/{domain}', exist_ok=True) # Create file self.waypoints.create_index('domain', 'plane', 'floor') for index, waypoints in self.waypoints.indexes.items(): domain, plane, floor = index file = f'{folder}/{domain}/{plane}_{floor}.py' if not os.path.exists(file): gen = CodeGenerator() gen.Import(f""" from {base} import RouteBase """) with gen.Class('Route', inherit='RouteBase'): pass # gen.Pass() gen.write(file) for index, routes in self.waypoints.indexes.items(): domain, plane, floor = index file = f'{folder}/{domain}/{plane}_{floor}.py' with open(file, 'r', encoding='utf-8') as f: content = f.read() # Add import if base != 'tasks.map.route.base': content = content.replace('tasks.map.route.base', base) p = '_'.join(plane.split('_', maxsplit=2)[:2]) imp = [ 'from tasks.map.control.waypoint import Waypoint', f'from tasks.map.keywords.plane import {p}', f'from {base} import RouteBase', ][::-1] res = re.search(r'^(.*?)class Route', content, re.DOTALL) if res: head = res.group(1) for row in imp: if row not in head: content = row + '\n' + content # Replace or add routes routes.create_index('route') for waypoints in routes.indexes.values(): spawn = waypoints.select(is_spawn=True).first_or_none() if spawn is None: continue regex = re.compile(rf'def {spawn.route}.*?{self.GEN_END}', re.DOTALL) res = regex.search(content) if res: before = res.group(0).strip() after = self.gen_route(waypoints).strip() content = content.replace(before, after) else: content += '\n' + self.gen_route(waypoints) # Sort routes regex = re.compile( r'(?=(\n def ([a-zA-Z0-9_]+)\(.*?\n def|\n def ([a-zA-Z0-9_]+)\(.*?$))', re.DOTALL) funcs = regex.findall(content) known_routes = [route[0] for route in routes.indexes.keys()] routes = [] for code, route1, route2 in funcs: if route1: route = route1 elif route2: route = route2 else: continue if route not in known_routes: continue code = code.removesuffix('\n def').removeprefix('\n') routes.append((route, code)) sorted_routes = sorted(routes, key=lambda x: x[0]) routes = [route[1] for route in routes] sorted_routes = [route[1] for route in sorted_routes] new = '' for before, after in zip(routes, sorted_routes): left = content.index(before) right = left + len(before) new += content[:left] new += after content = content[right:] new += content content = new # Format content = re.sub(r'[\n ]+ def', '\n\n def', content, re.DOTALL) content = content.rstrip('\n') + '\n' # Write with open(file, 'w', encoding='utf-8', newline='') as f: f.write(content) def rogue_extract(folder): print('rogue_extract') def iter_route(): for row in RouteExtract(f'{folder}').iter_route(): domain = row.name.split('_', maxsplit=1)[0] row = RogueRouteModel(domain=domain, **row.model_dump()) row.name = f'{row.domain}_{row.route.split(":")[1]}' row.route = row.route.replace('_', '.', 1) yield row routes = RogueRouteListModel(list(iter_route())) model_to_json(routes, f'{folder}/route.json') if __name__ == '__main__': os.chdir(os.path.join(os.path.dirname(__file__), '../')) RouteExtract('./route/daily').write('./tasks/map/route/route/daily.py') self = RouteDetect('../SrcRoute/rogue') self.predict() self.write() self.insert('./route/rogue', base='tasks.rogue.route.base')