Add: Combat module

This commit is contained in:
LmeSzinc 2023-06-17 03:15:26 +08:00
parent d7bc6d82f6
commit fc1e1b4971
12 changed files with 300 additions and 42 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.7 KiB

After

Width:  |  Height:  |  Size: 5.4 KiB

View File

@ -25,8 +25,6 @@ class Device(Screenshot, Control, AppControl, Platform):
detect_record = set() detect_record = set()
click_record = deque(maxlen=15) click_record = deque(maxlen=15)
stuck_timer = Timer(60, count=60).start() stuck_timer = Timer(60, count=60).start()
stuck_timer_long = Timer(180, count=180).start()
stuck_long_wait_list = ['BATTLE_STATUS_S', 'PAUSE', 'LOGIN_CHECK']
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
for _ in range(2): for _ in range(2):
@ -96,7 +94,6 @@ class Device(Screenshot, Control, AppControl, Platform):
def stuck_record_clear(self): def stuck_record_clear(self):
self.detect_record = set() self.detect_record = set()
self.stuck_timer.reset() self.stuck_timer.reset()
self.stuck_timer_long.reset()
def stuck_record_check(self): def stuck_record_check(self):
""" """
@ -104,14 +101,8 @@ class Device(Screenshot, Control, AppControl, Platform):
GameStuckError: GameStuckError:
""" """
reached = self.stuck_timer.reached() reached = self.stuck_timer.reached()
reached_long = self.stuck_timer_long.reached()
if not reached: if not reached:
return False return False
if not reached_long:
for button in self.stuck_long_wait_list:
if button in self.detect_record:
return False
logger.warning('Wait too long') logger.warning('Wait too long')
logger.warning(f'Waiting for {self.detect_record}') logger.warning(f'Waiting for {self.detect_record}')

View File

@ -174,7 +174,9 @@ class Screenshot(Adb, WSA, DroidCast, AScreenCap, Scrcpy):
image = self.image image = self.image
Image.fromarray(image).show() Image.fromarray(image).show()
def image_save(self, file): def image_save(self, file=None):
if file is None:
file = f'{int(time.time() * 1000)}.png'
save_image(self.image, file) save_image(self.image, file)
def check_screen_size(self): def check_screen_size(self):

View File

@ -245,7 +245,7 @@ class DigitCounter(Ocr):
if res: if res:
groups = [int(s) for s in res.groups()] groups = [int(s) for s in res.groups()]
current, total = int(groups[0]), int(groups[1]) current, total = int(groups[0]), int(groups[1])
current = min(current, total) # current = min(current, total)
return current, total - current, total return current, total - current, total
else: else:
logger.warning(f'No digit counter found in {result}') logger.warning(f'No digit counter found in {result}')

View File

@ -47,9 +47,9 @@ WAVE_PLUS = ButtonWrapper(
name='WAVE_PLUS', name='WAVE_PLUS',
share=Button( share=Button(
file='./assets/share/combat/prepare/WAVE_PLUS.png', file='./assets/share/combat/prepare/WAVE_PLUS.png',
area=(1203, 578, 1239, 598), area=(1204, 578, 1236, 598),
search=(1183, 558, 1259, 618), search=(1184, 558, 1256, 618),
color=(23, 24, 25), color=(229, 229, 229),
button=(1203, 578, 1239, 598), button=(1204, 578, 1236, 598),
), ),
) )

175
tasks/combat/combat.py Normal file
View File

@ -0,0 +1,175 @@
from module.logger import logger
from tasks.base.page import page_main
from tasks.combat.assets.assets_combat_finish import COMBAT_AGAIN, COMBAT_EXIT
from tasks.combat.assets.assets_combat_prepare import COMBAT_PREPARE
from tasks.combat.assets.assets_combat_team import COMBAT_TEAM_PREPARE
from tasks.combat.interact import CombatInteract
from tasks.combat.prepare import CombatPrepare
from tasks.combat.state import CombatState
from tasks.combat.team import CombatTeam
class Combat(CombatInteract, CombatPrepare, CombatState, CombatTeam):
_combat_has_multi_wave = True
@property
def combat_cost(self):
if self._combat_has_multi_wave:
return 10
else:
return 30
def handle_combat_prepare(self):
"""
Pages:
in: COMBAT_PREPARE
"""
current = self.combat_get_trailblaze_power()
self._combat_has_multi_wave = self.combat_has_multi_wave()
if self._combat_has_multi_wave:
wave = min(current // self.combat_cost, 6)
logger.info(f'Current has {current}, combat costs {self.combat_cost}, able to do {wave} waves')
self.combat_set_wave(wave)
else:
logger.info(f'Current has {current}, combat costs {self.combat_cost}, do 1 wave')
def combat_prepare(self, team=1):
"""
Pages:
in: COMBAT_PREPARE
out: is_combat_executing
"""
logger.hr('Combat prepare')
skip_first_screenshot = True
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
# End
if self.is_combat_executing():
break
# Click
if self.appear(COMBAT_PREPARE, interval=2):
self.handle_combat_prepare()
self.device.click(COMBAT_PREPARE)
self.interval_reset(COMBAT_PREPARE)
continue
if self.appear(COMBAT_TEAM_PREPARE, interval=2):
self.team_set(team)
self.device.click(COMBAT_TEAM_PREPARE)
self.interval_reset(COMBAT_TEAM_PREPARE)
continue
if self.handle_combat_interact():
continue
def combat_execute(self):
"""
Pages:
in: is_combat_executing
out: COMBAT_AGAIN
"""
logger.hr('Combat execute')
skip_first_screenshot = True
is_executing = True
self.combat_state_reset()
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
# End
if self.appear(COMBAT_AGAIN):
break
# Daemon
if self.is_combat_executing():
if not is_executing:
logger.info('Combat continues')
self.device.stuck_record_clear()
is_executing = True
else:
is_executing = False
if self.handle_combat_state():
continue
def _combat_can_again(self) -> bool:
"""
Pages:
in: COMBAT_AGAIN
"""
current = self.combat_get_trailblaze_power(expect_reduce=True)
if self._combat_has_multi_wave:
if current >= self.combat_cost * 6:
logger.info(f'Current has {current}, combat costs {self.combat_cost}, can run again')
return True
else:
logger.info(f'Current has {current}, combat costs {self.combat_cost}, can not run again')
return False
else:
logger.info(f'Current has {current}, combat costs {self.combat_cost}, no again')
return False
def combat_finish(self) -> bool:
"""
Returns:
bool: True if exit, False if again
Pages:
in: COMBAT_AGAIN
out: page_main if exit
is_combat_executing if again
"""
logger.hr('Combat finish')
skip_first_screenshot = True
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
# End
if self.appear(page_main.check_button):
logger.info('Combat finishes at page_main')
return True
if self.is_combat_executing():
logger.info('Combat finishes at another combat')
return False
# Click
if self.appear(COMBAT_AGAIN, interval=2):
if self._combat_can_again():
self.device.click(COMBAT_AGAIN)
else:
self.device.click(COMBAT_EXIT)
self.interval_reset(COMBAT_AGAIN)
def combat(self, team: int = 1, skip_first_screenshot=True):
"""
Combat until trailblaze power runs out.
Args:
team: 1 to 6.
skip_first_screenshot:
Pages:
in: COMBAT_PREPARE
or page_main with DUNGEON_COMBAT_INTERACT
out: page_main
"""
if not skip_first_screenshot:
self.device.screenshot()
while 1:
logger.hr('Combat', level=2)
self.combat_prepare(team)
self.combat_execute()
finish = self.combat_finish()
if self.state.TrailblazePower >= self.combat_cost:
logger.info('Still having some trailblaze power run with less waves to empty it')
continue
if finish:
break

14
tasks/combat/interact.py Normal file
View File

@ -0,0 +1,14 @@
from tasks.base.ui import UI
from tasks.combat.assets.assets_combat_interact import DUNGEON_COMBAT_INTERACT
class CombatInteract(UI):
def handle_combat_interact(self):
"""
Returns:
bool: If clicked.
"""
if self.appear_then_click(DUNGEON_COMBAT_INTERACT, interval=2):
return True
return False

View File

@ -1,3 +1,4 @@
from module.base.timer import Timer
from module.ocr.ocr import Digit, DigitCounter from module.ocr.ocr import Digit, DigitCounter
from tasks.base.ui import UI from tasks.base.ui import UI
from tasks.combat.assets.assets_combat_prepare import ( from tasks.combat.assets.assets_combat_prepare import (
@ -23,11 +24,37 @@ class CombatPrepare(UI):
skip_first_screenshot=True skip_first_screenshot=True
) )
def combat_get_trailblaze_power(self) -> int: def combat_has_multi_wave(self) -> bool:
""" """
If combat has waves to set.
Most dungeons can do 6 times at one time while bosses don't.
"""
return self.appear(WAVE_MINUS) or self.appear(WAVE_PLUS)
def combat_get_trailblaze_power(self, expect_reduce=False, skip_first_screenshot=True) -> int:
"""
Args:
expect_reduce: Current value is supposed to be lower than the previous.
skip_first_screenshot:
Pages: Pages:
in: COMBAT_PREPARE or COMBAT_REPEAT in: COMBAT_PREPARE or COMBAT_REPEAT
""" """
current, _, _ = DigitCounter(OCR_TRAILBLAZE_POWER).ocr_single_line(self.device.image) timeout = Timer(1, count=2).start()
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
current, _, _ = DigitCounter(OCR_TRAILBLAZE_POWER).ocr_single_line(self.device.image)
# Confirm if it is > 180, sometimes just OCR errors
if current > 180 and timeout.reached():
break
if expect_reduce and current >= self.state.TrailblazePower:
continue
if current <= 180:
break
self.state.TrailblazePower = current self.state.TrailblazePower = current
return current return current

View File

@ -9,7 +9,9 @@ from tasks.combat.assets.assets_combat_state import COMBAT_AUTO, COMBAT_PAUSE, C
class CombatState(UI): class CombatState(UI):
_combat_click_interval = Timer(1, count=2) _combat_click_interval = Timer(2, count=4)
_combat_auto_checked = False
_combat_2x_checked = False
def is_combat_executing(self) -> bool: def is_combat_executing(self) -> bool:
appear = self.appear(COMBAT_PAUSE) appear = self.appear(COMBAT_PAUSE)
@ -26,7 +28,7 @@ class CombatState(UI):
# 148 150 150 150 150 150 144 138 134 141 136 133 173 183 130 128 127 126] # 148 150 150 150 150 150 144 138 134 141 136 133 173 183 130 128 127 126]
parameters = { parameters = {
# Border is about 188-190 # Border is about 188-190
'height': 128, 'height': 96,
# Background is about 120-122 # Background is about 120-122
'prominence': 35, 'prominence': 35,
'width': (0, 7), 'width': (0, 7),
@ -39,7 +41,8 @@ class CombatState(UI):
elif count == 2: elif count == 2:
return True return True
else: else:
logger.warning(f'Unexpected peak amount on {button}: {count}, lines={lines}') # logger.warning(f'Unexpected peak amount on {button}: {count}, lines={lines}')
# self.device.image_save()
return False return False
def is_combat_auto(self) -> bool: def is_combat_auto(self) -> bool:
@ -48,6 +51,10 @@ class CombatState(UI):
def is_combat_speed_2x(self) -> bool: def is_combat_speed_2x(self) -> bool:
return self._is_combat_button_active(COMBAT_SPEED_2X) return self._is_combat_button_active(COMBAT_SPEED_2X)
def combat_state_reset(self):
self._combat_auto_checked = False
self._combat_2x_checked = False
def handle_combat_state(self, auto=True, speed_2x=True): def handle_combat_state(self, auto=True, speed_2x=True):
""" """
Set combat auto and 2X speed. Enable both by default. Set combat auto and 2X speed. Enable both by default.
@ -55,28 +62,49 @@ class CombatState(UI):
Returns: Returns:
bool: If clicked bool: If clicked
""" """
if self._combat_auto_checked and self._combat_2x_checked:
return False
if not self.is_combat_executing(): if not self.is_combat_executing():
return False return False
if speed_2x and not self.is_combat_speed_2x(): if not self._combat_2x_checked:
if self._combat_click_interval.reached(): if speed_2x:
self.device.click(COMBAT_SPEED_2X) if self.is_combat_speed_2x():
self._combat_click_interval.reset() logger.info('_combat_2x_checked')
return True self._combat_2x_checked = True
if not speed_2x and self.is_combat_speed_2x(): else:
if self._combat_click_interval.reached(): if self._combat_click_interval.reached():
self.device.click(COMBAT_SPEED_2X) self.device.click(COMBAT_SPEED_2X)
self._combat_click_interval.reset() self._combat_click_interval.reset()
return True return True
if auto and not self.is_combat_auto(): else:
if self._combat_click_interval.reached(): if self.is_combat_speed_2x():
self.device.click(COMBAT_AUTO) if self._combat_click_interval.reached():
self._combat_click_interval.reset() self.device.click(COMBAT_SPEED_2X)
return True self._combat_click_interval.reset()
if not auto and self.is_combat_auto(): return True
if self._combat_click_interval.reached(): else:
self.device.click(COMBAT_AUTO) logger.info('_combat_2x_checked')
self._combat_click_interval.reset() self._combat_2x_checked = True
return True
if not self._combat_auto_checked:
if auto:
if self.is_combat_auto():
logger.info('_combat_auto_checked')
self._combat_auto_checked = True
else:
if self._combat_click_interval.reached():
self.device.click(COMBAT_AUTO)
self._combat_click_interval.reset()
return True
else:
if self.is_combat_auto():
if self._combat_click_interval.reached():
self.device.click(COMBAT_AUTO)
self._combat_click_interval.reset()
return True
else:
logger.info('_combat_auto_checked')
self._combat_auto_checked = True
return False return False

View File

@ -39,17 +39,38 @@ class CombatTeam(UI):
# logger.warning(f'No team selected') # logger.warning(f'No team selected')
return 0 return 0
def team_set(self, team: int = 1, skip_first_screenshot=True): def team_set(self, team: int = 1, skip_first_screenshot=True) -> bool:
""" """
Args: Args:
team: Team index, 1 to 6. team: Team index, 1 to 6.
skip_first_screenshot: skip_first_screenshot:
Returns:
bool: If clicked
Pages: Pages:
in: page_team in: page_team
""" """
logger.info(f'Team set: {team}') logger.info(f'Team set: {team}')
# Wait teams show up
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
# End
current = self._get_team_selected()
if current:
if current == team:
logger.info(f'Selected to the correct team')
return False
else:
break
# Set team
interval = Timer(2) interval = Timer(2)
skip_first_screenshot = True
while 1: while 1:
if skip_first_screenshot: if skip_first_screenshot:
skip_first_screenshot = False skip_first_screenshot = False
@ -61,7 +82,7 @@ class CombatTeam(UI):
logger.attr('Team', current) logger.attr('Team', current)
if current and current == team: if current and current == team:
logger.info(f'Selected to the correct team') logger.info(f'Selected to the correct team')
break return True
# Click # Click
if interval.reached(): if interval.reached():