StarRailCopilot/tasks/planner/model.py
2024-05-27 03:03:11 +08:00

506 lines
17 KiB
Python

import typing as t
from datetime import datetime
from functools import partial
from pydantic import BaseModel, ValidationError, WrapValidator, field_validator, model_validator
from module.base.decorator import cached_property, del_cached_property
from module.config.stored.classes import now
from module.config.utils import DEFAULT_TIME
from module.exception import ScriptError
from module.logger import logger
from tasks.base.ui import UI
from tasks.dungeon.keywords import DungeonList
from tasks.planner.keywords import ITEM_TYPES
from tasks.planner.keywords.classes import ItemBase
class PlannerResultRow(BaseModel):
"""
A row of data from planner result page
"""
item: ITEM_TYPES
total: int
synthesize: int
demand: int
def __eq__(self, other):
return self.item == other.item
class ObtainedAmmount(BaseModel):
"""
A row of data from DungeonObtain detection
"""
item: ITEM_TYPES
value: int
def _fallback_to_default_validator(
get_default: t.Callable[[], t.Any],
v: t.Any,
next_: t.Callable[[t.Any], t.Any],
) -> t.Any:
try:
return next_(v)
except ValueError as e:
logger.error(e)
return get_default()
class BaseModelWithFallback(BaseModel):
"""
Pydantic model that fallbacks to default on error
https://github.com/pydantic/pydantic/discussions/8579
"""
@classmethod
def __pydantic_init_subclass__(cls, **kwargs: t.Any) -> None:
for field in cls.model_fields.values():
if not field.is_required():
validator = WrapValidator(partial(_fallback_to_default_validator, field.get_default))
field.metadata.append(validator)
cls.model_rebuild(force=True)
class MultiValue(BaseModelWithFallback):
green: int = 0
blue: int = 0
purple: int = 0
def add(self, other: "MultiValue"):
self.green += other.green
self.blue += other.blue
self.purple += other.purple
def equivalent_green(self):
return self.green + self.blue * 3 + self.purple * 9
class StoredPlannerProxy(BaseModelWithFallback):
item: ITEM_TYPES
value: int | MultiValue = 0
total: int | MultiValue = 0
synthesize: int | MultiValue = 0
progress: float = 0.
time: datetime = DEFAULT_TIME
@field_validator('item', mode='before')
def val_item(cls, v, info):
if isinstance(v, str):
v = ItemBase.find_name(v)
return v
@model_validator(mode='after')
def val_value(self):
if self.item.has_group_base:
if not isinstance(self.value, MultiValue):
self.value = MultiValue()
if not isinstance(self.total, MultiValue):
self.total = MultiValue()
if not isinstance(self.synthesize, MultiValue):
self.synthesize = MultiValue()
else:
if not isinstance(self.value, int):
self.value = 0
if not isinstance(self.total, int):
self.total = 0
if not isinstance(self.synthesize, int):
self.synthesize = 0
return self
def update_synthesize(self):
if self.item.has_group_base:
green = self.value.green - self.total.green
blue = self.value.blue - self.total.blue
purple = self.value.purple - self.total.purple
syn_blue = 0
syn_purple = 0
if green >= 3 and blue < 0:
syn = min(green // 3, -blue)
syn_blue += syn
green -= syn * 3
# blue += syn
if blue >= 3 and purple < 0:
syn = min(blue // 3, -purple)
syn_purple += syn
blue -= syn * 3
purple += syn
if green >= 9 and purple < 0:
syn = min(green // 9, -purple)
syn_purple += syn
syn_blue += syn * 3
green -= syn * 9
# purple += syn
self.synthesize.green = 0
self.synthesize.blue = syn_blue
self.synthesize.purple = syn_purple
else:
self.synthesize = 0
def revert_synthesize(self):
if self.item.has_group_base:
self.value.green += self.synthesize.blue * 3
if self.synthesize.blue > 0:
self.value.green += self.synthesize.purple * 9
else:
self.value.blue += self.synthesize.purple * 3
def is_approaching_total(self):
"""
Returns:
bool: True if the future value may >= total after next combat
"""
if self.item.dungeon.is_Calyx_Golden_Treasures:
return self.value + 24000 >= self.total
if self.item.dungeon.is_Calyx_Golden_Memories:
# purple, blue, green = 5, 1, 0
value = self.value.equivalent_green()
total = self.total.equivalent_green()
return value + 48 >= total
if self.item.dungeon.Calyx_Golden_Aether:
# purple, blue, green = 1, 2, 2.5
value = self.value.equivalent_green()
total = self.total.equivalent_green()
return value + 17.5 >= total
if self.item.is_ItemAscension:
return self.value + 3 >= self.total
if self.item.is_ItemTrace:
# purple, blue, green = 0.155, 1, 1.25
value = self.value.equivalent_green()
total = self.total.equivalent_green()
return value + 33.87 >= total
if self.item.is_ItemWeekly:
return self.value + 3 >= self.total
return False
def update_progress(self):
if self.item.has_group_base:
total = self.total.equivalent_green()
green = min(self.value.green, self.total.green)
blue = min(self.value.blue + self.synthesize.blue, self.total.blue)
purple = min(self.value.purple + self.synthesize.purple, self.total.purple)
value = green + blue * 3 + purple * 9
progress = value / total * 100
self.progress = round(min(max(progress, 0), 100), 2)
else:
progress = self.value / self.total * 100
self.progress = round(min(max(progress, 0), 100), 2)
def update(self):
self.update_synthesize()
self.update_progress()
self.time = now()
def load_value_total(self, item: ItemBase, value=None, total=None, synthesize=None):
"""
Update data from PlannerResultRow to self
"""
if self.item.has_group_base:
if item.group_base != self.item:
raise ScriptError(
f'load_value_total: Trying to load {item} into {self} but they are different items')
else:
if item != self.item:
raise ScriptError(
f'load_value_total: Trying to load {item} into {self} but they are different items')
if self.item.has_group_base:
if not self.item.is_rarity_purple:
raise ScriptError(
f'load_value_total: Trying to load {item} into {self} but self is not in rarity purple')
if item.is_rarity_green:
if value is not None:
self.value.green = value
if total is not None:
self.total.green = total
# Cannot synthesize green
# if synthesize is not None:
# self.synthesize.green = synthesize
self.synthesize.green = 0
elif item.is_rarity_blue:
if value is not None:
self.value.blue = value
if total is not None:
self.total.blue = total
if synthesize is not None:
self.synthesize.blue = synthesize
elif item.is_rarity_purple:
if value is not None:
self.value.purple = value
if total is not None:
self.total.purple = total
if synthesize is not None:
self.synthesize.purple = synthesize
else:
raise ScriptError(
f'load_value_total: Trying to load {item} in to {self} but item is in invalid rarity')
else:
# Cannot synthesize if item doesn't have multiple rarity
self.synthesize = 0
if value is not None:
self.value = value
if total is not None:
self.total = total
def add_planner_result(self, row: "StoredPlannerProxy"):
"""
Add data from another StoredPlannerProxy to self
"""
item = row.item
if self.item.has_group_base:
if item.group_base != self.item:
raise ScriptError(
f'load_value_total: Trying to load {item} into {self} but they are different items')
else:
if item != self.item:
raise ScriptError(
f'load_value_total: Trying to load {item} into {self} but they are different items')
if self.item.has_group_base:
if not self.item.is_rarity_purple:
raise ScriptError(
f'load_value_total: Trying to load {item} into {self} but self is not in rarity purple')
# Add `total` only
# `synthesize` will be updated later
# `value` remains unchanged since you still having that many items
self.total.add(row.total)
else:
self.value += row.value
self.total += row.total
self.synthesize += row.synthesize
def need_farm(self):
return self.progress < 100
def need_synthesize(self):
if self.item.has_group_base:
return self.synthesize.green > 0 or self.synthesize.blue > 0 or self.synthesize.purple > 0
else:
return self.synthesize > 0
def load_planner_result(self, row: PlannerResultRow):
"""
Update data from PlannerResultRow to self
"""
# Approximate value, accurate value can be update in DungeonObtain
value = row.total - row.synthesize - row.demand
self.load_value_total(item=row.item, value=value, total=row.total, synthesize=row.synthesize)
def load_item_amount(self, row: ObtainedAmmount):
"""
Update data from ObtainedAmmount to self
"""
value = row.value
self.load_value_total(item=row.item, value=value)
class PlannerProgressParser:
def __init__(self):
self.rows: dict[str, StoredPlannerProxy] = {}
def from_planner_results(self, results: list[PlannerResultRow]):
self.rows = {}
# Create objects of base items first
# them load value and total
for row in results:
base = row.item.group_base
if base.name not in self.rows:
try:
obj = StoredPlannerProxy(item=base)
except ScriptError as e:
logger.error(e)
continue
self.rows[base.name] = obj
else:
obj = self.rows[base.name]
obj.load_planner_result(row)
rows = {}
for name, row in self.rows.items():
row.revert_synthesize()
row.update()
if row.need_farm() or row.need_synthesize():
rows[name] = row
self.rows = rows
return self
def load_obtained_amount(self, results: list[ObtainedAmmount]):
for row in results:
base = row.item.group_base
try:
obj = self.rows[base.name]
except KeyError:
logger.warning(
f'load_obtained_amount() drops {row} because no need to farm')
continue
obj.load_item_amount(row)
obj.update()
return self
def from_config(self, data):
self.rows = {}
for row in data.values():
if not row:
continue
try:
row = StoredPlannerProxy(**row)
except (ScriptError, ValidationError) as e:
logger.error(e)
continue
if not row.item.is_group_base:
logger.error(f'from_config: item is not group base {row}')
continue
row.update_synthesize()
row.update_progress()
self.rows[row.item.name] = row
return self
def add_planner_result(self, planner: "PlannerProgressParser"):
"""
Add another planner result to self
"""
for name, row in planner.rows.items():
if name in self.rows:
self_row = self.rows[name]
self_row.add_planner_result(row)
else:
self.rows[name] = row
for row in self.rows.values():
row.update()
def to_config(self) -> dict:
data = {}
for row in self.rows.values():
name = f'Item_{row.item.name}'
dic = row.model_dump()
dic['item'] = row.item.name
data[name] = dic
return data
def iter_row_to_farm(self, need_farm=True) -> t.Iterable[StoredPlannerProxy]:
"""
Args:
need_farm: True if filter rows that need farm
Yields:
"""
if need_farm:
rows = [row for row in self.rows.values() if row.need_farm()]
else:
rows = self.rows.values()
for row in rows:
if row.item.is_ItemWeekly:
yield row
for row in rows:
if row.item.is_ItemAscension:
yield row
for row in rows:
if row.item.is_ItemTrace:
yield row
for row in rows:
if row.item.is_ItemExp:
yield row
for row in rows:
if row.item.is_ItemCurrency:
yield row
def get_dungeon(self, double_calyx=False) -> DungeonList | None:
"""
Get dungeon to farm, or None if planner finished or the remaining items cannot be farmed
"""
for row in self.iter_row_to_farm():
item = row.item
if item.is_ItemWeekly:
continue
dungeon = item.dungeon
if dungeon is None:
logger.error(f'Item {item} has nowhere to be farmed')
continue
if double_calyx:
if dungeon.is_Calyx:
logger.info(f'Planner farm (double_calyx): {dungeon}')
return dungeon
else:
logger.info(f'Planner farm: {dungeon}')
return dungeon
logger.info('Planner farm empty')
return None
def get_weekly(self) -> DungeonList | None:
for row in self.iter_row_to_farm():
item = row.item
if not item.is_ItemWeekly:
continue
dungeon = item.dungeon
if dungeon is None:
logger.error(f'Item {item} has nowhere to be farmed')
continue
logger.info(f'Planner weekly farm: {dungeon}')
return dungeon
logger.info('Planner weekly farm empty')
return None
def row_come_from_dungeon(self, dungeon: DungeonList | None) -> StoredPlannerProxy | None:
"""
If any items in planner is able to be farmed from given dungeon
"""
if dungeon is None:
return None
for row in self.iter_row_to_farm(need_farm=False):
if row.item.dungeon == dungeon:
logger.info(f'Planner {row} come from {dungeon}')
return row
return None
class PlannerMixin(UI):
def planner_write_results(self, results: list[PlannerResultRow]):
"""
Write planner detection results info user config
"""
add = self.config.PlannerScan_ResultAdd
logger.attr('ResultAdd', add)
planner = PlannerProgressParser().from_planner_results(results)
if add:
planner.add_planner_result(self.planner)
self.planner_write(planner)
@cached_property
def planner(self) -> PlannerProgressParser:
data = self.config.cross_get('Dungeon.Planner', default={})
model = PlannerProgressParser().from_config(data)
logger.hr('Planner')
for row in model.rows.values():
logger.info(row)
return model
def planner_write(self, planner=None):
"""
Write planner into user config, delete planner object
"""
if planner is None:
planner = self.planner
data = planner.to_config()
with self.config.multi_set():
# Set value
for key, value in data.items():
self.config.cross_set(f'Dungeon.Planner.{key}', value)
# Remove other value
remove = []
for key, value in self.config.cross_get('Dungeon.Planner', default={}).items():
if value != {} and key not in data:
remove.append(key)
for key in remove:
self.config.cross_set(f'Dungeon.Planner.{key}', {})
del_cached_property(self, 'planner')