StarRailCopilot/tasks/dungeon/state.py

155 lines
5.4 KiB
Python
Raw Normal View History

2023-12-08 10:09:33 +00:00
from datetime import timedelta
2023-12-09 13:39:37 +00:00
from module.base.base import ModuleBase
from module.base.timer import Timer
from module.base.utils import crop
2023-12-08 10:09:33 +00:00
from module.config.stored.classes import now
from module.config.utils import DEFAULT_TIME
from module.logger import logger
from module.ocr.ocr import DigitCounter
from tasks.base.ui import UI
from tasks.dungeon.assets.assets_dungeon_state import OCR_SIMUNI_POINT, OCR_SIMUNI_POINT_OFFSET, OCR_STAMINA
2023-12-08 10:09:33 +00:00
from tasks.dungeon.keywords import DungeonList
class OcrSimUniPoint(DigitCounter):
def after_process(self, result):
result = super().after_process(result)
result = result.replace('O', '0').replace('o', '0')
return result
class DungeonState(UI):
def dungeon_get_simuni_point(self, image=None) -> int:
"""
Page:
in: page_guide, Survival_Index, Simulated_Universe
"""
logger.info('Get simulated universe points')
if image is None:
image = self.device.image
_ = OCR_SIMUNI_POINT_OFFSET.match_template(image)
OCR_SIMUNI_POINT.load_offset(OCR_SIMUNI_POINT_OFFSET)
area = (
OCR_SIMUNI_POINT.area[0],
OCR_SIMUNI_POINT.button[1],
OCR_SIMUNI_POINT.area[2],
OCR_SIMUNI_POINT.button[3],
)
ocr = OcrSimUniPoint(OCR_SIMUNI_POINT)
value, _, total = ocr.ocr_single_line(crop(image, area), direct_ocr=True)
if total and value <= total:
logger.attr('SimulatedUniverse', f'{value}/{total}')
self.config.stored.SimulatedUniverse.set(value, total)
return value
else:
logger.warning(f'Invalid SimulatedUniverse points: {value}/{total}')
return 0
def dungeon_update_stamina(self, image=None, skip_first_screenshot=True):
"""
Returns:
bool: If success
Pages:
in: page_guild, Survival_Index, Simulated_Universe
or page_rogue, LEVEL_CONFIRM
or rogue, REWARD_CLOSE
"""
ocr = DigitCounter(OCR_STAMINA)
timeout = Timer(1, count=2).start()
if image is None:
image = self.device.image
use_cached_image = False
else:
skip_first_screenshot = True
use_cached_image = True
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
image = self.device.image
stamina = (0, 0, 0)
immersifier = (0, 0, 0)
if timeout.reached():
logger.warning('dungeon_update_stamina() timeout')
return False
for row in ocr.detect_and_ocr(image):
if row.ocr_text.isdigit():
continue
if row.ocr_text == '+':
continue
2024-01-01 20:13:40 +00:00
if not ocr.is_format_matched(row.ocr_text):
continue
data = ocr.format_result(row.ocr_text)
if data[2] == self.config.stored.TrailblazePower.FIXED_TOTAL:
stamina = data
if data[2] == self.config.stored.Immersifier.FIXED_TOTAL:
immersifier = data
if stamina[2] > 0 and immersifier[2] > 0:
break
if use_cached_image:
logger.info('dungeon_update_stamina() ended')
return
stamina = stamina[0]
immersifier = immersifier[0]
logger.attr('TrailblazePower', stamina)
logger.attr('Imersifier', immersifier)
with self.config.multi_set():
self.config.stored.TrailblazePower.value = stamina
self.config.stored.Immersifier.value = immersifier
return True
def dungeon_update_simuni(self):
"""
Update rogue weekly points, stamina, immersifier
Run in a new thread to be faster as data is not used immediately
Page:
in: page_guide, Survival_Index, Simulated_Universe
"""
logger.info('dungeon_update_simuni')
def func(image):
logger.info('Update thread start')
with self.config.multi_set():
self.dungeon_get_simuni_point(image)
self.dungeon_update_stamina(image)
2023-12-09 13:39:37 +00:00
ModuleBase.worker.submit(func, self.device.image)
2023-12-08 10:09:33 +00:00
def dungeon_stamina_delay(self, dungeon: DungeonList):
"""
Delay tasks that use stamina
"""
if dungeon.is_Simulated_Universe:
limit = 80
elif dungeon.is_Cavern_of_Corrosion:
limit = 80
elif dungeon.is_Echo_of_War:
limit = 30
else:
limit = 60
# Recover 1 trailbaze power each 6 minutes
current = self.config.stored.TrailblazePower.value
cover = max(limit - current, 0) * 6
future = now() + timedelta(minutes=cover)
logger.info(f'Currently has {current} need {cover} minutes to reach {limit}')
tasks = ['Dungeon', 'Weekly']
with self.config.multi_set():
for task in tasks:
next_run = self.config.cross_get(keys=f'{task}.Scheduler.NextRun', default=DEFAULT_TIME)
if future > next_run:
logger.info(f"Delay task `{task}` to {future}")
self.config.cross_set(keys=f'{task}.Scheduler.NextRun', value=future)