StarRailCopilot/tasks/combat/combat.py

372 lines
13 KiB
Python

from module.base.decorator import run_once
from module.exception import RequestHumanTakeover
from module.logger import logger
from tasks.combat.assets.assets_combat_finish import COMBAT_AGAIN, COMBAT_EXIT
from tasks.combat.assets.assets_combat_prepare import COMBAT_PREPARE
from tasks.combat.assets.assets_combat_team import COMBAT_TEAM_PREPARE, COMBAT_TEAM_SUPPORT
from tasks.combat.interact import CombatInteract
from tasks.combat.prepare import CombatPrepare
from tasks.combat.skill import CombatSkill
from tasks.combat.state import CombatState
from tasks.combat.support import CombatSupport
from tasks.combat.team import CombatTeam
from tasks.map.control.joystick import MapControlJoystick
class Combat(CombatInteract, CombatPrepare, CombatState, CombatTeam, CombatSupport, CombatSkill, MapControlJoystick):
def handle_combat_prepare(self):
"""
Returns:
bool: If able to run a combat
Pages:
in: COMBAT_PREPARE
"""
self.combat_waves = 1
current = self.combat_get_trailblaze_power()
cost = self.combat_get_wave_cost()
if cost == 10:
# Calyx
self.combat_waves = min(current // self.combat_wave_cost, 6)
if self.combat_wave_limit:
self.combat_waves = min(self.combat_waves, self.combat_wave_limit - self.combat_wave_done)
logger.info(
f'Current has {current}, combat costs {self.combat_wave_cost}, '
f'wave={self.combat_wave_done}/{self.combat_wave_limit}, '
f'able to do {self.combat_waves} waves')
else:
logger.info(f'Current has {current}, combat costs {self.combat_wave_cost}, '
f'able to do {self.combat_waves} waves')
if self.combat_waves > 0:
self.combat_set_wave(self.combat_waves)
else:
# Others
logger.info(f'Current has {current}, combat costs {self.combat_wave_cost}, '
f'do {self.combat_waves} wave')
# Check limits
if self.config.stored.TrailblazePower.value < self.combat_wave_cost:
logger.info('Trailblaze power exhausted, cannot continue combat')
return False
if self.combat_waves <= 0:
logger.info('Combat wave limited, cannot continue combat')
return False
return True
def handle_ascension_dungeon_prepare(self):
"""
Returns:
bool: If clicked.
"""
if self.combat_wave_cost == 30 and self.is_in_main():
if self.handle_map_A():
return True
return False
def combat_prepare(self, team=1, support_character: str = None):
"""
Args:
team: 1 to 6.
support_character: Support character name
Returns:
bool: True if success to enter combat
False if trialblaze power is not enough
Pages:
in: COMBAT_PREPARE
out: is_combat_executing
"""
logger.hr('Combat prepare')
skip_first_screenshot = True
if support_character:
# To set team before support set
pre_set_team = True
# Block COMBAT_TEAM_PREPARE before support set
support_set = False
else:
pre_set_team = False
support_set = True
logger.info([support_character, pre_set_team, support_set])
trial = 0
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
# End
if self.is_combat_executing():
return True
# Relics full
# Clicking between COMBAT_PREPARE and COMBAT_TEAM_PREPARE
if trial > 3:
logger.critical('Failed to enter dungeon after 3 trial, probably because relics are full')
raise RequestHumanTakeover
# Click
if support_character and self.appear(COMBAT_TEAM_SUPPORT, interval=2):
if pre_set_team:
self.team_set(team)
pre_set_team = False
continue
self.support_set(support_character)
self.interval_reset(COMBAT_TEAM_SUPPORT)
support_set = True
continue
if support_set and self.appear(COMBAT_TEAM_PREPARE, interval=2):
self.team_set(team)
self.device.click(COMBAT_TEAM_PREPARE)
self.interval_reset(COMBAT_TEAM_PREPARE)
self.interval_reset(COMBAT_TEAM_SUPPORT)
continue
if self.appear(COMBAT_TEAM_PREPARE):
self.interval_reset(COMBAT_PREPARE)
self.map_A_timer.reset()
if self.appear(COMBAT_PREPARE, interval=2):
if not self.handle_combat_prepare():
return False
self.device.click(COMBAT_PREPARE)
self.interval_reset(COMBAT_PREPARE)
trial += 1
continue
if self.handle_combat_interact():
self.map_A_timer.reset()
continue
if self.handle_ascension_dungeon_prepare():
continue
if self.handle_popup_confirm():
continue
def combat_execute(self, expected_end=None):
"""
Args:
expected_end: A function returns bool, True represents end.
Pages:
in: is_combat_executing
out: COMBAT_AGAIN
"""
logger.hr('Combat execute')
skip_first_screenshot = True
is_executing = True
self.combat_state_reset()
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
# End
if callable(expected_end) and expected_end():
logger.info(f'Combat execute ended at {expected_end.__name__}')
break
if (self.appear(COMBAT_AGAIN) and
self.image_color_count(COMBAT_AGAIN, color=(227, 227, 228), threshold=221, count=50)):
logger.info(f'Combat execute ended at {COMBAT_AGAIN}')
break
if self.is_in_main():
logger.info(f'Combat execute ended at page_main')
break
# Daemon
if self.is_combat_executing():
if not is_executing:
logger.info('Combat continues')
self.device.stuck_record_clear()
is_executing = True
else:
is_executing = False
if self.handle_combat_state():
continue
def _combat_can_again(self) -> bool:
"""
Pages:
in: COMBAT_AGAIN
"""
current = self.combat_get_trailblaze_power(expect_reduce=self.combat_wave_cost > 0)
# Wave limit
if self.combat_wave_limit:
if self.combat_wave_done + self.combat_waves > self.combat_wave_limit:
logger.info(f'Combat wave limit: {self.combat_wave_done}/{self.combat_wave_limit}, '
f'can not run again')
return False
# Cost limit
if self.combat_wave_cost == 10:
if current >= self.combat_wave_cost * self.combat_waves:
logger.info(f'Current has {current}, combat costs {self.combat_wave_cost}, can run again')
return True
else:
logger.info(f'Current has {current}, combat costs {self.combat_wave_cost}, can not run again')
return False
elif self.combat_wave_cost <= 0:
logger.info(f'Free combat, combat costs {self.combat_wave_cost}, can not run again')
return False
else:
if current >= self.combat_wave_cost:
logger.info(f'Current has {current}, combat costs {self.combat_wave_cost}, can run again')
return True
else:
logger.info(f'Current has {current}, combat costs {self.combat_wave_cost}, can not run again')
return False
def _combat_should_reenter(self):
"""
Returns:
bool: True to re-enter combat and run with another wave settings
"""
if self.config.stored.TrailblazePower.value < self.combat_wave_cost:
logger.info('Current trailblaze power is not enough for next run')
return False
# Wave limit
if self.combat_wave_limit:
if self.combat_wave_done < self.combat_wave_limit:
logger.info(f'Combat wave limit: {self.combat_wave_done}/{self.combat_wave_limit}, '
f'run again with less waves')
return True
else:
return False
# Cost limit
if self.config.stored.TrailblazePower.value >= self.combat_wave_cost:
logger.info('Still having some trailblaze power run with less waves to empty it')
return True
return False
def combat_finish(self) -> bool:
"""
Returns:
bool: True if exit, False if again
Pages:
in: COMBAT_AGAIN
out: page_main if exit
is_combat_executing if again
"""
logger.hr('Combat finish')
@run_once
def add_wave_done():
self.combat_wave_done += self.combat_waves
logger.info(f'Done {self.combat_waves} waves at total')
skip_first_screenshot = True
combat_can_again = None
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
# End
if self.is_in_main():
logger.info('Combat finishes at page_main')
return True
if self.is_combat_executing():
logger.info('Combat finishes at another combat')
return False
# Click
# Game client might slow to response COMBAT_AGAIN clicks
if self.appear(COMBAT_AGAIN, interval=5):
add_wave_done()
# Cache the result of _combat_can_again() as no expected stamina reduce during retry
if combat_can_again is None:
combat_can_again = self._combat_can_again()
if combat_can_again:
self.device.click(COMBAT_AGAIN)
else:
self.device.click(COMBAT_EXIT)
self.interval_reset(COMBAT_AGAIN)
continue
# Dropped light cone from weekly
if self.handle_get_light_cone():
continue
# Having any character died
if self.handle_popup_confirm():
continue
def combat_exit(self, skip_first_screenshot=True):
"""
Pages:
in: Any page during combat
out: page_main
"""
logger.info('Combat exit')
self.interval_clear([COMBAT_PREPARE, COMBAT_TEAM_PREPARE, COMBAT_AGAIN])
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
# End
if self.is_in_main():
break
# Click
if self.handle_ui_close(COMBAT_PREPARE, interval=2):
continue
if self.handle_ui_close(COMBAT_TEAM_PREPARE, interval=2):
continue
if self.appear(COMBAT_AGAIN, interval=2):
logger.info(f'{COMBAT_AGAIN} -> {COMBAT_EXIT}')
self.device.click(COMBAT_EXIT)
continue
if self.handle_get_light_cone():
continue
def is_trailblaze_power_exhausted(self) -> bool:
flag = self.config.stored.TrailblazePower.value < self.combat_wave_cost
logger.attr('TrailblazePowerExhausted', flag)
return flag
def combat(self, team: int = 1, wave_limit: int = 0, support_character: str = None, skip_first_screenshot=True):
"""
Combat until trailblaze power runs out.
Args:
team: 1 to 6.
wave_limit: Limit combat runs, 0 means no limit.
support_character: Support character name
skip_first_screenshot:
Returns:
int: Run count
Raises:
RequestHumanTakeover: If relics are full
Pages:
in: COMBAT_PREPARE
or page_main with DUNGEON_COMBAT_INTERACT
out: page_main
"""
if not skip_first_screenshot:
self.device.screenshot()
self.combat_wave_limit = wave_limit
self.combat_wave_done = 0
while 1:
logger.hr('Combat', level=2)
logger.info(f'Combat, team={team}, wave={self.combat_wave_done}/{self.combat_wave_limit}')
# Prepare
prepare = self.combat_prepare(team, support_character)
if not prepare:
self.combat_exit()
break
# Execute
self.combat_execute()
# Finish
finish = self.combat_finish()
if self._combat_should_reenter():
continue
if finish:
break
logger.attr('combat_wave_done', self.combat_wave_done)
return self.combat_wave_done