Add: Write planner results into config

This commit is contained in:
LmeSzinc 2024-05-16 02:59:04 +08:00
parent 117704273f
commit d8ea34757a
5 changed files with 419 additions and 37 deletions

View File

@ -218,7 +218,7 @@ class AdaptiveScroll(Scroll):
""" """
Args: Args:
area (Button, tuple): A button or area of the whole scroll. area (Button, tuple): A button or area of the whole scroll.
prominence (dict): Parameters passing to scipy.find_peaks parameters (dict): Parameters passing to scipy.find_peaks
background (int): background (int):
is_vertical (bool): True if vertical, false if horizontal. is_vertical (bool): True if vertical, false if horizontal.
name (str): name (str):

View File

@ -1,25 +1,17 @@
import re import re
from pydantic import BaseModel
from module.base.timer import Timer from module.base.timer import Timer
from module.exception import ScriptError from module.exception import ScriptError
from module.logger import logger from module.logger import logger
from module.ocr.ocr import Digit from module.ocr.ocr import Digit
from tasks.base.ui import UI from tasks.combat.assets.assets_combat_prepare import COMBAT_PREPARE, WAVE_MINUS, WAVE_PLUS
from tasks.combat.assets.assets_combat_prepare import COMBAT_PREPARE
from tasks.dungeon.assets.assets_dungeon_obtain import * from tasks.dungeon.assets.assets_dungeon_obtain import *
from tasks.dungeon.keywords import DungeonList from tasks.dungeon.keywords import DungeonList
from tasks.planner.keywords import ITEM_CLASSES from tasks.planner.keywords import ITEM_CLASSES
from tasks.planner.keywords.classes import ItemBase from tasks.planner.model import ObtainedAmmount, PlannerProgressMixin
from tasks.planner.result import OcrItemName from tasks.planner.result import OcrItemName
class ItemAmount(BaseModel):
item: ItemBase
amount: int
class OcrItemAmount(Digit): class OcrItemAmount(Digit):
def format_result(self, result): def format_result(self, result):
res = re.split(r'[:;]', result) res = re.split(r'[:;]', result)
@ -27,7 +19,7 @@ class OcrItemAmount(Digit):
return super().format_result(result) return super().format_result(result)
class DungeonObtain(UI): class DungeonObtain(PlannerProgressMixin):
""" """
Parse items that can be obtained from dungeon Parse items that can be obtained from dungeon
@ -92,13 +84,15 @@ class DungeonObtain(UI):
else: else:
self.device.screenshot() self.device.screenshot()
if self.appear(COMBAT_PREPARE): if not self.appear(ITEM_CLOSE) and self.appear(COMBAT_PREPARE):
if self.image_color_count(WAVE_MINUS, color=(246, 246, 246), threshold=221, count=100) \
or self.image_color_count(WAVE_PLUS, color=(246, 246, 246), threshold=221, count=100):
break break
if self.appear_then_click(ITEM_CLOSE, interval=2): if self.appear_then_click(ITEM_CLOSE, interval=2):
continue continue
@staticmethod @staticmethod
def _obtain_get_entry(dungeon: DungeonList, index: int = 1, prev: ItemAmount = None): def _obtain_get_entry(dungeon: DungeonList, index: int = 1, prev: ObtainedAmmount = None):
""" """
Args: Args:
dungeon: Current dungeon dungeon: Current dungeon
@ -149,7 +143,7 @@ class DungeonObtain(UI):
raise ScriptError(f'_obtain_get_entry: Cannot get entry from {dungeon}') raise ScriptError(f'_obtain_get_entry: Cannot get entry from {dungeon}')
def _obtain_parse(self) -> ItemAmount | None: def _obtain_parse(self) -> ObtainedAmmount | None:
""" """
Pages: Pages:
in: ITEM_CLOSE in: ITEM_CLOSE
@ -163,13 +157,13 @@ class DungeonObtain(UI):
logger.warning('_obtain_parse: Unknown item name') logger.warning('_obtain_parse: Unknown item name')
return None return None
logger.info(f'Item amount: item={item}, amount={amount}') # logger.info(f'ObtainedAmmount: item={item}, value={amount}')
return ItemAmount( return ObtainedAmmount(
item=item, item=item,
amount=amount, value=amount,
) )
def obtain_get(self, dungeon=None, skip_first_screenshot=True) -> list[ItemAmount]: def obtain_get(self, dungeon=None, skip_first_screenshot=True) -> list[ObtainedAmmount]:
""" """
Args: Args:
dungeon: Current dungeon, dungeon: Current dungeon,
@ -177,7 +171,7 @@ class DungeonObtain(UI):
skip_first_screenshot: skip_first_screenshot:
Returns: Returns:
list[ItemAmount]: list[ObtainedAmmount]:
Pages: Pages:
in: COMBAT_PREPARE in: COMBAT_PREPARE
@ -204,15 +198,17 @@ class DungeonObtain(UI):
prev = item prev = item
self._obtain_close() self._obtain_close()
logger.hr('Obtain get result') logger.hr('Obtained Result')
for item in items: for item in items:
logger.info(f'ItemAmount: {item.item.name}, {item.amount}') logger.info(f'Obtained item: {item.item.name}, {item.value}')
""" """
<<< OBTAIN GET RESULT >>> <<< OBTAIN GET RESULT >>>
ItemAmount: Arrow_of_the_Starchaser, 15 ItemAmount: Arrow_of_the_Starchaser, 15
ItemAmount: Arrow_of_the_Demon_Slayer, 68 ItemAmount: Arrow_of_the_Demon_Slayer, 68
ItemAmount: Arrow_of_the_Beast_Hunter, 85 ItemAmount: Arrow_of_the_Beast_Hunter, 85
""" """
self.planner.load_obtained_amount(items)
self.planner_write()
return items return items

View File

@ -1,3 +1,5 @@
from typing import Union
import tasks.planner.keywords.item_ascension as KEYWORDS_ITEM_ASCENSION import tasks.planner.keywords.item_ascension as KEYWORDS_ITEM_ASCENSION
import tasks.planner.keywords.item_calyx as KEYWORDS_ITEM_CALYX import tasks.planner.keywords.item_calyx as KEYWORDS_ITEM_CALYX
import tasks.planner.keywords.item_currency as KEYWORDS_ITEM_CURRENCY import tasks.planner.keywords.item_currency as KEYWORDS_ITEM_CURRENCY
@ -7,3 +9,4 @@ import tasks.planner.keywords.item_weekly as KEYWORDS_ITEM_WEEKLY
from tasks.planner.keywords.classes import ItemAscension, ItemCalyx, ItemCurrency, ItemExp, ItemTrace, ItemWeekly from tasks.planner.keywords.classes import ItemAscension, ItemCalyx, ItemCurrency, ItemExp, ItemTrace, ItemWeekly
ITEM_CLASSES = [ItemAscension, ItemCalyx, ItemCurrency, ItemExp, ItemTrace, ItemWeekly] ITEM_CLASSES = [ItemAscension, ItemCalyx, ItemCurrency, ItemExp, ItemTrace, ItemWeekly]
ITEM_TYPES = Union[ItemAscension, ItemCalyx, ItemCurrency, ItemExp, ItemTrace, ItemWeekly]

373
tasks/planner/model.py Normal file
View File

@ -0,0 +1,373 @@
import typing as t
from datetime import datetime
from functools import partial
from pydantic import BaseModel, ValidationError, WrapValidator, field_validator, model_validator
from module.base.decorator import cached_property, del_cached_property
from module.config.stored.classes import now
from module.config.utils import DEFAULT_TIME
from module.exception import ScriptError
from module.logger import logger
from tasks.base.ui import UI
from tasks.dungeon.keywords import DungeonList
from tasks.planner.keywords import ITEM_TYPES
from tasks.planner.keywords.classes import ItemBase
class PlannerResultRow(BaseModel):
"""
A row of data from planner result page
"""
item: ITEM_TYPES
total: int
synthesize: int
demand: int
def __eq__(self, other):
return self.item == other.item
class ObtainedAmmount(BaseModel):
"""
A row of data from DungeonObtain detection
"""
item: ITEM_TYPES
value: int
def _fallback_to_default_validator(
get_default: t.Callable[[], t.Any],
v: t.Any,
next_: t.Callable[[t.Any], t.Any],
) -> t.Any:
try:
return next_(v)
except ValueError as e:
logger.error(e)
return get_default()
class BaseModelWithFallback(BaseModel):
"""
Pydantic model that fallbacks to default on error
https://github.com/pydantic/pydantic/discussions/8579
"""
@classmethod
def __pydantic_init_subclass__(cls, **kwargs: t.Any) -> None:
for field in cls.model_fields.values():
if not field.is_required():
validator = WrapValidator(partial(_fallback_to_default_validator, field.get_default))
field.metadata.append(validator)
cls.model_rebuild(force=True)
class MultiValue(BaseModelWithFallback):
green: int = 0
blue: int = 0
purple: int = 0
class StoredPlannerProxy(BaseModelWithFallback):
item: ITEM_TYPES
value: int | MultiValue = 0
total: int | MultiValue = 0
synthesize: int | MultiValue = 0
progress: float = 0.
time: datetime = DEFAULT_TIME
@field_validator('item', mode='before')
def val_item(cls, v, info):
if isinstance(v, str):
v = ItemBase.find_name(v)
return v
@model_validator(mode='after')
def val_value(self):
if self.item.has_group_base:
if not isinstance(self.value, MultiValue):
self.value = MultiValue()
if not isinstance(self.total, MultiValue):
self.total = MultiValue()
if not isinstance(self.synthesize, MultiValue):
self.synthesize = MultiValue()
else:
if not isinstance(self.value, int):
self.value = 0
if not isinstance(self.total, int):
self.total = 0
if not isinstance(self.synthesize, int):
self.synthesize = 0
return self
def update_synthesize(self):
if self.item.has_group_base:
green = self.value.green - self.total.green
blue = self.value.blue - self.total.blue
purple = self.value.purple - self.total.purple
syn_blue = 0
syn_purple = 0
if green >= 3 and blue < 0:
syn = min(green // 3, -blue)
syn_blue += syn
green -= syn * 3
# blue += syn
if blue >= 3 and purple < 0:
syn = min(blue // 3, -purple)
syn_purple += syn
blue -= syn * 3
purple += syn
if green >= 9 and purple < 0:
syn = min(green // 9, -purple)
syn_purple += syn
syn_blue += syn * 3
green -= syn * 9
# purple += syn
self.synthesize.green = 0
self.synthesize.blue = syn_blue
self.synthesize.purple = syn_purple
else:
self.synthesize = 0
def revert_synthesize(self):
if self.item.has_group_base:
self.value.green += self.synthesize.blue * 3
if self.synthesize.blue > 0:
self.value.green += self.synthesize.purple * 9
else:
self.value.blue += self.synthesize.blue * 3
def update_progress(self):
if self.item.has_group_base:
total = self.total.green + self.total.blue * 3 + self.total.purple * 9
green = min(self.value.green, self.total.green)
blue = min(self.value.blue + self.synthesize.blue, self.total.blue)
purple = min(self.value.purple + self.synthesize.purple, self.total.purple)
value = green + blue * 3 + purple * 9
progress = value / total * 100
self.progress = round(min(max(progress, 0), 100), 2)
else:
progress = self.value / self.total * 100
self.progress = round(min(max(progress, 0), 100), 2)
def update(self):
self.update_progress()
self.update_synthesize()
self.time = now()
def load_value_total(self, item: ItemBase, value=None, total=None, synthesize=None):
"""
Update data from PlannerResultRow to self
"""
if self.item.has_group_base:
if item.group_base != self.item:
raise ScriptError(
f'load_value_total: Trying to load {item} into {self} but they are different items')
else:
if item != self.item:
raise ScriptError(
f'load_value_total: Trying to load {item} into {self} but they are different items')
if self.item.has_group_base:
if not self.item.is_rarity_purple:
raise ScriptError(
f'load_value_total: Trying to load {item} into {self} but self is not in rarity purple')
if item.is_rarity_green:
if value is not None:
self.value.green = value
if total is not None:
self.total.green = total
# if synthesize is not None:
# self.synthesize.green = synthesize
elif item.is_rarity_blue:
if value is not None:
self.value.blue = value
if total is not None:
self.total.blue = total
if synthesize is not None:
self.synthesize.blue = synthesize
elif item.is_rarity_purple:
if value is not None:
self.value.purple = value
if total is not None:
self.total.purple = total
if synthesize is not None:
self.synthesize.purple = synthesize
else:
raise ScriptError(
f'load_value_total: Trying to load {item} in to {self} but item is in invalid rarity')
else:
if value is not None:
self.value = value
if total is not None:
self.total = total
def need_farm(self):
return self.progress < 100
def need_synthesize(self):
if self.item.has_group_base:
return self.synthesize.green > 0 or self.synthesize.blue > 0 or self.synthesize.purple > 0
else:
return self.synthesize > 0
def load_planner_result(self, row: PlannerResultRow):
"""
Update data from PlannerResultRow to self
"""
# Approximate value, accurate value can be update in DungeonObtain
value = row.total - row.synthesize - row.demand
self.load_value_total(item=row.item, value=value, total=row.total, synthesize=row.synthesize)
def load_item_amount(self, row: ObtainedAmmount):
"""
Update data from ObtainedAmmount to self
"""
value = row.value
self.load_value_total(item=row.item, value=value)
class PlannerProgressParser:
def __init__(self):
self.rows: dict[str, StoredPlannerProxy] = {}
def from_planner_results(self, results: list[PlannerResultRow]):
self.rows = {}
# Create objects of base items first
# them load value and total
for row in results:
base = row.item.group_base
if base.name not in self.rows:
try:
obj = StoredPlannerProxy(item=base)
except ScriptError as e:
logger.error(e)
continue
self.rows[base.name] = obj
else:
obj = self.rows[base.name]
obj.load_planner_result(row)
rows = {}
for name, row in self.rows.items():
row.revert_synthesize()
row.update()
if row.need_farm() or row.need_synthesize():
rows[name] = row
self.rows = rows
return self
def load_obtained_amount(self, results: list[ObtainedAmmount]):
for row in results:
base = row.item.group_base
try:
obj = self.rows[base.name]
except KeyError:
logger.warning(
f'load_obtained_amount() drops {row} because no need to farm')
continue
obj.load_item_amount(row)
obj.update()
return self
def from_config(self, data):
self.rows = {}
for row in data.values():
if not row:
continue
try:
row = StoredPlannerProxy(**row)
except (ScriptError, ValidationError) as e:
logger.error(e)
continue
if not row.item.is_group_base:
logger.error(f'from_config: item is not group base {row}')
continue
self.rows[row.item.name] = row
return self
def to_config(self) -> dict:
data = {}
for row in self.rows.values():
name = f'Item_{row.item.name}'
dic = row.model_dump()
dic['item'] = row.item.name
data[name] = dic
return data
def iter_item_to_farm(self) -> t.Iterable[ItemBase]:
rows = [row for row in self.rows.values() if row.need_farm()]
for row in rows:
if row.item.is_ItemWeekly:
yield row.item
for row in rows:
if row.item.is_ItemAscension:
yield row.item
for row in rows:
if row.item.is_ItemTrace:
yield row.item
for row in rows:
if row.item.is_ItemExp:
yield row.item
for row in rows:
if row.item.is_ItemCurrency:
yield row.item
def get_dungeon(self) -> DungeonList | None:
"""
Get dungeon to farm, or None if planner finished or the remaining items cannot be farmed
"""
for item in self.iter_item_to_farm():
if item.is_ItemWeekly:
continue
dungeon = item.dungeon
if dungeon is None:
logger.error(f'Item {item} has nowhere to be farmed')
continue
logger.info(f'Planner farm: {dungeon}')
return dungeon
logger.info('Planner farm empty')
return None
def get_weekly(self) -> DungeonList | None:
for item in self.iter_item_to_farm():
if not item.is_ItemWeekly:
continue
dungeon = item.dungeon
if dungeon is None:
logger.error(f'Item {item} has nowhere to be farmed')
continue
logger.info(f'Planner farm: {dungeon}')
return dungeon
logger.info('Planner farm empty')
return None
class PlannerProgressMixin(UI):
def planner_write_results(self, results: list[PlannerResultRow]):
"""
Write planner detection results info user config
"""
model = PlannerProgressParser().from_planner_results(results)
data = model.to_config()
self.config.cross_set('Dungeon.Planner', data)
@cached_property
def planner(self) -> PlannerProgressParser:
data = self.config.cross_get('Dungeon.Planner', default={})
model = PlannerProgressParser().from_config(data)
logger.hr('Planner')
for row in model.rows.values():
logger.info(row)
return model
def planner_write(self):
"""
Write planner into user config, delete planner object
"""
data = self.planner.to_config()
self.config.cross_set('Dungeon.Planner', data)
del_cached_property(self, 'planner')

View File

@ -2,7 +2,6 @@ import re
import cv2 import cv2
from pponnxcr.predict_system import BoxedResult from pponnxcr.predict_system import BoxedResult
from pydantic import BaseModel
from module.base.utils import area_center, area_in_area from module.base.utils import area_center, area_in_area
from module.logger import logger from module.logger import logger
@ -11,21 +10,15 @@ from module.ui.scroll import AdaptiveScroll
from tasks.daily.synthesize import SynthesizeUI from tasks.daily.synthesize import SynthesizeUI
from tasks.planner.assets.assets_planner_result import * from tasks.planner.assets.assets_planner_result import *
from tasks.planner.keywords import ITEM_CLASSES from tasks.planner.keywords import ITEM_CLASSES
from tasks.planner.keywords.classes import ItemBase, ItemCurrency from tasks.planner.keywords.classes import ItemCurrency
from tasks.planner.model import PlannerProgressMixin, PlannerResultRow
CALCULATE_TITLE.load_search(RESULT_CHECK.search) CALCULATE_TITLE.load_search(RESULT_CHECK.search)
MATERIAL_TITLE.load_search(RESULT_CHECK.search) MATERIAL_TITLE.load_search(RESULT_CHECK.search)
DETAIL_TITLE.load_search(RESULT_CHECK.search) DETAIL_TITLE.load_search(RESULT_CHECK.search)
class PlannerResultRow(BaseModel):
item: ItemBase
total: int
synthesize: int
demand: int
def __eq__(self, other):
return self.item == other.item
class OcrItemName(Ocr): class OcrItemName(Ocr):
@ -44,6 +37,21 @@ class OcrPlannerResult(OcrWhiteLetterOnComplexBackground, OcrItemName):
self.limited_area = OCR_RESULT.area self.limited_area = OCR_RESULT.area
self.limit_y = 720 self.limit_y = 720
def _match_result(
self,
result: str,
keyword_classes,
lang: str = 'cn',
ignore_punctuation=True,
ignore_digit=True):
return super()._match_result(
result,
keyword_classes,
lang,
ignore_punctuation,
ignore_digit,
)
def filter_detected(self, result: BoxedResult) -> bool: def filter_detected(self, result: BoxedResult) -> bool:
if not area_in_area(result.box, self.limited_area, threshold=0): if not area_in_area(result.box, self.limited_area, threshold=0):
return False return False
@ -67,7 +75,7 @@ class OcrPlannerResult(OcrWhiteLetterOnComplexBackground, OcrItemName):
return image return image
class PlannerResult(SynthesizeUI): class PlannerResult(SynthesizeUI, PlannerProgressMixin):
def is_in_planner_result(self): def is_in_planner_result(self):
if self.appear(RESULT_CHECK): if self.appear(RESULT_CHECK):
return True return True
@ -116,12 +124,12 @@ class PlannerResult(SynthesizeUI):
if y_match(number, y_item): if y_match(number, y_item):
total = int(number.ocr_text) total = int(number.ocr_text)
break break
synthesize = -1 synthesize = 0
for number in list_synthesize: for number in list_synthesize:
if y_match(number, y_item): if y_match(number, y_item):
synthesize = int(number.ocr_text) synthesize = int(number.ocr_text)
break break
demand = -1 demand = 0
for number in list_demand: for number in list_demand:
if y_match(number, y_item): if y_match(number, y_item):
demand = int(number.ocr_text) demand = int(number.ocr_text)
@ -139,7 +147,7 @@ class PlannerResult(SynthesizeUI):
logger.warning(f'Planner row with total <= 0, {row}') logger.warning(f'Planner row with total <= 0, {row}')
continue continue
if row.synthesize < 0: if row.synthesize < 0:
# Credits always have synthesize="-" # Credits always have `synthesize`=="-"
if row.item.__class__ != ItemCurrency: if row.item.__class__ != ItemCurrency:
logger.warning(f'Planner row with synthesize < 0, {row}') logger.warning(f'Planner row with synthesize < 0, {row}')
continue continue
@ -203,7 +211,9 @@ class PlannerResult(SynthesizeUI):
logger.hr('Planner Result') logger.hr('Planner Result')
for row in out: for row in out:
logger.info(f'Item: {row.item.name}, {row.total}, {row.synthesize}, {row.demand}') logger.info(f'Planner item: {row.item.name}, {row.total}, {row.synthesize}, {row.demand}')
self.planner_write_results(out)
return out return out