2024-05-11 18:31:42 +00:00
|
|
|
|
import re
|
|
|
|
|
|
|
|
|
|
from module.base.timer import Timer
|
|
|
|
|
from module.exception import ScriptError
|
|
|
|
|
from module.logger import logger
|
|
|
|
|
from module.ocr.ocr import Digit
|
2024-05-17 18:35:13 +00:00
|
|
|
|
from tasks.combat.assets.assets_combat_obtain import *
|
2024-05-19 17:49:50 +00:00
|
|
|
|
from tasks.combat.assets.assets_combat_prepare import COMBAT_PREPARE
|
2024-05-11 18:31:42 +00:00
|
|
|
|
from tasks.dungeon.keywords import DungeonList
|
|
|
|
|
from tasks.planner.keywords import ITEM_CLASSES
|
2024-05-17 18:35:13 +00:00
|
|
|
|
from tasks.planner.model import ObtainedAmmount, PlannerMixin
|
2024-05-19 16:21:56 +00:00
|
|
|
|
from tasks.planner.scan import OcrItemName
|
2024-05-11 18:31:42 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class OcrItemAmount(Digit):
    """
    Digit OCR for an item amount label that may carry a text prefix
    terminated by a colon or semicolon, e.g. "<label>: 15".
    """

    def format_result(self, result):
        """
        Drop everything up to the last separator, then let Digit parse the rest.

        Args:
            result (str): Raw OCR text

        Returns:
            Whatever Digit.format_result() returns for the trailing segment
        """
        # Split on both ASCII and fullwidth colon/semicolon.
        # NOTE(review): the previous pattern listed ':' and ';' twice; the duplicates
        # are assumed to have been the fullwidth forms (U+FF1A, U+FF1B) - confirm.
        # Input containing only ASCII separators is handled exactly as before.
        tail = re.split(r'[:;\uFF1A\uFF1B]', result)[-1]
        return super().format_result(tail)
|
|
|
|
|
|
|
|
|
|
|
2024-05-17 18:35:13 +00:00
|
|
|
|
class CombatObtain(PlannerMixin):
    """
    Parse items that can be obtained from dungeon

    Pages:
        in: COMBAT_PREPARE
    """
    # False to click again when combat ends
    # True to exit and reenter to get obtained items
    obtain_frequent_check = False

    def _obtain_enter(self, entry, skip_first_screenshot=True):
        """
        Click an item entry until the item popup (ITEM_CLOSE) appears,
        then wait for the popup content to settle.

        Args:
            entry: Item entry
            skip_first_screenshot:

        Pages:
            in: COMBAT_PREPARE
            out: ITEM_CLOSE
        """
        logger.info(f'Obtain enter {entry}')
        self.interval_clear(COMBAT_PREPARE)
        while 1:
            if skip_first_screenshot:
                skip_first_screenshot = False
            else:
                self.device.screenshot()

            if self.appear(ITEM_CLOSE):
                break
            if self.appear(COMBAT_PREPARE, interval=2):
                self.device.click(entry)
                self.interval_reset(COMBAT_PREPARE)
                continue

        # Wait animation
        # Leaves as soon as the color check on ITEM_NAME passes, or when the timer
        # expires; a timeout is only logged, the caller continues either way.
        timeout = Timer(1.4, count=7).start()
        skip_first_screenshot = True
        while 1:
            if skip_first_screenshot:
                skip_first_screenshot = False
            else:
                self.device.screenshot()

            if self.image_color_count(ITEM_NAME, color=(0, 0, 0), threshold=221, count=50):
                break
            if timeout.reached():
                logger.warning('Wait obtain item timeout')
                break

    def _obtain_close(self, skip_first_screenshot=True):
        """
        Close the item popup and wait until the prepare page is shown again.

        Args:
            skip_first_screenshot:

        Pages:
            in: ITEM_CLOSE
            out: COMBAT_PREPARE
        """
        logger.info('Obtain close')
        self.interval_clear(ITEM_CLOSE)
        while 1:
            if skip_first_screenshot:
                skip_first_screenshot = False
            else:
                self.device.screenshot()

            # Done only when the popup is gone AND both COMBAT_PREPARE and MAY_OBTAIN are visible
            if not self.appear(ITEM_CLOSE) and self.appear(COMBAT_PREPARE) and self.appear(MAY_OBTAIN):
                break
            if self.appear_then_click(ITEM_CLOSE, interval=2):
                continue

    @staticmethod
    def _obtain_get_entry(dungeon: DungeonList, index: int = 1, prev: ObtainedAmmount | None = None):
        """
        Args:
            dungeon: Current dungeon
            index: 1 to 3, index to check
            prev: Previous item checked

        Returns:
            ButtonWrapper: Item entry, or None if no more check needed

        Raises:
            ScriptError: If `index` and `prev` are inconsistent
                (index > 1 without prev, or index <= 1 with prev),
                or if the dungeon matches none of the handled categories
        """
        if (index > 1 and prev is None) or (index <= 1 and prev is not None):
            raise ScriptError(f'_obtain_get_entry: index and prev must be set together, index={index}, prev={prev}')

        if index > 3:
            return None

        def may_obtain_one():
            # Only the first entry is checked
            if prev is None:
                return OBTAIN_1
            else:
                return None

        def may_obtain_multi():
            if prev is None:
                return OBTAIN_1
            # End at the item with the lowest rarity
            if prev.item.is_rarity_green:
                return None
            if index == 2:
                return OBTAIN_2
            if index == 3:
                return OBTAIN_3
            return None

        if dungeon is None:
            return may_obtain_multi()
        if dungeon.is_Echo_of_War:
            return may_obtain_one()
        if dungeon.is_Cavern_of_Corrosion:
            return None
        if dungeon.is_Stagnant_Shadow:
            return may_obtain_one()
        if dungeon.is_Calyx_Golden:
            if dungeon.is_Calyx_Golden_Treasures:
                return may_obtain_one()
            else:
                return may_obtain_multi()
        if dungeon.is_Calyx_Crimson:
            return may_obtain_multi()

        raise ScriptError(f'_obtain_get_entry: Cannot get entry from {dungeon}')

    def _obtain_parse(self) -> ObtainedAmmount | None:
        """
        OCR the item name and amount from the current screenshot.

        Returns:
            ObtainedAmmount: Parsed item and amount,
                or None if the item name matches no keyword in ITEM_CLASSES

        Pages:
            in: ITEM_CLOSE
        """
        ocr = OcrItemName(ITEM_NAME)
        item = ocr.matched_single_line(self.device.image, keyword_classes=ITEM_CLASSES)
        ocr = OcrItemAmount(ITEM_AMOUNT)
        amount = ocr.ocr_single_line(self.device.image)

        if item is None:
            logger.warning('_obtain_parse: Unknown item name')
            return None

        # logger.info(f'ObtainedAmmount: item={item}, value={amount}')
        return ObtainedAmmount(
            item=item,
            value=amount,
        )

    def obtain_get(self, dungeon=None, skip_first_screenshot=True) -> list[ObtainedAmmount]:
        """
        Open each obtainable item entry, parse it, and push the result to the planner.

        Args:
            dungeon: Current dungeon,
                or None for no early stop optimization
            skip_first_screenshot:

        Returns:
            list[ObtainedAmmount]:

        Pages:
            in: COMBAT_PREPARE
            out: COMBAT_PREPARE
        """
        logger.hr('Obtain get', level=2)
        if not skip_first_screenshot:
            self.device.screenshot()

        index = 1
        prev = None
        items = []

        self._find_may_obtain()

        for _ in range(5):
            entry = self._obtain_get_entry(dungeon, index=index, prev=prev)
            if entry is None:
                logger.info('Obtain get end')
                break

            self._obtain_enter(entry)
            item = self._obtain_parse()
            # Always leave the popup so the page contract (out: COMBAT_PREPARE) holds
            self._obtain_close()
            if item is None:
                # An unparsed item cannot serve as `prev`: asking for the next entry
                # with index > 1 and prev=None makes _obtain_get_entry raise ScriptError.
                # Stop scanning and keep what has been parsed so far.
                logger.warning('Obtain get end, item unknown')
                break
            items.append(item)
            index += 1
            prev = item

        logger.hr('Obtained Result')
        for item in items:
            # Pretend everything is full
            # item.value += 1000
            logger.info(f'Obtained item: {item.item.name}, {item.value}')
        # Example log noted by the original author:
        #   <<< OBTAIN GET RESULT >>>
        #   ItemAmount: Arrow_of_the_Starchaser, 15
        #   ItemAmount: Arrow_of_the_Demon_Slayer, 68
        #   ItemAmount: Arrow_of_the_Beast_Hunter, 85
        self.planner.load_obtained_amount(items)
        self.planner_write()
        return items

    def obtained_is_full(self, dungeon: DungeonList | None) -> bool:
        """
        Refresh obtained amounts for `dungeon` and check its planner row.

        Args:
            dungeon: Current dungeon, or None to skip the check

        Returns:
            bool: True if the planner row coming from this dungeon no longer needs farming
                after the refresh. False if dungeon is None, if no planner row comes
                from this dungeon, or if the row still needs farming.

        Pages:
            in: COMBAT_PREPARE
            out: COMBAT_PREPARE
        """
        if dungeon is None:
            self.obtain_frequent_check = False
            return False
        row = self.planner.row_come_from_dungeon(dungeon)
        if row is None:
            self.obtain_frequent_check = False
            return False

        # Update
        self.obtain_get(dungeon)

        # Check progress
        # Re-query the row, obtain_get() has loaded new amounts into the planner
        row = self.planner.row_come_from_dungeon(dungeon)
        if row is None:
            logger.error('obtained_is_full: Row disappeared after obtain_get')
            self.obtain_frequent_check = False
            return False
        if not row.need_farm():
            logger.info('Planner row full')
            self.obtain_frequent_check = False
            return True

        # obtain_frequent_check
        # approaching = row.is_approaching_total()
        # logger.attr('is_approaching_total', approaching)
        # self.obtain_frequent_check = approaching
        return False

    def _find_may_obtain(self, skip_first_screenshot=True):
        """
        Wait until MAY_OBTAIN is matched on screen, then load its offset
        into OBTAIN_1, OBTAIN_2 and OBTAIN_3.

        The loop has no timeout of its own; it keeps taking screenshots
        until the template matches.

        Args:
            skip_first_screenshot:

        Returns:
            bool: Always True
        """
        logger.info('Find may obtain')
        while 1:
            if skip_first_screenshot:
                skip_first_screenshot = False
            else:
                self.device.screenshot()

            if MAY_OBTAIN.match_template(self.device.image):
                OBTAIN_1.load_offset(MAY_OBTAIN)
                OBTAIN_2.load_offset(MAY_OBTAIN)
                OBTAIN_3.load_offset(MAY_OBTAIN)
                return True
|
|
|
|
|
|
2024-05-11 18:31:42 +00:00
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Manual debug entry: build the task, grab one screenshot, then run a full scan.
    # NOTE(review): 'src' is presumably the config name - confirm.
    task = CombatObtain('src')
    task.device.screenshot()
    task.obtain_get()
|