Fix: Rogue bugfixes

- Do nothing if multiple aim found
- Continue walking after handle_map_A()
- Allow arrive with unexpected result
- Strict route search
This commit is contained in:
LmeSzinc 2023-10-03 17:59:51 +08:00
parent d1abb19982
commit 398488f5e7
7 changed files with 219 additions and 62 deletions

View File

@ -26,6 +26,8 @@ class OcrPlaneName(Ocr):
result = result.replace('avatia', 'avalia') result = result.replace('avatia', 'avalia')
# DomainiRespite # DomainiRespite
result = result.replace('omaini', 'omain') result = result.replace('omaini', 'omain')
# Domain=Combat
result = result.replace('=', '')
# 累塔的办公室 # 累塔的办公室
result = result.replace('累塔', '黑塔') result = result.replace('累塔', '黑塔')

View File

@ -1,5 +1,8 @@
from collections import deque
from functools import cached_property from functools import cached_property
import numpy as np
from module.base.timer import Timer from module.base.timer import Timer
from module.logger import logger from module.logger import logger
from tasks.base.assets.assets_base_page import CLOSE from tasks.base.assets.assets_base_page import CLOSE
@ -131,6 +134,7 @@ class MapControl(Combat, AimDetectorMixin):
aim_interval = Timer(0.3, count=1) aim_interval = Timer(0.3, count=1)
attacked_enemy = Timer(1.2, count=4) attacked_enemy = Timer(1.2, count=4)
attacked_item = Timer(0.6, count=2) attacked_item = Timer(0.6, count=2)
near_queue = deque(maxlen=waypoint.unexpected_confirm.count)
while 1: while 1:
if skip_first_screenshot: if skip_first_screenshot:
skip_first_screenshot = False skip_first_screenshot = False
@ -176,7 +180,7 @@ class MapControl(Combat, AimDetectorMixin):
if self.aim.aimed_enemy: if self.aim.aimed_enemy:
if 'enemy' in waypoint.expected_end: if 'enemy' in waypoint.expected_end:
if self.handle_map_A(): if self.handle_map_A():
allow_run_2x = allow_straight_run = allow_run = allow_walk = False allow_run_2x = allow_straight_run = False
attacked_enemy.reset() attacked_enemy.reset()
direction_interval.reset() direction_interval.reset()
rotation_interval.reset() rotation_interval.reset()
@ -185,7 +189,7 @@ class MapControl(Combat, AimDetectorMixin):
if self.aim.aimed_item: if self.aim.aimed_item:
if 'item' in waypoint.expected_end: if 'item' in waypoint.expected_end:
if self.handle_map_A(): if self.handle_map_A():
allow_run_2x = allow_straight_run = allow_run = allow_walk = False allow_run_2x = allow_straight_run = False
attacked_item.reset() attacked_item.reset()
direction_interval.reset() direction_interval.reset()
rotation_interval.reset() rotation_interval.reset()
@ -199,20 +203,22 @@ class MapControl(Combat, AimDetectorMixin):
return result return result
# Arrive # Arrive
if not attacked_enemy.started() and not attacked_item.started(): if near :=self.minimap.is_position_near(waypoint.position, threshold=waypoint.get_threshold(end_opt)):
if self.minimap.is_position_near(waypoint.position, threshold=waypoint.get_threshold(end_opt)): near_queue.append(near)
if not waypoint.expected_end or waypoint.match_results(result): if not waypoint.expected_end or waypoint.match_results(result):
logger.info(f'Arrive waypoint: {waypoint}') logger.info(f'Arrive waypoint: {waypoint}')
return result return result
else:
if waypoint.unexpected_confirm.reached():
logger.info(f'Arrive waypoint with unexpected result: {waypoint}')
return result
else: else:
if waypoint.unexpected_confirm.reached():
logger.info(f'Arrive waypoint with unexpected result: {waypoint}')
return result
else:
near_queue.append(near)
logger.info(near_queue)
if np.mean(near_queue) < 0.6:
waypoint.unexpected_confirm.reset() waypoint.unexpected_confirm.reset()
# Switch run case # Switch run case
if end_opt: if end_opt:
if allow_run_2x and diff < 20: if allow_run_2x and diff < 20:
logger.info(f'Approaching target, diff={round(diff, 1)}, disallow run_2x') logger.info(f'Approaching target, diff={round(diff, 1)}, disallow run_2x')
@ -303,6 +309,9 @@ class MapControl(Combat, AimDetectorMixin):
Args: Args:
waypoints: position (x, y), a list of position to go along, waypoints: position (x, y), a list of position to go along,
or a list of Waypoint objects to go along. or a list of Waypoint objects to go along.
Returns:
list[str]: A list of walk result
""" """
logger.hr('Goto', level=1) logger.hr('Goto', level=1)
self.map_A_timer.clear() self.map_A_timer.clear()
@ -313,6 +322,7 @@ class MapControl(Combat, AimDetectorMixin):
end_list = [False for _ in waypoints] end_list = [False for _ in waypoints]
end_list[-1] = True end_list[-1] = True
results = []
with JoystickContact(self) as contact: with JoystickContact(self) as contact:
for waypoint, end in zip(waypoints, end_list): for waypoint, end in zip(waypoints, end_list):
waypoint: Waypoint waypoint: Waypoint
@ -324,6 +334,7 @@ class MapControl(Combat, AimDetectorMixin):
) )
expected = waypoint.expected_to_str(waypoint.expected_end) expected = waypoint.expected_to_str(waypoint.expected_end)
logger.info(f'Arrive waypoint, expected: {expected}, result: {result}') logger.info(f'Arrive waypoint, expected: {expected}, result: {result}')
results += result
matched = waypoint.match_results(result) matched = waypoint.match_results(result)
if not waypoint.expected_end or matched: if not waypoint.expected_end or matched:
logger.info(f'Arrive waypoint with expected result: {matched}') logger.info(f'Arrive waypoint with expected result: {matched}')
@ -334,6 +345,7 @@ class MapControl(Combat, AimDetectorMixin):
if end_point.end_rotation is not None: if end_point.end_rotation is not None:
logger.hr('End rotation', level=2) logger.hr('End rotation', level=2)
self.rotation_set(end_point.end_rotation, threshold=end_point.end_rotation_threshold) self.rotation_set(end_point.end_rotation, threshold=end_point.end_rotation_threshold)
return results
def clear_item(self, *waypoints): def clear_item(self, *waypoints):
""" """
@ -358,7 +370,7 @@ class MapControl(Combat, AimDetectorMixin):
waypoints: position (x, y), a list of position to go along. waypoints: position (x, y), a list of position to go along.
or a list of Waypoint objects to go along. or a list of Waypoint objects to go along.
""" """
logger.hr('Clear item', level=1) logger.hr('Clear enemy', level=1)
waypoints = ensure_waypoints(waypoints) waypoints = ensure_waypoints(waypoints)
end_point = waypoints[-1] end_point = waypoints[-1]
end_point.expected_end.append('enemy') end_point.expected_end.append('enemy')

View File

@ -40,7 +40,7 @@ class Waypoint:
# If triggered any expected event, consider arrive and stop walking # If triggered any expected event, consider arrive and stop walking
early_stop: bool = True early_stop: bool = True
# Confirm timer if arrived but didn't trigger any expected event # Confirm timer if arrived but didn't trigger any expected event
unexpected_confirm: Timer = field(default_factory=lambda: Timer(2, count=6)) unexpected_confirm: Timer = field(default_factory=lambda: Timer(3, count=15))
def __str__(self): def __str__(self):
return f'Waypoint({self.position})' return f'Waypoint({self.position})'

View File

@ -302,11 +302,11 @@ class Aim:
def aimed_enemy(self) -> tuple[int, int] | None: def aimed_enemy(self) -> tuple[int, int] | None:
if self.points_enemy is None: if self.points_enemy is None:
return None return None
try:
_ = self.points_enemy[1] count = len(self.points_enemy)
logger.warning(f'Multiple aimed enemy found, using first point of {self.points_enemy}') if count >= 2:
except IndexError: logger.warning(f'Multiple aimed enemy found: {self.points_enemy}')
pass return None
try: try:
point = self.points_enemy[0] point = self.points_enemy[0]
return tuple(point) return tuple(point)

View File

@ -13,7 +13,8 @@ from tasks.map.resource.const import ResourceConst
from tasks.map.keywords import KEYWORDS_MAP_PLANE, MapPlane from tasks.map.keywords import KEYWORDS_MAP_PLANE, MapPlane
SPECIAL_PLANES = [ SPECIAL_PLANES = [
('Luofu_StargazerNavalia', 'F2Rogue') ('Luofu_StargazerNavalia', 'F2Rogue'),
('Luofu_Cloudford', 'F1Rogue'),
] ]

View File

@ -1,6 +1,9 @@
from module.logger import logger from module.logger import logger
from tasks.base.page import page_rogue
from tasks.map.control.waypoint import ensure_waypoints from tasks.map.control.waypoint import ensure_waypoints
from tasks.map.route.base import RouteBase as RouteBase_ from tasks.map.route.base import RouteBase as RouteBase_
from tasks.rogue.assets.assets_rogue_reward import ROGUE_REPORT
from tasks.rogue.assets.assets_rogue_ui import BLESSING_CONFIRM
from tasks.rogue.bleesing.blessing import RogueBlessingSelector from tasks.rogue.bleesing.blessing import RogueBlessingSelector
from tasks.rogue.bleesing.bonus import RogueBonusSelector from tasks.rogue.bleesing.bonus import RogueBonusSelector
from tasks.rogue.bleesing.curio import RogueCurioSelector from tasks.rogue.bleesing.curio import RogueCurioSelector
@ -27,13 +30,36 @@ class RouteBase(RouteBase_, RogueUI, RogueExit):
def combat_execute(self, expected_end=None): def combat_execute(self, expected_end=None):
return super().combat_execute(expected_end=self.combat_expected_end) return super().combat_execute(expected_end=self.combat_expected_end)
def handle_blessing(self):
"""
Returns:
bool: If handled
"""
if self.is_page_choose_blessing():
logger.hr('Choose blessing', level=2)
selector = RogueBlessingSelector(self)
selector.recognize_and_select()
return True
if self.is_page_choose_curio():
logger.hr('Choose curio', level=2)
selector = RogueCurioSelector(self)
selector.recognize_and_select()
return True
if self.is_page_choose_bonus():
logger.hr('Choose bonus', level=2)
selector = RogueBonusSelector(self)
selector.recognize_and_select()
return True
return False
def clear_blessing(self, skip_first_screenshot=True): def clear_blessing(self, skip_first_screenshot=True):
""" """
Pages: Pages:
in: combat_expected_end() in: combat_expected_end()
out: is_in_main() out: is_in_main()
""" """
logger.info(f'Clear blessing') logger.info('Clear blessing')
while 1: while 1:
if skip_first_screenshot: if skip_first_screenshot:
skip_first_screenshot = False skip_first_screenshot = False
@ -45,51 +71,83 @@ class RouteBase(RouteBase_, RogueUI, RogueExit):
logger.info(f'clear_blessing() ended at page_main') logger.info(f'clear_blessing() ended at page_main')
break break
if self.is_page_choose_blessing(): if self.handle_blessing():
logger.hr('Choose blessing', level=2) continue
selector = RogueBlessingSelector(self)
selector.recognize_and_select() def clear_occurrence(self, skip_first_screenshot=True):
if self.is_page_choose_curio(): """
logger.hr('Choose curio', level=2) Pages:
selector = RogueCurioSelector(self) in: page_rogue, occurrence
selector.recognize_and_select() out: is_in_main()
if self.is_page_choose_bonus(): """
logger.hr('Choose bonus', level=2) logger.info('Clear occurrence')
selector = RogueBonusSelector(self) while 1:
selector.recognize_and_select() if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
# End
if self.is_in_main():
logger.info(f'clear_occurrence() ended at page_main')
break
if self.handle_reward(interval=2):
continue
if self.is_combat_executing():
self.combat_execute()
continue
if self.handle_blessing():
continue
# TODO: Select events
pass
def goto(self, *waypoints):
result = super().goto(*waypoints)
if 'enemy' in result:
self.clear_blessing()
return result
""" """
Additional rogue methods Additional rogue methods
""" """
def clear_enemy(self, *waypoints):
logger.hr('Clear enemy', level=1)
result = super().clear_enemy(*waypoints)
self.clear_blessing()
return result
def clear_elite(self, *waypoints): def clear_elite(self, *waypoints):
logger.hr('Clear elite', level=1) logger.hr('Clear elite', level=1)
waypoints = ensure_waypoints(waypoints) waypoints = ensure_waypoints(waypoints)
end_point = waypoints[-1] end_point = waypoints[-1]
end_point.speed = 'run_2x' end_point.speed = 'run_2x'
# Use skill # TODO: Use techniques before BOSS
pass pass
result = super().clear_enemy(*waypoints) result = super().clear_enemy(*waypoints)
self.clear_blessing()
return result return result
def _domain_event_expected_end(self):
"""
Returns:
bool: If entered event
"""
if self.ui_page_appear(page_rogue):
return True
if self.handle_combat_interact():
return False
return False
def clear_event(self, *waypoints): def clear_event(self, *waypoints):
""" """
Handle an event in DomainOccurrence, DomainEncounter, DomainTransaction Handle an event in DomainOccurrence, DomainEncounter, DomainTransaction
""" """
logger.hr('Clear event', level=1) logger.hr('Clear event', level=1)
waypoints = ensure_waypoints(waypoints)
end_point = waypoints[-1]
end_point.endpoint_threshold = 1.5
end_point.expected_end.append(self._domain_event_expected_end)
result = self.goto(*waypoints) result = self.goto(*waypoints)
self.clear_occurrence()
return result return result
def domain_reward(self, *waypoints): def domain_reward(self, *waypoints):
@ -98,10 +156,14 @@ class RouteBase(RouteBase_, RogueUI, RogueExit):
""" """
logger.hr('Clear reward', level=1) logger.hr('Clear reward', level=1)
# Skip if not going to get reward # TODO: Skip if user don't want rewards or stamina exhausted
pass return []
result = self.goto(*waypoints) result = self.goto(*waypoints)
# TODO: Get reward
pass
return result return result
def domain_herta(self, *waypoints): def domain_herta(self, *waypoints):
@ -110,6 +172,58 @@ class RouteBase(RouteBase_, RogueUI, RogueExit):
""" """
pass pass
def _domain_exit_expected_end(self):
"""
Returns:
bool: If domain exited
"""
if self.is_map_loading():
logger.info('domain exit: is_map_loading()')
return True
# No loading after elite
if self.is_map_loading_black():
logger.info('domain exit: is_map_loading_black()')
return True
# Rogue cleared
if self.appear(ROGUE_REPORT, interval=2):
logger.info(f'domain exit: {ROGUE_REPORT}')
return True
if self.handle_popup_confirm():
return False
if self.handle_combat_interact():
return False
return False
def _domain_exit_wait_next(self, skip_first_screenshot=True):
"""
Pages:
in: is_map_loading()
out: page_main
or page_rogue if rogue cleared
"""
logger.info('Wait next domain')
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
# End
if self.is_in_main():
logger.info('Entered another domain')
break
if self.ui_page_appear(page_rogue):
logger.info('Rogue cleared')
break
if self.appear(ROGUE_REPORT, interval=2):
self.device.click(BLESSING_CONFIRM)
continue
if self.handle_popup_confirm():
continue
def domain_single_exit(self, *waypoints): def domain_single_exit(self, *waypoints):
""" """
Goto a single exit, exit current domain Goto a single exit, exit current domain
@ -117,9 +231,11 @@ class RouteBase(RouteBase_, RogueUI, RogueExit):
""" """
logger.hr('Domain single exit', level=1) logger.hr('Domain single exit', level=1)
waypoints = ensure_waypoints(waypoints) waypoints = ensure_waypoints(waypoints)
result = self.goto(*waypoints) end_point = waypoints[-1]
end_point.expected_end.append(self._domain_exit_expected_end)
self.domain_exit_interact() result = self.goto(*waypoints)
self._domain_exit_wait_next()
return result return result
def domain_exit(self, *waypoints, end_rotation=None): def domain_exit(self, *waypoints, end_rotation=None):
@ -127,8 +243,12 @@ class RouteBase(RouteBase_, RogueUI, RogueExit):
waypoints = ensure_waypoints(waypoints) waypoints = ensure_waypoints(waypoints)
end_point = waypoints[-1] end_point = waypoints[-1]
end_point.end_rotation = end_rotation end_point.end_rotation = end_rotation
end_point.endpoint_threshold = 1.5
result = self.goto(*waypoints) result = self.goto(*waypoints)
# TODO: Domain exit detection
pass
return result return result
""" """

View File

@ -1,5 +1,7 @@
from typing import Optional from typing import Optional
import numpy as np
from module.base.decorator import cached_property from module.base.decorator import cached_property
from module.logger import logger from module.logger import logger
from tasks.base.main_page import MainPage from tasks.base.main_page import MainPage
@ -42,6 +44,12 @@ class MinimapWrapper:
Luofu_AurumAlley, Luofu_AurumAlley,
] ]
maps = {} maps = {}
for plane, floor in SPECIAL_PLANES:
minimap = Minimap()
minimap.set_plane(plane=plane, floor=floor)
maps[f'{plane}_{floor}'] = minimap
for plane in MapPlane.instances.values(): for plane in MapPlane.instances.values():
if plane in blacklist: if plane in blacklist:
continue continue
@ -52,16 +60,14 @@ class MinimapWrapper:
minimap.set_plane(plane=plane, floor=floor) minimap.set_plane(plane=plane, floor=floor)
maps[f'{plane.name}_{floor}'] = minimap maps[f'{plane.name}_{floor}'] = minimap
for plane, floor in SPECIAL_PLANES: logger.attr('MinimapLoaded', len(maps))
minimap = Minimap()
minimap.set_plane(plane=plane, floor=floor)
maps[f'{plane}_{floor}'] = minimap
return maps return maps
@cached_property @cached_property
def all_route(self) -> list[RogueRouteModel]: def all_route(self) -> list[RogueRouteModel]:
return model_from_json(RogueRouteListModel, './route/rogue/route.json').root routes = model_from_json(RogueRouteListModel, './route/rogue/route.json').root
logger.attr('RouteLoaded', len(routes))
return routes
def get_minimap(self, route: RogueRouteModel): def get_minimap(self, route: RogueRouteModel):
return self.all_minimap[route.plane_floor] return self.all_minimap[route.plane_floor]
@ -84,7 +90,7 @@ class RouteLoader(MinimapWrapper, RouteLoader_, MainPage):
if plane.rogue_domain in ['Encounter', 'Transaction'] and route.is_DomainOccurrence: if plane.rogue_domain in ['Encounter', 'Transaction'] and route.is_DomainOccurrence:
# Treat as "Occurrence" # Treat as "Occurrence"
pass pass
if plane.rogue_domain in ['Boss'] and route.is_DomainElite: elif plane.rogue_domain in ['Boss'] and route.is_DomainElite:
# Treat as "Elite" # Treat as "Elite"
pass pass
else: else:
@ -93,22 +99,37 @@ class RouteLoader(MinimapWrapper, RouteLoader_, MainPage):
minimap.init_position(route.position, show_log=False) minimap.init_position(route.position, show_log=False)
try: try:
minimap.update_position(image) minimap.update_position(image)
except FileNotFoundError: except FileNotFoundError as e:
logger.warning(e)
continue continue
visited.append((route, minimap.position_similarity)) visited.append((route, minimap.position_similarity, minimap.position))
if len(visited) < 3: if len(visited) < 3:
logger.warning('Too few routes to search from, not enough to make a prediction') logger.warning('Too few routes to search from, not enough to make a prediction')
return return
visited = sorted(visited, key=lambda x: x[1], reverse=True) visited = sorted(visited, key=lambda x: x[1], reverse=True)
logger.info(f'Best 3 prediction: {[(r.name, s) for r, s in visited[:3]]}') logger.info(f'Best 3 prediction: {[(r.name, s, p) for r, s, p in visited[:3]]}')
if visited[1][1] / visited[0][1] > 0.75: nearby = [
logger.warning('Similarity too close, not enough to make a prediction') (r, s, p) for r, s, p in visited if np.linalg.norm(np.subtract(r.position, p)) < 5
return ]
logger.info(f'Best 3 prediction: {[(r.name, s, p) for r, s, p in nearby[:3]]}')
if len(nearby) == 1:
if nearby[0][1] > 0.05:
logger.attr('RoutePredict', nearby[0][0].name)
return nearby[0][0]
elif len(nearby) >= 2:
if nearby[0][1] / nearby[1][1] > 0.75:
logger.attr('RoutePredict', nearby[0][0].name)
return nearby[0][0]
logger.attr('RoutePredict', visited[0][0].name) # logger.info(f'Best 3 prediction: {[(r.name, s, p) for r, s, p in visited[:3]]}')
return visited[0][0] # if visited[0][1] / visited[1][1] > 0.75:
# logger.attr('RoutePredict', visited[0][0].name)
# return visited[0][0]
logger.warning('Similarity too close, not enough to make a prediction')
return None
def position_find_bruteforce(self, image) -> Minimap: def position_find_bruteforce(self, image) -> Minimap:
""" """
@ -155,6 +176,7 @@ class RouteLoader(MinimapWrapper, RouteLoader_, MainPage):
if __name__ == '__main__': if __name__ == '__main__':
self = RouteLoader('src', task='Rogue') self = RouteLoader('src', task='Rogue')
# self.image_file = r'' # self.image_file = r''
# self.device.screenshot()
# self.position_find_bruteforce(self.device.image) # self.position_find_bruteforce(self.device.image)
self.device.screenshot() self.device.screenshot()