StarRailCopilot/tasks/rogue/event/event.py
2024-10-25 01:09:52 +08:00

320 lines
13 KiB
Python

import random
import re
from dataclasses import dataclass
from functools import cached_property
from itertools import chain
from pponnxcr.predict_system import BoxedResult
from module.base.button import ClickButton
from module.base.decorator import del_cached_property
from module.base.utils import area_limit, area_offset
from module.logger import logger
from module.ocr.ocr import Ocr, OcrResultButton
from module.ui.scroll import Scroll
from tasks.rogue.assets.assets_rogue_event import *
from tasks.rogue.assets.assets_rogue_ui import BLESSING_CONFIRM, PAGE_EVENT
from tasks.rogue.blessing.ui import RogueUI
from tasks.rogue.event.preset import STRATEGIES, STRATEGY_COMMON
from tasks.rogue.keywords import (KEYWORDS_ROGUE_EVENT_OPTION,
KEYWORDS_ROGUE_EVENT_TITLE, RogueEventOption,
RogueEventTitle)
@dataclass
class OptionButton:
    """
    One selectable option on a rogue event page.

    Holds the icon detected in front of the option and, once OCR has been
    run, the OCR result paired to that icon.
    """
    # Icon detected in front of the option text
    prefix_icon: ClickButton
    # OCR result paired to this option, None as long as nothing was paired
    button: OcrResultButton = None
    # Option with requirements might be disabled
    is_valid: bool = True
    # True if the option was detected on the bottom page of the option list
    is_bottom_page: bool = False

    def __str__(self) -> str:
        """
        Returns:
            str: Matched keyword of the OCR result if there is one,
                otherwise the default string of the object.
        """
        if self.button is None:
            return super().__str__()
        return str(self.button.matched_keyword)
class OcrRogueEvent(Ocr):
    """
    Base OCR of rogue event texts.

    Subclasses fill OCR_REPLACE with (keyword, regex) pairs per language.
    An OCR text that fully matches one of the regexes is replaced by the
    text of the paired keyword in the current language.
    """
    merge_thres_y = 5
    # {lang: [(keyword, regex), ...]}
    OCR_REPLACE = {
        'cn': [],
        'en': [],
    }

    @cached_property
    def ocr_regex(self) -> re.Pattern | None:
        """
        Returns:
            re.Pattern | None: All rules of the current language joined as
                alternatives, each in a group named after its keyword.
                None if the current language has no rules.
        """
        rules = self.OCR_REPLACE.get(self.lang)
        if rules:
            alternatives = [
                f'(?P<{keyword.name}>{pattern})'
                for keyword, pattern in rules
            ]
            return re.compile('|'.join(alternatives))
        return None

    def _after_process(self, result, keyword_class):
        """
        Args:
            result (str): OCR text
            keyword_class: Keyword class to look the matched rule up in,
                by the name of the matched group

        Returns:
            str: Keyword text in the current language if a rule fully
                matches, otherwise the text processed by the parent class.
        """
        result = super().after_process(result)
        regex = self.ocr_regex
        if regex is None:
            return result

        hit = regex.fullmatch(result)
        if hit is None:
            return result

        # Group name is the keyword name, see ocr_regex
        keyword = keyword_class.find(hit.lastgroup)
        return getattr(keyword, self.lang)
class OcrRogueEventTitle(OcrRogueEvent):
    """
    OCR of rogue event titles.
    """
    # (keyword, regex) pairs, a title fully matching a regex is treated as
    # the paired keyword
    OCR_REPLACE = {
        'cn': [
            (KEYWORDS_ROGUE_EVENT_TITLE.Rock_Paper_Scissors,
             '^猜拳.*'),
            (KEYWORDS_ROGUE_EVENT_TITLE.Ka_ching_IPC_Banking_I,
             '^咔.*其一.*'),
            (KEYWORDS_ROGUE_EVENT_TITLE.Ka_ching_IPC_Banking_II,
             '^咔.*其二.*'),
            (KEYWORDS_ROGUE_EVENT_TITLE.Beast_Horde_Voracious_Catastrophe,
             '^兽群.*'),
        ],
        'en': [
            (KEYWORDS_ROGUE_EVENT_TITLE.Nomadic_Miners,
             '^Nomadic.*'),
            (KEYWORDS_ROGUE_EVENT_TITLE.Nildis,
             '^Nildis.*'),
            (KEYWORDS_ROGUE_EVENT_TITLE.Tavern,
             '^Tavern.*'),
            (KEYWORDS_ROGUE_EVENT_TITLE.Insights_from_the_Universal_Dancer,
             '.*Dancer$'),
        ],
    }

    def after_process(self, result):
        """
        Args:
            result (str): OCR text of the title

        Returns:
            str: Title text after fixing a known misread and applying
                the replace rules.
        """
        # The second character of 卫戍 gets misread as a lookalike
        corrected = re.sub('卫[成戌]', '卫戍', result)
        return self._after_process(corrected, RogueEventTitle)
class OcrRogueEventOption(OcrRogueEvent):
    """
    OCR of rogue event options.

    `expected_options` should be assigned on the instance before running
    OCR, it is used to drop detections located on the prefix icons and to
    mask the icons out of the image.
    """
    # Empty default shared by all instances, assign a new list on the
    # instance instead of mutating this one in place
    expected_options: list[OptionButton] = []
    # Patterns are raw strings so that `\d` reaches the regex engine
    # without being an invalid string escape sequence
    OCR_REPLACE = {
        'cn': [
            # Special cases with placeholder
            (KEYWORDS_ROGUE_EVENT_OPTION.Deposit_2_Cosmic_Fragments_39a6, r'存入\d+.*'),
            (KEYWORDS_ROGUE_EVENT_OPTION.Withdraw_2_Cosmic_Fragments, r'取出\d+.*'),
            (KEYWORDS_ROGUE_EVENT_OPTION.Record_of_the_Aeon_of_1, r'^关于.*'),
            (KEYWORDS_ROGUE_EVENT_OPTION.Wait_for_THEM_13dc, r'^等待.*'),
            (KEYWORDS_ROGUE_EVENT_OPTION.Choose_number_two_It_snores_like_Andatur_Zazzalo, r'.*二号.*安达.*'),
            (KEYWORDS_ROGUE_EVENT_OPTION.Choose_number_three_Its_teeth_are_rusted_0f13, r'.*三号.*牙齿.*'),
            (KEYWORDS_ROGUE_EVENT_OPTION.Believe_in_THEM_with_pure_devotion, r'虔诚信仰'),
            (KEYWORDS_ROGUE_EVENT_OPTION.A_box_of_expired_doughnuts_6308, r'^盒过期甜甜圈'),
        ],
        'en': [
            (KEYWORDS_ROGUE_EVENT_OPTION.Deposit_2_Cosmic_Fragments_39a6, r'Deposit \d+.*'),
            (KEYWORDS_ROGUE_EVENT_OPTION.Withdraw_2_Cosmic_Fragments, r'Withdraw \d+.*'),
            (KEYWORDS_ROGUE_EVENT_OPTION.Record_of_the_Aeon_of_1, r'^Record of the Aeon.*'),
        ]
    }

    def filter_detected(self, result: BoxedResult) -> bool:
        """
        Args:
            result (BoxedResult): A detected text box

        Returns:
            bool: False if the box starts on the left of the right edge of
                the first expected prefix icon, True otherwise or if there
                are no expected options.
        """
        if not self.expected_options:
            return True
        right_bound = self.expected_options[0].prefix_icon.area[2]
        if result.box[0] < right_bound:
            return False
        return True

    def pre_process(self, image):
        """
        Args:
            image: Image cropped to the OCR area, modified in place

        Returns:
            Image with the prefix icons of the expected options painted black
        """
        # Mask starlike icons to avoid them to be recognized as */#/+/米
        # Icon areas are relative to the screenshot, image is relative to
        # the OCR area, so shift by the upper-left corner of the OCR area
        offset = tuple(-x for x in self.button.area[:2])
        for option in self.expected_options:
            x1, y1, x2, y2 = area_offset(option.prefix_icon.area, offset)
            image[y1:y2, x1:x2] = (0, 0, 0)
        return image

    def after_process(self, result):
        """
        Args:
            result (str): OCR text of an option

        Returns:
            str: Option text after applying the replace rules
        """
        return self._after_process(result, RogueEventOption)
class OptionScroll(Scroll):
    """
    Scroll of the event option list, dragged on the option area instead of
    on the scroll itself.
    """

    def position_to_screen(self, position, random_range=(-0.05, 0.05)):
        """
        Args:
            position: Scroll position, passed to the parent class
            random_range: Random range, passed to the parent class

        Returns:
            tuple: Area (x1, y1, x2, y2) to drag to
        """
        # This scroll itself can not be dragged, but OCR_OPTION.area can
        area = super().position_to_screen(position, random_range)
        confirm_x = CHOOSE_OPTION_CONFIRM.button[0]
        confirm_width = self.area[0] - confirm_x
        area_width = confirm_x - OCR_OPTION.area[0]
        # A fixed offset is easy to fail for some reason
        random_offset = random.uniform(0.2, 0.8) * area_width + confirm_width
        x1, y1, x2, y2 = area_offset(area, (-random_offset, 0))
        # Flip drag direction upside down,
        # each y becomes (scroll top + scroll bottom - y)
        y_sum = self.area[1] + self.area[3]
        return (
            x1, y_sum - y2,
            x2, y_sum - y1,
        )
# Scroll of the event option list
SCROLL_OPTION = OptionScroll(
    OPTION_SCROLL,
    color=(219, 194, 145),
    name='SCROLL_OPTION',
)
class RogueEvent(RogueUI):
    """
    Handle rogue events: continue the story, detect the options of an
    event and choose one of them according to the domain strategy.
    """
    # Title of the current event, None if not recognized yet
    event_title: RogueEventTitle = None
    # Options detected in the current event. Class-level default only,
    # always rebind on the instance, never mutate in place
    options: list[OptionButton] = []

    @cached_property
    def valid_options(self) -> list[OptionButton]:
        """
        Options that can be selected. Cached, the cache must be deleted
        whenever `options` or any `is_valid` changes.
        """
        return [x for x in self.options if x.is_valid]

    def handle_event_continue(self):
        """
        Click through event pages that just need to be continued.

        Returns:
            bool: If clicked
        """
        if self.appear(PAGE_EVENT, interval=0.6):
            logger.info(f'{PAGE_EVENT} -> {BLESSING_CONFIRM}')
            self.device.click(BLESSING_CONFIRM)
            return True
        if self.appear_then_click(CHOOSE_STORY, interval=2):
            self.device.click_record_clear()
            return True
        if self.appear_then_click(CHOOSE_OPTION_CONFIRM, interval=2):
            self.interval_reset([
                PAGE_EVENT,
                CHOOSE_STORY,
                CHOOSE_OPTION,
            ])
            return True

        return False

    def handle_event_option(self):
        """
        Detect the options on the current screenshot and click one of them.

        self.event_title SHOULD be set to None before calling this function

        Returns:
            bool: If clicked

        Pages:
            in: page_rogue
        """
        self.options = []
        del_cached_property(self, 'valid_options')
        self._event_option_match()
        count = len(self.valid_options)
        if count == 0:
            return False

        logger.attr('EventOption', f'{count}/{len(self.options)}')
        # Only one option, click directly
        if count == 1:
            if self.interval_is_reached(CHOOSE_OPTION, interval=2):
                button = self.valid_options[0].prefix_icon
                # Option at bottom
                if button.area[1] > 500 and SCROLL_OPTION.appear(main=self):
                    SCROLL_OPTION.set_bottom(main=self)
                self.device.click(button)
                self.interval_reset(CHOOSE_OPTION, interval=2)
                return True

        if self.interval_is_reached(CHOOSE_OPTION, interval=2):
            option = self._event_option_filter()
            # Go back to the page where the option was detected
            if SCROLL_OPTION.appear(main=self):
                if option.is_bottom_page:
                    SCROLL_OPTION.set_bottom(main=self)
                else:
                    SCROLL_OPTION.set_top(main=self)
            self.device.click(option.prefix_icon)
            self.interval_reset(CHOOSE_OPTION, interval=2)
            return True

        return False

    def _event_option_match(self, is_bottom_page=False) -> int:
        """
        Match option icons on the current screenshot and append them to
        `self.options`.

        Args:
            is_bottom_page (bool): Flag stored on the new options

        Returns:
            int: Number of option icons matched
        """
        option_icons = CHOOSE_OPTION.match_multi_template(self.device.image)
        for button in option_icons:
            button.button = area_limit(button.button, OCR_OPTION.area)
        new_options = [OptionButton(
            prefix_icon=icon,
            # Validity is decided by a color count on the icon area
            is_valid=self.image_color_count(icon.area, color=(
                181, 162, 126), threshold=221, count=25),
            is_bottom_page=is_bottom_page
        ) for icon in option_icons]
        # Rebind instead of `+=`: if `options` was never assigned on this
        # instance, `+=` would extend the class-level list in place and
        # leak options into every other instance
        self.options = self.options + new_options
        if option_icons:
            del_cached_property(self, 'valid_options')
        return len(option_icons)

    def _event_option_ocr(self, expected_count: int) -> None:
        """
        OCR option texts on the current screenshot and pair them to the
        last `expected_count` options. Options without a paired OCR result
        are marked invalid.

        Args:
            expected_count (int): Number of option icons matched
        """
        # `self.options[-0:]` would be the whole list, not an empty one,
        # and would re-pair earlier options against the current screenshot
        if expected_count <= 0:
            return

        expected_options = self.options[-expected_count:]
        ocr = OcrRogueEventOption(OCR_OPTION)
        ocr.expected_options = expected_options
        ocr_results = ocr.matched_ocr(self.device.image, [RogueEventOption])
        # Pair icons and ocr results
        # `index` is shared between options on purpose, so each option
        # resumes the scan where the previous one stopped
        index = 0
        all_matched = True
        for option in expected_options:
            _, y1, _, y2 = option.prefix_icon.area
            for index in range(index, len(ocr_results)):
                _, yy1, _, yy2 = ocr_results[index].area
                # Result is entirely above the icon, try the next result
                if yy2 < y1:
                    continue
                # Result is entirely below the icon, nothing for this icon
                if yy1 > y2:
                    break
                option.button = ocr_results[index]
                break
            if option.button is None:
                option.is_valid = False
                all_matched = False
        if not all_matched:
            logger.warning('Count of OCR_OPTION results is not as expected')
        del_cached_property(self, 'valid_options')

    def _event_option_filter(self) -> OptionButton:
        """
        Choose an option according to the event title and the domain
        strategy. Falls back to a random option if the title is unknown or
        has no preset, and to the last option if no preset option is found.

        Returns:
            OptionButton: Option to click
        """
        if self.event_title is None:
            # OCR area of rest area is different from other occurrences
            if self.appear(REST_AREA):
                self.event_title = KEYWORDS_ROGUE_EVENT_TITLE.Rest_Area
            else:
                # Title may contains multi lines
                results = OcrRogueEventTitle(OCR_TITLE).matched_ocr(
                    self.device.image,
                    [RogueEventTitle]
                )
                if results:
                    self.event_title = results[0].matched_keyword

        if self.event_title is None:
            random_index = random.choice(range(len(self.valid_options)))
            logger.warning('Failed to OCR title')
            logger.info(f'Randomly select option {random_index+1}')
            return self.valid_options[random_index]

        strategy_name = self.config.RogueWorld_DomainStrategy
        logger.attr('DomainStrategy', strategy_name)
        if strategy_name not in STRATEGIES:
            logger.warning(
                'Unknown domain strategy, fall back to STRATEGY_COMMON'
            )
        strategy = STRATEGIES.get(strategy_name, STRATEGY_COMMON)
        if self.event_title not in strategy:
            random_index = random.choice(range(len(self.valid_options)))
            logger.info(f'No strategy preset for {self.event_title}')
            logger.info(f'Randomly select option {random_index+1}')
            return self.valid_options[random_index]

        # Try ocr
        if not self.options:
            self._event_option_match()
        self._event_option_ocr(len(self.options))
        # Check next page if there is scroll
        if SCROLL_OPTION.appear(main=self) and SCROLL_OPTION.is_draggable(main=self):
            if SCROLL_OPTION.set_bottom(main=self):
                expected = self._event_option_match(is_bottom_page=True)
                self._event_option_ocr(expected)

        # Entries that are list/tuple are groups in random order.
        # random.sample() returns a new shuffled list, so it works on
        # tuples too and leaves the shared strategy preset untouched
        priority = [
            random.sample(x, len(x))
            if isinstance(x, (list, tuple)) else [x]
            for x in strategy[self.event_title]
        ]
        priority = list(chain.from_iterable(priority))
        # Reason why _keywords_to_find()[0] is used to compare:
        #   Text of options in different events can be the same,
        #   so it is possible that keywords returned by matched_ocr
        #   is not exactly the same as options in RogueEventTitle.option_ids.
        for expect in priority:
            expect_text = expect._keywords_to_find()[0]
            for i, option in enumerate(self.valid_options):
                ocr_text = option.button.matched_keyword._keywords_to_find()[0]
                if ocr_text == expect_text:
                    logger.info(f'Select option {i+1}: {option}')
                    return option

        logger.error('No option was selected, return the last instead')
        # OCR may have marked every option invalid, use all detected
        # options then instead of raising IndexError on an empty list
        candidates = self.valid_options or self.options
        logger.info(f'Select last option: {candidates[-1]}')
        return candidates[-1]