diff --git a/tasks/base/main_page.py b/tasks/base/main_page.py index 178bbd0d8..abdb3496a 100644 --- a/tasks/base/main_page.py +++ b/tasks/base/main_page.py @@ -26,6 +26,8 @@ class OcrPlaneName(Ocr): result = result.replace('avatia', 'avalia') # DomainiRespite result = result.replace('omaini', 'omain') + # Domain=Combat + result = result.replace('=', '') # 累塔的办公室 result = result.replace('累塔', '黑塔') diff --git a/tasks/map/control/control.py b/tasks/map/control/control.py index 01c524ba5..9e7ddd0ca 100644 --- a/tasks/map/control/control.py +++ b/tasks/map/control/control.py @@ -1,5 +1,8 @@ +from collections import deque from functools import cached_property +import numpy as np + from module.base.timer import Timer from module.logger import logger from tasks.base.assets.assets_base_page import CLOSE @@ -131,6 +134,7 @@ class MapControl(Combat, AimDetectorMixin): aim_interval = Timer(0.3, count=1) attacked_enemy = Timer(1.2, count=4) attacked_item = Timer(0.6, count=2) + near_queue = deque(maxlen=waypoint.unexpected_confirm.count) while 1: if skip_first_screenshot: skip_first_screenshot = False @@ -176,7 +180,7 @@ class MapControl(Combat, AimDetectorMixin): if self.aim.aimed_enemy: if 'enemy' in waypoint.expected_end: if self.handle_map_A(): - allow_run_2x = allow_straight_run = allow_run = allow_walk = False + allow_run_2x = allow_straight_run = False attacked_enemy.reset() direction_interval.reset() rotation_interval.reset() @@ -185,7 +189,7 @@ class MapControl(Combat, AimDetectorMixin): if self.aim.aimed_item: if 'item' in waypoint.expected_end: if self.handle_map_A(): - allow_run_2x = allow_straight_run = allow_run = allow_walk = False + allow_run_2x = allow_straight_run = False attacked_item.reset() direction_interval.reset() rotation_interval.reset() @@ -199,20 +203,22 @@ class MapControl(Combat, AimDetectorMixin): return result # Arrive - if not attacked_enemy.started() and not attacked_item.started(): - if self.minimap.is_position_near(waypoint.position, threshold=waypoint.get_threshold(end_opt)): - if not waypoint.expected_end or waypoint.match_results(result): - logger.info(f'Arrive waypoint: {waypoint}') - return result - else: - if waypoint.unexpected_confirm.reached(): - logger.info(f'Arrive waypoint with unexpected result: {waypoint}') - return result + if near :=self.minimap.is_position_near(waypoint.position, threshold=waypoint.get_threshold(end_opt)): + near_queue.append(near) + if not waypoint.expected_end or waypoint.match_results(result): + logger.info(f'Arrive waypoint: {waypoint}') + return result else: + if waypoint.unexpected_confirm.reached(): + logger.info(f'Arrive waypoint with unexpected result: {waypoint}') + return result + else: + near_queue.append(near) + logger.info(near_queue) + if np.mean(near_queue) < 0.6: waypoint.unexpected_confirm.reset() # Switch run case - if end_opt: if allow_run_2x and diff < 20: logger.info(f'Approaching target, diff={round(diff, 1)}, disallow run_2x') @@ -303,6 +309,9 @@ class MapControl(Combat, AimDetectorMixin): Args: waypoints: position (x, y), a list of position to go along, or a list of Waypoint objects to go along. + + Returns: + list[str]: A list of walk result """ logger.hr('Goto', level=1) self.map_A_timer.clear() @@ -313,6 +322,7 @@ class MapControl(Combat, AimDetectorMixin): end_list = [False for _ in waypoints] end_list[-1] = True + results = [] with JoystickContact(self) as contact: for waypoint, end in zip(waypoints, end_list): waypoint: Waypoint @@ -324,6 +334,7 @@ class MapControl(Combat, AimDetectorMixin): ) expected = waypoint.expected_to_str(waypoint.expected_end) logger.info(f'Arrive waypoint, expected: {expected}, result: {result}') + results += result matched = waypoint.match_results(result) if not waypoint.expected_end or matched: logger.info(f'Arrive waypoint with expected result: {matched}') @@ -334,6 +345,7 @@ class MapControl(Combat, AimDetectorMixin): if end_point.end_rotation is not None: logger.hr('End rotation', level=2) self.rotation_set(end_point.end_rotation, threshold=end_point.end_rotation_threshold) + return results def clear_item(self, *waypoints): """ @@ -358,7 +370,7 @@ class MapControl(Combat, AimDetectorMixin): waypoints: position (x, y), a list of position to go along. or a list of Waypoint objects to go along. """ - logger.hr('Clear item', level=1) + logger.hr('Clear enemy', level=1) waypoints = ensure_waypoints(waypoints) end_point = waypoints[-1] end_point.expected_end.append('enemy') diff --git a/tasks/map/control/waypoint.py b/tasks/map/control/waypoint.py index 831e5894a..0c61ef277 100644 --- a/tasks/map/control/waypoint.py +++ b/tasks/map/control/waypoint.py @@ -40,7 +40,7 @@ class Waypoint: # If triggered any expected event, consider arrive and stop walking early_stop: bool = True # Confirm timer if arrived but didn't trigger any expected event - unexpected_confirm: Timer = field(default_factory=lambda: Timer(2, count=6)) + unexpected_confirm: Timer = field(default_factory=lambda: Timer(3, count=15)) def __str__(self): return f'Waypoint({self.position})' diff --git a/tasks/map/interact/aim.py b/tasks/map/interact/aim.py index bebbd7fee..877b0dad7 100644 --- a/tasks/map/interact/aim.py +++ b/tasks/map/interact/aim.py @@ -302,11 +302,11 @@ class Aim: def aimed_enemy(self) -> tuple[int, int] | None: if self.points_enemy is None: return None - try: - _ = self.points_enemy[1] - logger.warning(f'Multiple aimed enemy found, using first point of {self.points_enemy}') - except IndexError: - pass + + count = len(self.points_enemy) + if count >= 2: + logger.warning(f'Multiple aimed enemy found: {self.points_enemy}') + return None try: point = self.points_enemy[0] return tuple(point) diff --git a/tasks/map/resource/resource.py b/tasks/map/resource/resource.py index faeaebe79..cb31b0207 100644 --- a/tasks/map/resource/resource.py +++ b/tasks/map/resource/resource.py @@ -13,7 +13,8 @@ from tasks.map.resource.const import ResourceConst from tasks.map.keywords import KEYWORDS_MAP_PLANE, MapPlane SPECIAL_PLANES = [ - ('Luofu_StargazerNavalia', 'F2Rogue') + ('Luofu_StargazerNavalia', 'F2Rogue'), + ('Luofu_Cloudford', 'F1Rogue'), ] diff --git a/tasks/rogue/route/base.py b/tasks/rogue/route/base.py index e8c631d99..91b9926b1 100644 --- a/tasks/rogue/route/base.py +++ b/tasks/rogue/route/base.py @@ -1,6 +1,9 @@ from module.logger import logger +from tasks.base.page import page_rogue from tasks.map.control.waypoint import ensure_waypoints from tasks.map.route.base import RouteBase as RouteBase_ +from tasks.rogue.assets.assets_rogue_reward import ROGUE_REPORT +from tasks.rogue.assets.assets_rogue_ui import BLESSING_CONFIRM from tasks.rogue.bleesing.blessing import RogueBlessingSelector from tasks.rogue.bleesing.bonus import RogueBonusSelector from tasks.rogue.bleesing.curio import RogueCurioSelector @@ -27,13 +30,36 @@ class RouteBase(RouteBase_, RogueUI, RogueExit): def combat_execute(self, expected_end=None): return super().combat_execute(expected_end=self.combat_expected_end) + def handle_blessing(self): + """ + Returns: + bool: If handled + """ + if self.is_page_choose_blessing(): + logger.hr('Choose blessing', level=2) + selector = RogueBlessingSelector(self) + selector.recognize_and_select() + return True + if self.is_page_choose_curio(): + logger.hr('Choose curio', level=2) + selector = RogueCurioSelector(self) + selector.recognize_and_select() + return True + if self.is_page_choose_bonus(): + logger.hr('Choose bonus', level=2) + selector = RogueBonusSelector(self) + selector.recognize_and_select() + return True + + return False + def clear_blessing(self, skip_first_screenshot=True): """ Pages: in: combat_expected_end() out: is_in_main() """ - logger.info(f'Clear blessing') + logger.info('Clear blessing') while 1: if skip_first_screenshot: skip_first_screenshot = False @@ -45,51 +71,83 @@ class RouteBase(RouteBase_, RogueUI, RogueExit): logger.info(f'clear_blessing() ended at page_main') break - if self.is_page_choose_blessing(): - logger.hr('Choose blessing', level=2) - selector = RogueBlessingSelector(self) - selector.recognize_and_select() - if self.is_page_choose_curio(): - logger.hr('Choose curio', level=2) - selector = RogueCurioSelector(self) - selector.recognize_and_select() - if self.is_page_choose_bonus(): - logger.hr('Choose bonus', level=2) - selector = RogueBonusSelector(self) - selector.recognize_and_select() + if self.handle_blessing(): + continue + + def clear_occurrence(self, skip_first_screenshot=True): + """ + Pages: + in: page_rogue, occurrence + out: is_in_main() + """ + logger.info('Clear occurrence') + while 1: + if skip_first_screenshot: + skip_first_screenshot = False + else: + self.device.screenshot() + + # End + if self.is_in_main(): + logger.info(f'clear_occurrence() ended at page_main') + break + + if self.handle_reward(interval=2): + continue + if self.is_combat_executing(): + self.combat_execute() + continue + if self.handle_blessing(): + continue + + # TODO: Select events + pass + + def goto(self, *waypoints): + result = super().goto(*waypoints) + if 'enemy' in result: + self.clear_blessing() + return result """ Additional rogue methods """ - def clear_enemy(self, *waypoints): - logger.hr('Clear enemy', level=1) - result = super().clear_enemy(*waypoints) - - self.clear_blessing() - return result - def clear_elite(self, *waypoints): logger.hr('Clear elite', level=1) waypoints = ensure_waypoints(waypoints) end_point = waypoints[-1] end_point.speed = 'run_2x' - # Use skill + # TODO: Use techniques before BOSS pass result = super().clear_enemy(*waypoints) - - self.clear_blessing() return result + def _domain_event_expected_end(self): + """ + Returns: + bool: If entered event + """ + if self.ui_page_appear(page_rogue): + return True + if self.handle_combat_interact(): + return False + return False + def clear_event(self, *waypoints): """ Handle an event in DomainOccurrence, DomainEncounter, DomainTransaction """ logger.hr('Clear event', level=1) + waypoints = ensure_waypoints(waypoints) + end_point = waypoints[-1] + end_point.endpoint_threshold = 1.5 + end_point.expected_end.append(self._domain_event_expected_end) result = self.goto(*waypoints) + self.clear_occurrence() return result def domain_reward(self, *waypoints): @@ -98,10 +156,14 @@ class RouteBase(RouteBase_, RogueUI, RogueExit): """ logger.hr('Clear reward', level=1) - # Skip if not going to get reward - pass + # TODO: Skip if user don't want rewards or stamina exhausted + return [] result = self.goto(*waypoints) + + # TODO: Get reward + pass + return result def domain_herta(self, *waypoints): @@ -110,6 +172,58 @@ class RouteBase(RouteBase_, RogueUI, RogueExit): """ pass + def _domain_exit_expected_end(self): + """ + Returns: + bool: If domain exited + """ + if self.is_map_loading(): + logger.info('domain exit: is_map_loading()') + return True + # No loading after elite + if self.is_map_loading_black(): + logger.info('domain exit: is_map_loading_black()') + return True + # Rogue cleared + if self.appear(ROGUE_REPORT, interval=2): + logger.info(f'domain exit: {ROGUE_REPORT}') + return True + + if self.handle_popup_confirm(): + return False + if self.handle_combat_interact(): + return False + + return False + + def _domain_exit_wait_next(self, skip_first_screenshot=True): + """ + Pages: + in: is_map_loading() + out: page_main + or page_rogue if rogue cleared + """ + logger.info('Wait next domain') + while 1: + if skip_first_screenshot: + skip_first_screenshot = False + else: + self.device.screenshot() + + # End + if self.is_in_main(): + logger.info('Entered another domain') + break + if self.ui_page_appear(page_rogue): + logger.info('Rogue cleared') + break + + if self.appear(ROGUE_REPORT, interval=2): + self.device.click(BLESSING_CONFIRM) + continue + if self.handle_popup_confirm(): + continue + def domain_single_exit(self, *waypoints): """ Goto a single exit, exit current domain @@ -117,9 +231,11 @@ class RouteBase(RouteBase_, RogueUI, RogueExit): """ logger.hr('Domain single exit', level=1) waypoints = ensure_waypoints(waypoints) - result = self.goto(*waypoints) + end_point = waypoints[-1] + end_point.expected_end.append(self._domain_exit_expected_end) - self.domain_exit_interact() + result = self.goto(*waypoints) + self._domain_exit_wait_next() return result def domain_exit(self, *waypoints, end_rotation=None): @@ -127,8 +243,12 @@ class RouteBase(RouteBase_, RogueUI, RogueExit): waypoints = ensure_waypoints(waypoints) end_point = waypoints[-1] end_point.end_rotation = end_rotation + end_point.endpoint_threshold = 1.5 result = self.goto(*waypoints) + # TODO: Domain exit detection + pass + return result """ diff --git a/tasks/rogue/route/loader.py b/tasks/rogue/route/loader.py index 159eef73c..d5d73a198 100644 --- a/tasks/rogue/route/loader.py +++ b/tasks/rogue/route/loader.py @@ -1,5 +1,7 @@ from typing import Optional +import numpy as np + from module.base.decorator import cached_property from module.logger import logger from tasks.base.main_page import MainPage @@ -42,6 +44,12 @@ class MinimapWrapper: Luofu_AurumAlley, ] maps = {} + + for plane, floor in SPECIAL_PLANES: + minimap = Minimap() + minimap.set_plane(plane=plane, floor=floor) + maps[f'{plane}_{floor}'] = minimap + for plane in MapPlane.instances.values(): if plane in blacklist: continue @@ -52,16 +60,14 @@ class MinimapWrapper: minimap.set_plane(plane=plane, floor=floor) maps[f'{plane.name}_{floor}'] = minimap - for plane, floor in SPECIAL_PLANES: - minimap = Minimap() - minimap.set_plane(plane=plane, floor=floor) - maps[f'{plane}_{floor}'] = minimap - + logger.attr('MinimapLoaded', len(maps)) return maps @cached_property def all_route(self) -> list[RogueRouteModel]: - return model_from_json(RogueRouteListModel, './route/rogue/route.json').root + routes = model_from_json(RogueRouteListModel, './route/rogue/route.json').root + logger.attr('RouteLoaded', len(routes)) + return routes def get_minimap(self, route: RogueRouteModel): return self.all_minimap[route.plane_floor] @@ -84,7 +90,7 @@ class RouteLoader(MinimapWrapper, RouteLoader_, MainPage): if plane.rogue_domain in ['Encounter', 'Transaction'] and route.is_DomainOccurrence: # Treat as "Occurrence" pass - if plane.rogue_domain in ['Boss'] and route.is_DomainElite: + elif plane.rogue_domain in ['Boss'] and route.is_DomainElite: # Treat as "Elite" pass else: @@ -93,22 +99,37 @@ class RouteLoader(MinimapWrapper, RouteLoader_, MainPage): minimap.init_position(route.position, show_log=False) try: minimap.update_position(image) - except FileNotFoundError: + except FileNotFoundError as e: + logger.warning(e) continue - visited.append((route, minimap.position_similarity)) + visited.append((route, minimap.position_similarity, minimap.position)) if len(visited) < 3: logger.warning('Too few routes to search from, not enough to make a prediction') return visited = sorted(visited, key=lambda x: x[1], reverse=True) - logger.info(f'Best 3 prediction: {[(r.name, s) for r, s in visited[:3]]}') - if visited[1][1] / visited[0][1] > 0.75: - logger.warning('Similarity too close, not enough to make a prediction') - return + logger.info(f'Best 3 prediction: {[(r.name, s, p) for r, s, p in visited[:3]]}') + nearby = [ + (r, s, p) for r, s, p in visited if np.linalg.norm(np.subtract(r.position, p)) < 5 + ] + logger.info(f'Best 3 prediction: {[(r.name, s, p) for r, s, p in nearby[:3]]}') + if len(nearby) == 1: + if nearby[0][1] > 0.05: + logger.attr('RoutePredict', nearby[0][0].name) + return nearby[0][0] + elif len(nearby) >= 2: + if nearby[0][1] / nearby[1][1] > 0.75: + logger.attr('RoutePredict', nearby[0][0].name) + return nearby[0][0] - logger.attr('RoutePredict', visited[0][0].name) - return visited[0][0] + # logger.info(f'Best 3 prediction: {[(r.name, s, p) for r, s, p in visited[:3]]}') + # if visited[0][1] / visited[1][1] > 0.75: + # logger.attr('RoutePredict', visited[0][0].name) + # return visited[0][0] + + logger.warning('Similarity too close, not enough to make a prediction') + return None def position_find_bruteforce(self, image) -> Minimap: """ @@ -155,6 +176,7 @@ class RouteLoader(MinimapWrapper, RouteLoader_, MainPage): if __name__ == '__main__': self = RouteLoader('src', task='Rogue') # self.image_file = r'' + # self.device.screenshot() # self.position_find_bruteforce(self.device.image) self.device.screenshot()