2023-12-09 13:31:39 +00:00
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
|
2023-07-02 07:30:47 +00:00
|
|
|
import module.config.server as server_
|
2023-05-30 18:20:45 +00:00
|
|
|
from module.base.button import Button, ButtonWrapper, ClickButton, match_template
|
2023-05-14 07:48:34 +00:00
|
|
|
from module.base.timer import Timer
|
|
|
|
from module.base.utils import *
|
|
|
|
from module.config.config import AzurLaneConfig
|
|
|
|
from module.device.device import Device
|
|
|
|
from module.logger import logger
|
2023-12-09 13:31:39 +00:00
|
|
|
from module.webui.setting import cached_class_property
|
2023-05-14 07:48:34 +00:00
|
|
|
|
|
|
|
|
|
|
|
class ModuleBase:
|
|
|
|
config: AzurLaneConfig
|
|
|
|
device: Device
|
|
|
|
|
|
|
|
def __init__(self, config, device=None, task=None):
|
|
|
|
"""
|
|
|
|
Args:
|
|
|
|
config (AzurLaneConfig, str):
|
|
|
|
Name of the user config under ./config
|
2023-05-16 18:54:03 +00:00
|
|
|
device (Device, str):
|
2023-05-14 07:48:34 +00:00
|
|
|
To reuse a device.
|
|
|
|
If None, create a new Device object.
|
|
|
|
If str, create a new Device object and use the given device as serial.
|
|
|
|
task (str):
|
|
|
|
Bind a task only for dev purpose. Usually to be None for auto task scheduling.
|
|
|
|
If None, use default configs.
|
|
|
|
"""
|
|
|
|
if isinstance(config, AzurLaneConfig):
|
|
|
|
self.config = config
|
|
|
|
elif isinstance(config, str):
|
|
|
|
self.config = AzurLaneConfig(config, task=task)
|
|
|
|
else:
|
|
|
|
logger.warning('Alas ModuleBase received an unknown config, assume it is AzurLaneConfig')
|
|
|
|
self.config = config
|
|
|
|
|
|
|
|
if isinstance(device, Device):
|
|
|
|
self.device = device
|
|
|
|
elif device is None:
|
|
|
|
self.device = Device(config=self.config)
|
|
|
|
elif isinstance(device, str):
|
|
|
|
self.config.override(Emulator_Serial=device)
|
|
|
|
self.device = Device(config=self.config)
|
|
|
|
else:
|
|
|
|
logger.warning('Alas ModuleBase received an unknown device, assume it is Device')
|
|
|
|
self.device = device
|
|
|
|
|
|
|
|
self.interval_timer = {}
|
|
|
|
|
2023-12-09 13:31:39 +00:00
|
|
|
@cached_class_property
|
|
|
|
def worker(self) -> ThreadPoolExecutor:
|
|
|
|
"""
|
|
|
|
A thread pool to run things at background
|
|
|
|
"""
|
|
|
|
logger.hr('Creating worker')
|
|
|
|
pool = ThreadPoolExecutor(1)
|
|
|
|
return pool
|
|
|
|
|
2023-05-17 16:44:43 +00:00
|
|
|
def match_template(self, button, interval=0, similarity=0.85):
|
2023-05-14 07:48:34 +00:00
|
|
|
"""
|
|
|
|
Args:
|
2023-05-16 18:54:03 +00:00
|
|
|
button (ButtonWrapper):
|
2023-05-14 07:48:34 +00:00
|
|
|
interval (int, float): interval between two active events.
|
2023-05-16 18:54:03 +00:00
|
|
|
similarity (int, float): 0 to 1.
|
2023-05-14 07:48:34 +00:00
|
|
|
|
|
|
|
Returns:
|
|
|
|
bool:
|
|
|
|
|
|
|
|
Examples:
|
|
|
|
Image detection:
|
|
|
|
```
|
|
|
|
self.device.screenshot()
|
|
|
|
self.appear(Button(area=(...), color=(...), button=(...))
|
|
|
|
self.appear(Template(file='...')
|
|
|
|
```
|
2023-05-16 18:54:03 +00:00
|
|
|
"""
|
|
|
|
self.device.stuck_record_add(button)
|
2023-05-14 07:48:34 +00:00
|
|
|
|
2023-05-16 18:54:03 +00:00
|
|
|
if interval and not self.interval_is_reached(button, interval=interval):
|
|
|
|
return False
|
|
|
|
|
|
|
|
appear = button.match_template(self.device.image, similarity=similarity)
|
|
|
|
|
|
|
|
if appear and interval:
|
|
|
|
self.interval_reset(button, interval=interval)
|
|
|
|
|
|
|
|
return appear
|
|
|
|
|
2023-05-17 16:44:43 +00:00
|
|
|
def match_color(self, button, interval=0, threshold=10):
|
2023-05-16 18:54:03 +00:00
|
|
|
"""
|
|
|
|
Args:
|
|
|
|
button (ButtonWrapper):
|
|
|
|
interval (int, float): interval between two active events.
|
|
|
|
threshold (int): 0 to 255, smaller means more similar
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
bool:
|
2023-05-14 07:48:34 +00:00
|
|
|
"""
|
|
|
|
self.device.stuck_record_add(button)
|
|
|
|
|
2023-05-16 18:54:03 +00:00
|
|
|
if interval and not self.interval_is_reached(button, interval=interval):
|
|
|
|
return False
|
|
|
|
|
|
|
|
appear = button.match_color(self.device.image, threshold=threshold)
|
2023-05-14 07:48:34 +00:00
|
|
|
|
|
|
|
if appear and interval:
|
2023-05-16 18:54:03 +00:00
|
|
|
self.interval_reset(button, interval=interval)
|
|
|
|
|
|
|
|
return appear
|
|
|
|
|
2023-05-17 16:44:43 +00:00
|
|
|
def match_template_color(self, button, interval=0, similarity=0.85, threshold=30):
|
2023-05-16 18:54:03 +00:00
|
|
|
"""
|
|
|
|
Args:
|
|
|
|
button (ButtonWrapper):
|
|
|
|
interval (int, float): interval between two active events.
|
|
|
|
similarity (int, float): 0 to 1.
|
|
|
|
threshold (int): 0 to 255, smaller means more similar
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
bool:
|
|
|
|
"""
|
|
|
|
self.device.stuck_record_add(button)
|
|
|
|
|
|
|
|
if interval and not self.interval_is_reached(button, interval=interval):
|
|
|
|
return False
|
|
|
|
|
|
|
|
appear = button.match_template_color(self.device.image, similarity=similarity, threshold=threshold)
|
|
|
|
|
|
|
|
if appear and interval:
|
|
|
|
self.interval_reset(button, interval=interval)
|
2023-05-14 07:48:34 +00:00
|
|
|
|
|
|
|
return appear
|
|
|
|
|
2023-05-16 18:54:03 +00:00
|
|
|
appear = match_template
|
|
|
|
|
|
|
|
def appear_then_click(self, button, interval=5, similarity=0.85):
|
|
|
|
appear = self.appear(button, interval=interval, similarity=similarity)
|
2023-05-14 07:48:34 +00:00
|
|
|
if appear:
|
|
|
|
self.device.click(button)
|
|
|
|
return appear
|
|
|
|
|
2023-05-30 18:20:45 +00:00
|
|
|
def wait_until_stable(self, button, timer=Timer(0.3, count=1), timeout=Timer(5, count=10)):
|
|
|
|
"""
|
|
|
|
A terrible method, don't rely too much on it.
|
|
|
|
"""
|
|
|
|
logger.info(f'Wait until stable: {button}')
|
|
|
|
prev_image = self.image_crop(button)
|
|
|
|
timer.reset()
|
|
|
|
timeout.reset()
|
|
|
|
while 1:
|
|
|
|
self.device.screenshot()
|
|
|
|
|
|
|
|
if timeout.reached():
|
|
|
|
logger.warning(f'wait_until_stable({button}) timeout')
|
|
|
|
break
|
|
|
|
|
|
|
|
image = self.image_crop(button)
|
|
|
|
if match_template(image, prev_image):
|
|
|
|
if timer.reached():
|
|
|
|
logger.info(f'{button} stabled')
|
|
|
|
break
|
|
|
|
else:
|
|
|
|
prev_image = image
|
|
|
|
timer.reset()
|
|
|
|
|
2023-09-19 17:20:52 +00:00
|
|
|
def image_crop(self, button, copy=True):
|
2023-05-14 07:48:34 +00:00
|
|
|
"""Extract the area from image.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
button(Button, tuple): Button instance or area tuple.
|
2023-09-19 17:20:52 +00:00
|
|
|
copy:
|
2023-05-14 07:48:34 +00:00
|
|
|
"""
|
|
|
|
if isinstance(button, Button):
|
2023-09-19 17:20:52 +00:00
|
|
|
return crop(self.device.image, button.area, copy=copy)
|
2023-05-30 18:20:45 +00:00
|
|
|
elif isinstance(button, ButtonWrapper):
|
2023-09-19 17:20:52 +00:00
|
|
|
return crop(self.device.image, button.area, copy=copy)
|
2023-05-30 18:20:45 +00:00
|
|
|
elif hasattr(button, 'area'):
|
2023-09-19 17:20:52 +00:00
|
|
|
return crop(self.device.image, button.area, copy=copy)
|
2023-05-14 07:48:34 +00:00
|
|
|
else:
|
2023-09-19 17:20:52 +00:00
|
|
|
return crop(self.device.image, button, copy=copy)
|
2023-05-14 07:48:34 +00:00
|
|
|
|
|
|
|
def image_color_count(self, button, color, threshold=221, count=50):
|
|
|
|
"""
|
|
|
|
Args:
|
|
|
|
button (Button, tuple): Button instance or area.
|
|
|
|
color (tuple): RGB.
|
|
|
|
threshold: 255 means colors are the same, the lower the worse.
|
|
|
|
count (int): Pixels count.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
bool:
|
|
|
|
"""
|
2023-09-19 17:20:52 +00:00
|
|
|
if isinstance(button, np.ndarray):
|
|
|
|
image = button
|
|
|
|
else:
|
|
|
|
image = self.image_crop(button, copy=False)
|
|
|
|
mask = color_similarity_2d(image, color=color)
|
|
|
|
cv2.inRange(mask, threshold, 255, dst=mask)
|
|
|
|
sum_ = cv2.countNonZero(mask)
|
|
|
|
return sum_ > count
|
2023-05-14 07:48:34 +00:00
|
|
|
|
|
|
|
def image_color_button(self, area, color, color_threshold=250, encourage=5, name='COLOR_BUTTON'):
|
|
|
|
"""
|
|
|
|
Find an area with pure color on image, convert into a Button.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
area (tuple[int]): Area to search from
|
|
|
|
color (tuple[int]): Target color
|
|
|
|
color_threshold (int): 0-255, 255 means exact match
|
|
|
|
encourage (int): Radius of button
|
|
|
|
name (str): Name of the button
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
Button: Or None if nothing matched.
|
|
|
|
"""
|
|
|
|
image = color_similarity_2d(self.image_crop(area), color=color)
|
|
|
|
points = np.array(np.where(image > color_threshold)).T[:, ::-1]
|
|
|
|
if points.shape[0] < encourage ** 2:
|
|
|
|
# Not having enough pixels to match
|
|
|
|
return None
|
|
|
|
|
|
|
|
point = fit_points(points, mod=image_size(image), encourage=encourage)
|
|
|
|
point = ensure_int(point + area[:2])
|
|
|
|
button_area = area_offset((-encourage, -encourage, encourage, encourage), offset=point)
|
2023-10-04 03:04:51 +00:00
|
|
|
return ClickButton(area=button_area, name=name)
|
2023-05-14 07:48:34 +00:00
|
|
|
|
2023-10-18 18:07:36 +00:00
|
|
|
def get_interval_timer(self, button, interval=5, renew=False) -> Timer:
|
2023-10-18 13:01:48 +00:00
|
|
|
if hasattr(button, 'name'):
|
|
|
|
name = button.name
|
|
|
|
elif callable(button):
|
|
|
|
name = button.__name__
|
|
|
|
else:
|
|
|
|
name = str(button)
|
|
|
|
|
|
|
|
try:
|
|
|
|
timer = self.interval_timer[name]
|
2023-10-18 18:07:36 +00:00
|
|
|
if renew and timer.limit != interval:
|
2023-10-18 13:01:48 +00:00
|
|
|
timer = Timer(interval)
|
|
|
|
self.interval_timer[name] = timer
|
|
|
|
return timer
|
|
|
|
except KeyError:
|
|
|
|
timer = Timer(interval)
|
|
|
|
self.interval_timer[name] = timer
|
|
|
|
return timer
|
|
|
|
|
2023-05-16 18:54:03 +00:00
|
|
|
def interval_reset(self, button, interval=5):
|
2023-05-14 07:48:34 +00:00
|
|
|
if isinstance(button, (list, tuple)):
|
|
|
|
for b in button:
|
2023-06-25 16:07:52 +00:00
|
|
|
self.interval_reset(b, interval)
|
2023-05-14 07:48:34 +00:00
|
|
|
return
|
|
|
|
|
2023-07-02 07:19:28 +00:00
|
|
|
if button is not None:
|
2023-10-18 13:01:48 +00:00
|
|
|
self.get_interval_timer(button, interval=interval).reset()
|
2023-05-14 07:48:34 +00:00
|
|
|
|
2023-05-16 18:54:03 +00:00
|
|
|
def interval_clear(self, button, interval=5):
|
2023-05-14 07:48:34 +00:00
|
|
|
if isinstance(button, (list, tuple)):
|
|
|
|
for b in button:
|
2023-06-25 16:07:52 +00:00
|
|
|
self.interval_clear(b, interval)
|
2023-05-14 07:48:34 +00:00
|
|
|
return
|
|
|
|
|
2023-07-02 07:19:28 +00:00
|
|
|
if button is not None:
|
2023-10-18 13:01:48 +00:00
|
|
|
self.get_interval_timer(button, interval=interval).clear()
|
2023-05-16 18:54:03 +00:00
|
|
|
|
|
|
|
def interval_is_reached(self, button, interval=5):
|
2023-10-18 18:07:36 +00:00
|
|
|
return self.get_interval_timer(button, interval=interval, renew=True).reached()
|
2023-05-14 07:48:34 +00:00
|
|
|
|
|
|
|
_image_file = ''
|
|
|
|
|
|
|
|
@property
|
|
|
|
def image_file(self):
|
|
|
|
return self._image_file
|
|
|
|
|
|
|
|
@image_file.setter
|
|
|
|
def image_file(self, value):
|
|
|
|
"""
|
|
|
|
For development.
|
|
|
|
Load image from local file system and set it to self.device.image
|
|
|
|
Test an image without taking a screenshot from emulator.
|
|
|
|
"""
|
|
|
|
if isinstance(value, Image.Image):
|
|
|
|
value = np.array(value)
|
|
|
|
elif isinstance(value, str):
|
|
|
|
value = load_image(value)
|
|
|
|
|
|
|
|
self.device.image = value
|
2023-07-02 07:30:47 +00:00
|
|
|
|
|
|
|
def set_lang(self, lang):
|
|
|
|
"""
|
|
|
|
For development.
|
|
|
|
Change lang and affect globally,
|
|
|
|
including assets and server specific methods.
|
|
|
|
"""
|
2023-09-08 14:23:57 +00:00
|
|
|
server_.set_lang(lang)
|
|
|
|
logger.attr('Lang', self.config.LANG)
|
2023-12-09 13:31:39 +00:00
|
|
|
|
|
|
|
def screenshot_tracking_add(self):
|
|
|
|
"""
|
|
|
|
Add a tracking image, image will be saved
|
|
|
|
"""
|
|
|
|
logger.info('screenshot_tracking_add')
|
|
|
|
data = self.device.screenshot_deque[-1]
|
|
|
|
image = data['image']
|
|
|
|
now = data['time']
|
|
|
|
|
|
|
|
def image_encode(im, ti):
|
|
|
|
import io
|
|
|
|
from module.handler.sensitive_info import handle_sensitive_image
|
|
|
|
|
|
|
|
output = io.BytesIO()
|
|
|
|
im = handle_sensitive_image(im)
|
|
|
|
Image.fromarray(im, mode='RGB').save(output, format='png')
|
|
|
|
output.seek(0)
|
|
|
|
|
|
|
|
self.device.screenshot_tracking.append({
|
|
|
|
'time': ti,
|
|
|
|
'image': output
|
|
|
|
})
|
|
|
|
|
|
|
|
ModuleBase.worker.submit(image_encode, image, now)
|