StarRailCopilot/tasks/combat/obtain.py

315 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from module.base.timer import Timer
from module.exception import ScriptError
from module.logger import logger
from module.ocr.ocr import Digit
from tasks.combat.assets.assets_combat_obtain import *
from tasks.combat.assets.assets_combat_prepare import COMBAT_PREPARE
from tasks.dungeon.keywords import DungeonList
from tasks.planner.keywords import ITEM_CLASSES, KEYWORDS_ITEM_CURRENCY
from tasks.planner.model import ObtainedAmmount, PlannerMixin
from tasks.planner.scan import OcrItemName
class OcrItemAmount(Digit):
def format_result(self, result):
res = re.split(r'[:;]', result)
result = res[-1]
return super().format_result(result)
class CombatObtain(PlannerMixin):
"""
Parse items that can be obtained from dungeon
Pages:
in: COMBAT_PREPARE
"""
# False to click again when combat ends
# True to exit and reenter to get obtained items
obtain_frequent_check = False
def _obtain_enter(self, entry, appear_button, skip_first_screenshot=True):
"""
Args:
entry: Item entry
appear_button:
skip_first_screenshot:
Pages:
in: COMBAT_PREPARE
out: ITEM_CLOSE
"""
logger.info(f'Obtain enter {entry}')
self.interval_clear(appear_button)
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
if self.appear(ITEM_CLOSE):
break
if self.appear(appear_button, interval=2):
self.device.click(entry)
self.interval_reset(appear_button)
continue
# Wait animation
timeout = Timer(1.4, count=7).start()
skip_first_screenshot = True
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
if self.image_color_count(ITEM_NAME, color=(0, 0, 0), threshold=221, count=50):
break
if timeout.reached():
logger.warning('Wait obtain item timeout')
break
def _obtain_close(self, check_button, skip_first_screenshot=True):
"""
Args:
check_button:
skip_first_screenshot:
Pages:
in: ITEM_CLOSE
out: COMBAT_PREPARE
"""
logger.info(f'Obtain close')
self.interval_clear(ITEM_CLOSE)
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
if not self.appear(ITEM_CLOSE):
if callable(check_button):
if check_button():
break
else:
if self.appear(check_button):
break
if self.appear_then_click(ITEM_CLOSE, interval=2):
continue
@staticmethod
def _obtain_get_entry(dungeon: DungeonList, index: int = 1, prev: ObtainedAmmount = None, start: int = 0):
"""
Args:
dungeon: Current dungeon
index: 1 to 3, index to check
prev: Previous item checked
Returns:
int: Item entry index, or None if no more check needed
"""
if (index > 1 and prev is None) or (index <= 1 and prev is not None):
raise ScriptError(f'_obtain_get_entry: index and prev must be set together, index={index}, prev={prev}')
if index > 3:
return None
def may_obtain_one():
if prev is None:
if start:
return 1 + start
else:
return 1
else:
return None
def may_obtain_multi():
if prev is None:
if start:
return 1 + start
else:
return 1
# End at the item with the lowest rarity
if prev.item.is_rarity_green:
return None
# End at credict
if prev.item == KEYWORDS_ITEM_CURRENCY.Credit:
return None
if start:
return index + start
else:
return index
if dungeon is None:
return may_obtain_multi()
if dungeon.is_Echo_of_War:
return may_obtain_one()
if dungeon.is_Cavern_of_Corrosion:
return None
if dungeon.is_Stagnant_Shadow:
return may_obtain_one()
if dungeon.is_Calyx_Golden:
if dungeon.is_Calyx_Golden_Treasures:
return may_obtain_one()
else:
return may_obtain_multi()
if dungeon.is_Calyx_Crimson:
return may_obtain_multi()
raise ScriptError(f'_obtain_get_entry: Cannot get entry from {dungeon}')
def _obtain_parse(self) -> ObtainedAmmount | None:
"""
Pages:
in: ITEM_CLOSE
"""
ocr = OcrItemName(ITEM_NAME)
item = ocr.matched_single_line(self.device.image, keyword_classes=ITEM_CLASSES)
ocr = OcrItemAmount(ITEM_AMOUNT)
amount = ocr.ocr_single_line(self.device.image)
if item is None:
logger.warning('_obtain_parse: Unknown item name')
return None
# logger.info(f'ObtainedAmmount: item={item}, value={amount}')
return ObtainedAmmount(
item=item,
value=amount,
)
def obtain_get(self, dungeon=None, skip_first_screenshot=True) -> list[ObtainedAmmount]:
"""
Args:
dungeon: Current dungeon,
or None for no early stop optimization
skip_first_screenshot:
Returns:
list[ObtainedAmmount]:
Pages:
in: COMBAT_PREPARE
out: COMBAT_PREPARE
"""
logger.hr('Obtain get', level=2)
if not skip_first_screenshot:
self.device.screenshot()
index = 1
prev = None
items = []
dic_entry = {
1: OBTAIN_1,
2: OBTAIN_2,
3: OBTAIN_3,
4: OBTAIN_4,
}
self._find_may_obtain()
trailblaze_exp = False
for _ in range(5):
if not trailblaze_exp and self.appear(OBTAIN_TRAILBLAZE_EXP):
trailblaze_exp = True
logger.attr('trailblaze_exp', trailblaze_exp)
entry_index = self._obtain_get_entry(dungeon, index=index, prev=prev, start=int(trailblaze_exp))
if entry_index is None:
logger.info('Obtain get end')
break
try:
entry = dic_entry[entry_index]
except KeyError:
logger.error(f'No obtain entry for {entry_index}')
break
self._obtain_enter(entry, appear_button=COMBAT_PREPARE)
item = self._obtain_parse()
if item is not None:
if item.item == KEYWORDS_ITEM_CURRENCY.Trailblaze_EXP:
logger.warning('Trailblaze_EXP is in obtain list, OBTAIN_TRAILBLAZE_EXP may need to verify')
index += 1
prev = item
else:
items.append(item)
index += 1
prev = item
else:
index += 1
self._obtain_close(check_button=MAY_OBTAIN)
logger.hr('Obtained Result')
for item in items:
# Pretend everything is full
# item.value += 1000
logger.info(f'Obtained item: {item.item.name}, {item.value}')
"""
<<< OBTAIN GET RESULT >>>
ItemAmount: Arrow_of_the_Starchaser, 15
ItemAmount: Arrow_of_the_Demon_Slayer, 68
ItemAmount: Arrow_of_the_Beast_Hunter, 85
"""
self.planner.load_obtained_amount(items)
with self.config.multi_set():
self.planner_write()
# Sync to dashboard
for item in items:
if item.item.name == 'Credit':
self.config.stored.Credit.value = item.value
return items
def obtained_is_full(self, dungeon: DungeonList | None, wave_done=0, obtain_get=True) -> bool:
if dungeon is None:
self.obtain_frequent_check = False
return False
row = self.planner.row_come_from_dungeon(dungeon)
if row is None:
self.obtain_frequent_check = False
return False
# Update
if obtain_get:
self.obtain_get(dungeon)
# Check progress
row = self.planner.row_come_from_dungeon(dungeon)
if row is None:
logger.error(f'obtained_is_full: Row disappeared after obtain_get')
self.obtain_frequent_check = False
return False
if not row.need_farm():
logger.info('Planner row full')
self.obtain_frequent_check = False
return True
# obtain_frequent_check
approaching = row.is_approaching_total(wave_done)
logger.attr('is_approaching_total', approaching)
self.obtain_frequent_check = approaching
return False
def _find_may_obtain(self, skip_first_screenshot=True):
logger.info('Find may obtain')
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.device.screenshot()
if MAY_OBTAIN.match_template(self.device.image):
OBTAIN_1.load_offset(MAY_OBTAIN)
OBTAIN_2.load_offset(MAY_OBTAIN)
OBTAIN_3.load_offset(MAY_OBTAIN)
OBTAIN_4.load_offset(MAY_OBTAIN)
return True
if __name__ == '__main__':
self = CombatObtain('src')
self.device.screenshot()
self.obtain_get()