Merge pull request #505 from LmeSzinc/dev

Bug fix
This commit is contained in:
LmeSzinc 2024-06-11 21:39:55 +08:00 committed by GitHub
commit ae42d1be8b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 489 additions and 18 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 25 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.1 KiB

View File

@ -121,7 +121,10 @@ class DataProcessInfo:
@cached_property
def name(self):
name = self.proc.name()
try:
name = self.proc.name()
except:
name = ''
return name
@cached_property
@ -130,6 +133,7 @@ class DataProcessInfo:
cmdline = self.proc.cmdline()
except:
# psutil.AccessDenied
# # NoSuchProcess: process no longer exists (pid=xxx)
cmdline = []
cmdline = ' '.join(cmdline).replace(r'\\', '/').replace('\\', '/')
return cmdline

View File

@ -88,14 +88,14 @@ class AzurLaneAutoScript:
logger.error(e)
self.save_error_log()
logger.warning(f'Game stuck, {self.device.package} will be restarted in 10 seconds')
logger.warning('If you are playing by hand, please stop Alas')
logger.warning('If you are playing by hand, please stop Src')
self.config.task_call('Restart')
self.device.sleep(10)
return False
except GameBugError as e:
logger.warning(e)
self.save_error_log()
logger.warning('An error has occurred in Azur Lane game client, Alas is unable to handle')
logger.warning('An error has occurred in Star Rail game client, Src is unable to handle')
logger.warning(f'Restarting {self.device.package} to fix it')
self.config.task_call('Restart')
self.device.sleep(10)
@ -108,7 +108,7 @@ class AzurLaneAutoScript:
self.save_error_log()
handle_notify(
self.config.Error_OnePushConfig,
title=f"Alas <{self.config_name}> crashed",
title=f"Src <{self.config_name}> crashed",
content=f"<{self.config_name}> GamePageUnknownError",
)
exit(1)
@ -123,7 +123,7 @@ class AzurLaneAutoScript:
logger.critical('This is likely to be a mistake of developers, but sometimes just random issues')
handle_notify(
self.config.Error_OnePushConfig,
title=f"Alas <{self.config_name}> crashed",
title=f"Src <{self.config_name}> crashed",
content=f"<{self.config_name}> ScriptError",
)
exit(1)
@ -131,7 +131,7 @@ class AzurLaneAutoScript:
logger.critical('Request human takeover')
handle_notify(
self.config.Error_OnePushConfig,
title=f"Alas <{self.config_name}> crashed",
title=f"Src <{self.config_name}> crashed",
content=f"<{self.config_name}> RequestHumanTakeover",
)
exit(1)
@ -140,7 +140,7 @@ class AzurLaneAutoScript:
self.save_error_log()
handle_notify(
self.config.Error_OnePushConfig,
title=f"Alas <{self.config_name}> crashed",
title=f"Src <{self.config_name}> crashed",
content=f"<{self.config_name}> Exception occured",
)
exit(1)
@ -242,7 +242,7 @@ class AzurLaneAutoScript:
if self.stop_event is not None:
if self.stop_event.is_set():
logger.info("Update event detected")
logger.info(f"Alas [{self.config_name}] exited.")
logger.info(f"[{self.config_name}] exited.")
break
# Check game server maintenance
self.checker.wait_until_available()
@ -287,7 +287,7 @@ class AzurLaneAutoScript:
logger.critical('Request human takeover')
handle_notify(
self.config.Error_OnePushConfig,
title=f"Alas <{self.config_name}> crashed",
title=f"Src <{self.config_name}> crashed",
content=f"<{self.config_name}> RequestHumanTakeover\nTask `{task}` failed 3 or more times.",
)
exit(1)

View File

@ -695,6 +695,7 @@
"Tingyun",
"TopazNumby",
"TrailblazerDestruction",
"TrailblazerHarmony",
"TrailblazerPreservation",
"Welt",
"Xueyi",
@ -1613,6 +1614,7 @@
"Tingyun",
"TopazNumby",
"TrailblazerDestruction",
"TrailblazerHarmony",
"TrailblazerPreservation",
"Welt",
"Xueyi",

View File

@ -97,7 +97,7 @@ class ConfigGenerator:
options=[dungeon.name for dungeon in DungeonList.instances.values() if dungeon.is_Echo_of_War])
# Insert characters
from tasks.character.keywords import CharacterList
unsupported_characters = ["TrailblazerHarmony"]
unsupported_characters = []
characters = [character.name for character in CharacterList.instances.values()
if character.name not in unsupported_characters]
option_add(keys='DungeonSupport.Character.option', options=characters)

View File

@ -416,6 +416,7 @@
"Tingyun": "Tingyun",
"TopazNumby": "Topaz & Numby",
"TrailblazerDestruction": "Trailblazer Destruction",
"TrailblazerHarmony": "Trailblazer Harmony",
"TrailblazerPreservation": "Trailblazer Preservation",
"Welt": "Welt",
"Xueyi": "Xueyi",

View File

@ -416,6 +416,7 @@
"Tingyun": "Tingyun",
"TopazNumby": "Topaz y Conti",
"TrailblazerDestruction": "Trailblazer de Destrucción",
"TrailblazerHarmony": "Trailblazer de Armonía",
"TrailblazerPreservation": "Trailblazer de Conservación",
"Welt": "Welt",
"Xueyi": "Xueyi",

View File

@ -416,6 +416,7 @@
"Tingyun": "停雲",
"TopazNumby": "トパーズ&カブ",
"TrailblazerDestruction": "開拓者・壊滅",
"TrailblazerHarmony": "開拓者・調和",
"TrailblazerPreservation": "開拓者・存護",
"Welt": "ヴェルト",
"Xueyi": "雪衣",

View File

@ -416,6 +416,7 @@
"Tingyun": "停云",
"TopazNumby": "托帕&账账",
"TrailblazerDestruction": "开拓者•毁灭",
"TrailblazerHarmony": "开拓者•同谐",
"TrailblazerPreservation": "开拓者•存护",
"Welt": "瓦尔特",
"Xueyi": "雪衣",

View File

@ -416,6 +416,7 @@
"Tingyun": "停雲",
"TopazNumby": "托帕&帳帳",
"TrailblazerDestruction": "開拓者•毀滅",
"TrailblazerHarmony": "開拓者•同諧",
"TrailblazerPreservation": "開拓者•存護",
"Welt": "瓦爾特",
"Xueyi": "雪衣",

View File

@ -471,8 +471,9 @@ class EmulatorManager(EmulatorManagerBase):
try:
exe = proc.cmdline()
exe = exe[0].replace(r'\\', '/').replace('\\', '/')
except (psutil.AccessDenied, IndexError):
except (psutil.AccessDenied, psutil.NoSuchProcess, IndexError):
# psutil.AccessDenied
# NoSuchProcess: process no longer exists (pid=xxx)
continue
if Emulator.is_emulator(exe):

View File

@ -38,14 +38,14 @@ MAY_OBTAIN = ButtonWrapper(
cn=Button(
file='./assets/cn/combat/obtain/MAY_OBTAIN.png',
area=(813, 379, 893, 397),
search=(812, 373, 895, 468),
search=(812, 330, 895, 468),
color=(63, 71, 87),
button=(813, 379, 893, 397),
),
en=Button(
file='./assets/en/combat/obtain/MAY_OBTAIN.png',
area=(813, 379, 922, 397),
search=(813, 373, 923, 468),
search=(812, 330, 922, 468),
color=(53, 61, 78),
button=(813, 379, 922, 397),
),

View File

@ -53,6 +53,16 @@ SYNTHESIZE_AMOUNT = ButtonWrapper(
button=(683, 548, 1034, 568),
),
)
SYNTHESIZE_INVENTORY = ButtonWrapper(
name='SYNTHESIZE_INVENTORY',
share=Button(
file='./assets/share/item/synthesize/SYNTHESIZE_INVENTORY.png',
area=(116, 71, 453, 634),
search=(96, 51, 473, 654),
color=(255, 255, 255),
button=(116, 71, 453, 634),
),
)
SYNTHESIZE_MINUS = ButtonWrapper(
name='SYNTHESIZE_MINUS',
share=Button(

320
tasks/item/inventory.py Normal file
View File

@ -0,0 +1,320 @@
import cv2
import numpy as np
from scipy import signal
from module.base.base import ModuleBase
from module.base.button import ButtonWrapper
from module.base.decorator import cached_property, del_cached_property
from module.base.timer import Timer
from module.base.utils import Lines, area_center, area_offset, color_similarity_2d
from module.exception import ScriptError
from module.logger import logger
def xywh2xyxy(area):
"""
Convert (x, y, width, height) to (x1, y1, x2, y2)
"""
x, y, w, h = area
return x, y, x + w, y + h
def xyxy2xywh(area):
"""
Convert (x1, y1, x2, y2) to (x, y, width, height)
"""
x1, y1, x2, y2 = area
return min(x1, x2), min(y1, y2), abs(x2 - x1), abs(y2 - y1)
class InventoryItem:
def __init__(self, main: ModuleBase, loca: tuple[int, int], point: tuple[int, int]):
self.main = main
self.loca = loca
self.point = point
def __str__(self):
return f'Item({self.loca})'
__repr__ = __str__
def crop(self, area, copy=False):
area = area_offset(area, offset=self.point)
return self.main.image_crop(area, copy=copy)
@cached_property
def button(self):
area = area_offset((-40, -20, 40, 20), offset=self.point)
return area
@cached_property
def is_selected(self):
image = self.crop((-60, -100, 60, 40))
image = color_similarity_2d(image, (255, 255, 255))
param = {
'height': 160,
}
hori = cv2.reduce(image, 1, cv2.REDUCE_AVG).flatten()
peaks, _ = signal.find_peaks(hori, **param)
if len(peaks) != 2:
return False
vert = cv2.reduce(image, 0, cv2.REDUCE_AVG).flatten()
peaks, _ = signal.find_peaks(vert, **param)
if len(peaks) != 2:
return False
return True
class InventoryManager:
GRID_DELTA = (104, 124)
ERROR_LINES_TOLERANCE = (-10, 10)
COINCIDENT_POINT_ENCOURAGE_DISTANCE = 1.
def __init__(self, main: ModuleBase, inventory: ButtonWrapper):
"""
max_count: expected max count of this inventory page
"""
self.main = main
self.inventory = inventory
self.items: dict[tuple[int, int], InventoryItem] = {}
self.selected: InventoryItem | None = None
def mid_cleanse(self, mids, mid_diff_range, edge_range):
"""
Args:
mids:
mid_diff_range:
edge_range:
Returns:
"""
count = len(mids)
if count == 1:
return mids
elif count == 2:
return mids
# print(mids)
encourage = self.COINCIDENT_POINT_ENCOURAGE_DISTANCE ** 2
# Drawing lines
def iter_lines():
for index, mid in enumerate(mids):
for n in range(self.ERROR_LINES_TOLERANCE[0], self.ERROR_LINES_TOLERANCE[1] + 1):
theta = np.arctan(index + n)
rho = mid * np.cos(theta)
yield [rho, theta]
def coincident_point_value(point):
"""Value that measures how close a point to the coincident point. The smaller the better.
Coincident point may be many.
Use an activation function to encourage a group of coincident lines and ignore wrong lines.
"""
x, y = point
# Do not use:
# distance = coincident.distance_to_point(point)
distance = np.abs(x - coincident.get_x(y))
# print((distance * 1).astype(int).reshape(len(mids), np.diff(self.config.ERROR_LINES_TOLERANCE)[0]+1))
# Activation function
# distance = 1 / (1 + np.exp(16 / distance - distance))
distance = 1 / (1 + np.exp(encourage / distance) / distance)
distance = np.sum(distance)
return distance
# Fitting mid
coincident = Lines(np.vstack(list(iter_lines())), is_horizontal=False)
coincident_point_range = (
(
-abs(self.ERROR_LINES_TOLERANCE[0]) * mid_diff_range[1] + edge_range[0],
abs(self.ERROR_LINES_TOLERANCE[1]) * mid_diff_range[1] + edge_range[1]
),
mid_diff_range
)
from scipy import optimize
coincident_point = optimize.brute(coincident_point_value, coincident_point_range)
# print(coincident_point)
# Filling mid
left, right = edge_range
mids = np.arange(-25, 25) * coincident_point[1] + coincident_point[0]
mids = mids[(mids > left) & (mids < right)]
# print(mids)
return mids
def update(self):
image = self.main.image_crop(self.inventory, copy=False)
image = color_similarity_2d(image, color=(252, 200, 109))
# Search rarity stars
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel, dst=image)
# image_star = cv2.inRange(image, 221, 255)
# Close rarity stars as item
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (25, 3))
cv2.morphologyEx(image, cv2.MORPH_CLOSE, kernel, dst=image)
image_item = cv2.inRange(image, 221, 255)
# from PIL import Image
# Image.fromarray(image_star).show()
def iter_area(im):
# Iter matched area from given image
contours, _ = cv2.findContours(im, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
for cont in contours:
rect = cv2.boundingRect(cv2.convexHull(cont).astype(np.float32))
# width < 5stars and height < 1star
if not (65 > rect[2] >= 5 and 10 > rect[3]):
continue
rect = xywh2xyxy(rect)
rect = area_center(rect)
yield rect
area_item = list(iter_area(image_item))
# Re-generate a correct xy array
points = np.array(area_item)
points += self.inventory.area[:2]
area = self.inventory.area
x_list = np.unique(np.sort(points[:, 0]))
y_list = np.unique(np.sort(points[:, 1]))
x_list = self.mid_cleanse(
x_list,
mid_diff_range=(self.GRID_DELTA[0] - 3, self.GRID_DELTA[0] + 3),
edge_range=(area[0], area[2])
)
y_list = self.mid_cleanse(
y_list,
mid_diff_range=(self.GRID_DELTA[1] - 3, self.GRID_DELTA[1] + 3),
edge_range=(area[1], area[3])
)
def is_near_existing(p):
diff = np.linalg.norm(points - p, axis=1)
return np.any(diff < 3)
def iter_items():
y_max = -1
for y in y_list:
for x in x_list:
if is_near_existing((x, y)):
y_max = y
break
for yi, y in enumerate(y_list):
if y < y_max:
# Fill items
for xi, x in enumerate(x_list):
yield InventoryItem(main=self.main, loca=(xi, yi), point=(int(x), int(y)))
elif y == y_max:
# Fill until the last item
for xi, x in enumerate(x_list):
if is_near_existing((x, y)):
yield InventoryItem(main=self.main, loca=(xi, yi), point=(int(x), int(y)))
else:
break
# Re-generate items
self.items = {}
selected = []
for item in iter_items():
self.items[item.loca] = item
if item.is_selected:
selected.append(item)
# Check selected
self.selected = None
count = len(selected)
if count == 0:
# logger.warning('Inventory has no item selected')
pass
elif count > 1:
logger.warning(f'Inventory has multiple items selected: {selected}')
self.selected = selected[0]
else:
self.selected = selected[0]
logger.info(f'Inventory: {len(self.items)} items, selected {self.selected}')
def get_row_first(self, row=1, first=0) -> InventoryItem | None:
"""
Get the first item of the next row
Args:
row: 1 for next row, -1 for prev row
first: 0 for the first_item
"""
if self.selected == None:
return None
loca = self.selected.loca
loca = (first, loca[1] + row)
try:
return self.items[loca]
except KeyError:
return None
def get_right(self) -> InventoryItem | None:
"""
Get the right item of the selected
"""
if self.selected == None:
return None
loca = self.selected.loca
loca = (loca[0] + 1, loca[1])
try:
return self.items[loca]
except KeyError:
return None
def get_first(self) -> InventoryItem | None:
"""
Get the first item of inventory
"""
try:
return self.items[(0, 0)]
except KeyError:
return None
def select(self, item, skip_first_screenshot=True):
logger.info(f'Inventory select {item}')
if isinstance(item, InventoryItem):
pass
else:
try:
item = self.items[item]
except KeyError:
raise ScriptError(f'Inventory select {item} but is not in current items')
interval = Timer(2, count=6)
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.main.device.screenshot()
# End
del_cached_property(item, 'is_selected')
if item.is_selected:
logger.info('Inventory item selected')
break
# Click
if interval.reached():
self.main.device.click(item)
interval.reset()
self.update()
def wait_selected(self, skip_first_screenshot=True):
timeout = Timer(2, count=6).start()
while 1:
if skip_first_screenshot:
skip_first_screenshot = False
else:
self.main.device.screenshot()
self.update()
if self.selected is not None:
break
if timeout.reached():
logger.warning('Wait inventory selected timeout')
break

View File

@ -1,14 +1,18 @@
import cv2
import numpy as np
from module.base.decorator import cached_property
from module.base.timer import Timer
from module.base.utils import color_similarity_2d, crop, image_size
from module.base.utils import SelectedGrids, color_similarity_2d, crop, image_size
from module.exception import ScriptError
from module.logger import logger
from module.ocr.ocr import Ocr
from tasks.base.page import page_synthesize
from tasks.combat.obtain import CombatObtain
from tasks.item.assets.assets_item_synthesize import *
from tasks.planner.keywords import ITEM_CLASSES
from tasks.item.inventory import InventoryManager
from tasks.planner.keywords import ITEM_CLASSES, ItemCalyx, ItemTrace
from tasks.planner.keywords.classes import ItemBase
from tasks.planner.model import ObtainedAmmount
from tasks.planner.scan import OcrItemName
@ -44,6 +48,44 @@ class SynthesizeItemName(OcrItemName, WhiteStrip):
pass
class SynthesizeInventoryManager(InventoryManager):
@cached_property
def dic_item_index(self):
"""
Index of items in synthesize inventory.
Basically ItemTrace then ItemCalyx, but high rarity items first
Returns:
dict: Key: item name, Value: index starting from 1
"""
data = {}
index = 0
items = SelectedGrids(ItemBase.instances.values()).select(is_ItemTrace=True)
items.create_index('item_group')
for item_group in items.indexes.values():
for item in item_group[::-1]:
index += 1
data[item.name] = index
items = SelectedGrids(ItemBase.instances.values()).select(is_ItemCalyx=True)
items.create_index('item_group')
for item_group in items.indexes.values():
for item in item_group[::-1]:
index += 1
data[item.name] = index
return data
def is_item_below(self, item1: ItemBase, item2: ItemBase) -> bool:
"""
If item2 is on the right or below item1
"""
try:
id1 = self.dic_item_index[item1.name]
id2 = self.dic_item_index[item2.name]
except KeyError:
raise ScriptError(f'is_item_below: {item1} {item2} not in SynthesizeInventoryManager')
return id2 > id1
class Synthesize(CombatObtain):
def item_get_rarity(self, button) -> str | None:
"""
@ -135,7 +177,7 @@ class Synthesize(CombatObtain):
Returns:
bool: If success
"""
logger.hr('synthesize rarity reset')
current = self.item_get_rarity_retry(ENTRY_ITEM_FROM)
if current == 'blue':
r1, r2 = 'green', 'blue'
@ -195,7 +237,7 @@ class Synthesize(CombatObtain):
self.planner_write()
return items
def synthesize_get_item(self):
def synthesize_get_item(self) -> ItemBase | None:
ocr = SynthesizeItemName(ITEM_NAME)
item = ocr.matched_single_line(self.device.image, keyword_classes=ITEM_CLASSES)
if item is None:
@ -204,6 +246,93 @@ class Synthesize(CombatObtain):
return item
@cached_property
def synthesize_inventory(self):
return SynthesizeInventoryManager(main=self, inventory=SYNTHESIZE_INVENTORY)
def synthesize_inventory_select(self, item: ItemTrace | ItemCalyx | str):
"""
Select item from inventory list.
Inventory list must be at top be fore selecting.
This method is kind of a magic to lower maintenance cost
by doing OCR on item names instead of matching item templates.
- Iter first item of each row
- If current item index > target item index, switch back to prev row and iter prev row
- If item matches or item group matches, stop
"""
logger.hr('Synthesize select', level=1)
logger.info(f'Synthesize select {item}')
if isinstance(item, str):
item = ItemBase.find(item)
if not isinstance(item, (ItemTrace, ItemCalyx)):
raise ScriptError(f'synthesize_inventory_select: '
f'Trying to select item {item} but it is not an ItemTrace or ItemCalyx object')
inv = self.synthesize_inventory
inv.update()
first = inv.get_first()
if first.is_selected:
logger.info('first item selected')
else:
inv.select(first)
logger.hr('Synthesize select view', level=2)
switch_row = True
while 1:
# End
current = self.synthesize_get_item()
if current == item:
logger.info('Selected at target item')
return True
if current.item_group == item.item_group:
logger.info('Selected at target item group')
return True
# Switch rows
if switch_row and inv.is_item_below(current, item):
# Reached end, reset rarity to set current item at top
next_row = inv.get_row_first(row=1)
if next_row is None:
if inv.selected.loca[1] >= 3:
logger.info('Reached inventory view end, reset view')
self.synthesize_rarity_reset()
inv.wait_selected()
logger.hr('Synthesize select view', level=2)
continue
else:
logger.info('Reached inventory list end, no more rows')
switch_row = False
logger.hr('Synthesize select row', level=2)
else:
logger.info('Item below current, select next row')
inv.select(next_row)
continue
# Over switched, target item is at prev row
elif switch_row:
logger.info('Item above current, stop switch_row')
switch_row = False
logger.hr('Synthesize select row', level=2)
prev2 = inv.get_row_first(row=-1, first=1)
if prev2 is None:
logger.error(f'Current selected item {inv.selected} has not prev2')
else:
inv.select(prev2)
continue
# switch_row = False
if not switch_row:
right = inv.get_right()
if right is None:
logger.error('No target item, inventory ended')
return False
else:
inv.select(right)
continue
else:
logger.error(f'Unexpected switch_row={switch_row} during loop')
return False
if __name__ == '__main__':
self = Synthesize('src')

View File

@ -377,7 +377,7 @@ class RogueEntry(RouteBase, RogueRewardHandler, RoguePathHandler, DungeonUI):
'Reached weekly point limit but still continue to farm materials')
logger.attr(
"Farming Counter", self.config.stored.SimulatedUniverseFarm.to_counter())
if self.config.is_cloud_game and not self.config.stored.CloudRemainSeasonPass:
if self.config.is_cloud_game and not self.config.stored.CloudRemainSeasonPass.value:
logger.warning('Running WeeklyFarming on cloud game without season pass may cause fee, skip')
raise RogueReachedWeeklyPointLimit
else: