StarRailCopilot/tasks/planner/scan.py
2024-09-11 22:15:46 +08:00

257 lines
8.7 KiB
Python

import re
import cv2
from pponnxcr.predict_system import BoxedResult
from module.base.decorator import cached_property
from module.base.utils import area_center, area_in_area
from module.exception import GamePageUnknownError
from module.logger import logger
from module.ocr.ocr import Ocr, OcrWhiteLetterOnComplexBackground
from module.ui.scroll import AdaptiveScroll
from tasks.base.page import page_planner
from tasks.daily.synthesize import SynthesizeUI
from tasks.planner.assets.assets_planner_result import *
from tasks.planner.keywords import ITEM_CLASSES
from tasks.planner.keywords.classes import ItemCurrency
from tasks.planner.model import PlannerMixin, PlannerResultRow
CALCULATE_TITLE.load_search(RESULT_CHECK.search)
MATERIAL_TITLE.load_search(RESULT_CHECK.search)
DETAIL_TITLE.load_search(RESULT_CHECK.search)
class OcrItemName(Ocr):
def after_process(self, result):
result = result.replace('方相果实', '万相果实')
result = result.replace('念火之心', '忿火之心')
result = result.replace('楚天之魔', '焚天之魔')
result = re.sub('^火之心', '忿火之心', result)
result = re.sub('工造机$', '工造机杼', result)
result = re.sub('工造迥?轮', '工造迴轮', result)
result = re.sub('月狂[療撩]?牙', '月狂獠牙', result)
# 毁灭者的未路 思绪末屑
result = result.replace('未路', '末路')
result = result.replace('未屑', '末屑')
result = result.replace('粉未', '粉末')
# Error words on blank background
result = re.sub('^[國東]', '', result)
result = re.sub('時$', '', result)
# 一杯酩酊的时代
result = re.sub('一杯[酪酩酊酐]*的', '一杯酩酊的', result)
# 蠢动原核
result = re.sub('[鑫蠹]动', '蠢动', result)
return result
class OcrPlannerResult(OcrWhiteLetterOnComplexBackground, OcrItemName):
min_box = (16, 20)
def __init__(self, lang=None):
super().__init__(OCR_RESULT, lang=lang)
self.limited_area = OCR_RESULT.area
self.limit_y = 720
def _match_result(
self,
result: str,
keyword_classes,
lang: str = None,
ignore_punctuation=True,
ignore_digit=True):
lang = self.lang
return super()._match_result(
result,
keyword_classes,
lang,
ignore_punctuation,
ignore_digit,
)
def filter_detected(self, result: BoxedResult) -> bool:
if not area_in_area(result.box, self.limited_area, threshold=0):
return False
if area_center(result.box)[1] > self.limit_y:
return False
return True
def detect_and_ocr(self, image, *args, **kwargs):
# Remove rows below DETAIL_TITLE
if DETAIL_TITLE.match_template(image):
self.limit_y = DETAIL_TITLE.button[3]
else:
self.limit_y = 720
return super().detect_and_ocr(image, *args, **kwargs)
def pre_process(self, image):
image = cv2.subtract((255, 255, 255, 0), image)
return image
class PlannerScan(SynthesizeUI, PlannerMixin):
def is_in_planner_result(self):
if self.appear(RESULT_CHECK):
return True
if self.appear(CALCULATE_TITLE):
return True
if self.appear(MATERIAL_TITLE):
return True
if self.appear(DETAIL_TITLE):
return True
return False
@cached_property
def planner_lang(self) -> str:
if self.config.Emulator_PackageName in ['CN-Official', 'CN-Bilibili']:
lang = 'cn'
else:
lang = self.config.LANG
if lang == 'auto':
logger.error('Language was not set before planner scan, assume it is "cn"')
lang = 'cn'
logger.attr('PlannerLang', lang)
return lang
def parse_planner_result_page(self) -> list[PlannerResultRow]:
"""
Pages:
in: planner result
"""
ocr = OcrPlannerResult(lang=self.planner_lang)
results = ocr.detect_and_ocr(self.device.image)
x_total = 842
x_synthesize = 965
x_demand = 1129
def x_match(result: BoxedResult, x):
rx = area_center(result.box)[0]
return x - 50 <= rx <= x + 50
def y_match(result: BoxedResult, y):
rx = area_center(result.box)[1]
return y - 15 <= rx <= y + 15
# Split columns
list_item = [r for r in results
if not r.ocr_text.isdigit() and ocr._match_result(r.ocr_text, keyword_classes=ITEM_CLASSES)]
list_number = [r for r in results if r.ocr_text.isdigit()]
list_total = [r for r in list_number if x_match(r, x_total)]
list_synthesize = [r for r in list_number if x_match(r, x_synthesize)]
list_demand = [r for r in list_number if x_match(r, x_demand)]
# Structure
out: list[PlannerResultRow] = []
for item in list_item:
y_item = area_center(item.box)[1]
total = -1
for number in list_total:
if y_match(number, y_item):
total = int(number.ocr_text)
break
synthesize = 0
for number in list_synthesize:
if y_match(number, y_item):
synthesize = int(number.ocr_text)
break
demand = 0
for number in list_demand:
if y_match(number, y_item):
demand = int(number.ocr_text)
break
item = ocr._match_result(item.ocr_text, keyword_classes=ITEM_CLASSES)
row = PlannerResultRow(
item=item,
total=total,
synthesize=synthesize,
demand=demand
)
# Validate item
# print(row)
if row.total <= 0:
logger.warning(f'Planner row with total <= 0, {row}')
continue
if row.synthesize < 0:
# Credits always have `synthesize`=="-"
if row.item.__class__ != ItemCurrency:
logger.warning(f'Planner row with synthesize < 0, {row}')
continue
if row.demand < 0:
logger.warning(f'Planner row with demand < 0, {row}')
continue
# Add
out.append(row)
logger.info(f'parse_planner_result_page: {out}')
return out
def parse_planner_result(self, skip_first_screenshot=True) -> list[PlannerResultRow]:
"""
Pages:
in: planner result
"""
logger.hr('Parse planner result', level=2)
if not self.ui_page_appear(page_planner):
logger.error('Not in page_planner, game must in the planner result page before scanning')
raise GamePageUnknownError
scroll = AdaptiveScroll(RESULT_SCROLL.button, name=RESULT_SCROLL.name)
scroll.drag_threshold = 0.1
scroll.edge_threshold = 0.1
scroll.parameters = {
'height': 50,
'prominence': 15,
'width': 5,
}
if not skip_first_screenshot:
self.device.screenshot()
skip_first_screenshot = False
if not scroll.at_top(main=self):
scroll.set_top(main=self)
out = []
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
# Skip first page
if self.appear(CALCULATE_TITLE):
scroll.next_page(main=self, page=0.75)
continue
# Parse
rows = self.parse_planner_result_page()
for row in rows:
if row not in out:
out.append(row)
logger.attr('PlannerResult', len(rows))
# Scroll
if scroll.at_bottom(main=self):
logger.info('Reached scroll end, stop')
break
elif self.appear(DETAIL_TITLE):
logger.info('Reached DETAIL_TITLE, stop')
break
else:
scroll.next_page(main=self, page=0.8)
logger.hr('Planner Result')
for row in out:
logger.info(f'Planner item: {row.item.name}, {row.total}, {row.synthesize}, {row.demand}')
self.planner_write_results(out)
return out
def run(self):
self.device.screenshot()
self.parse_planner_result()
if __name__ == '__main__':
self = PlannerScan('src', task='PlannerScan')
self.device.screenshot()
self.parse_planner_result()