This commit is contained in:
kyO The NinjA 2020-05-29 14:36:07 -03:00
commit 0498176a32
9 changed files with 402 additions and 100 deletions

188
doc/perspective_en.md Normal file
View File

@ -0,0 +1,188 @@
# Map Detection
Map detection is the core of an Azur lane bot. If simply using `template matching` to do the enemy detection, it will inevitably appear BOSS block by enemies. AzurLaneAutoScript (Alas), provides a better approach of map detection. In module.map, you can get full information in map, such as:
```
2020-03-10 22:09:03.830 | INFO | A B C D E F G H
2020-03-10 22:09:03.830 | INFO | 1 -- ++ 2E -- -- -- -- --
2020-03-10 22:09:03.830 | INFO | 2 -- ++ ++ MY -- -- 2E --
2020-03-10 22:09:03.830 | INFO | 3 == -- FL -- -- -- 2E MY
2020-03-10 22:09:03.830 | INFO | 4 -- == -- -- -- -- ++ ++
2020-03-10 22:09:03.830 | INFO | 5 -- -- -- 2E -- 2E ++ ++
```
module.map mainly consists of the following files:
- perspective.py Perspective detection
- grids.py Grid data parsing
- camera.py Camera moving
- fleet.py Fleet moving
- map.py Map logics for enemy searching
## One Point Perspective
Before understanding how alas do map detection, we have to go through some basic knowledge of `one point perspective`. Map of Azur Lane is grid in one point perspective. Parsing perspective needs to calculate `vanish point` and `distant point`.
In one point perspective:
- the perspective of horizontal lines are still horizontal lines.
- the perspective of all vertical lines intersect at one point, called `vanish point`. The further a vanish point away from grids, the perspective of vertical lines closer to 90 degree.
![vanish_point](perspective.assets/vanish_point.png)
- All diagonals of the grids intersect at one point, called `distant point`, The further a distant point away from grids, the grid become fatter. In fact, there are 2 distant point, the following image draws the one to the left of vanish point.
![distant_point](perspective.assets/distant_point.png)
## Screenshot Pre-processing
![preprocess](perspective.assets/preprocess.png)
When perspective.py gets an screenshot, function `load_image` do such process:
- crop area of detection
- to grayscale, Using the algorithm in Photoshop, (MAX(R, G, B) + MIN(R, G, B)) // 2
- cover UI. Here use `overlay.png`
- Reverse color
(Image above is before reverse, because the reversed image is too terrified to show)
## Grid Detection
### Detecting Grid Lines
Grid lines are black lines with a transparency of 20%. In 720P, it has 3 to 4 pixel wide. During the period of "old-UI", we simply move the image 1px and divide by the origin image to detect grid lines. White frame with transparency gradient is added in "new-UI", which increase the difficulty of detection.
Function `find_peaks` use `scipy.signal.find_peaks` to find grid lines. `scipy.signal.find_peaks` can find peaks of given data.
Crop image at height == 370, use following parameters:
```
FIND_PEAKS_PARAMETERS = {
'height': (150, 255 - 40),
'width': 2,
'prominence': 10,
'distance': 35,
}
```
![find_peaks](perspective.assets/find_peaks.png)
As you can see, some grid lines are not detected and has many mistake as well. Not a big deal.
Scan every row and draw the image. (For better performance, image will be flatten to 1-D array before detection, which will reduce time cost to 1/4.)
![peaks](perspective.assets/peaks.png)
We gets 4 images so far, they are `vertical inner lines`, `horizontal inner lines`, `vertical edge lines`, `horizontal edge lines`. This process takes about 0.13 s on `I7-8700k` , and the full map detection process will take about 0.15 s.
P. S. Parameters use to detect inner lines are different from edge lines. In different maps, we should use different parameters. If you are lazy, you can use the default parameters, which is for 7-2. Those parameters can be used in Chapter 7, can even be used in `北境序曲(event_20200227_cn) D3`.
## Fitting Grid Lines
Function `hough_lines` use `cv2.HoughLines` to detect lines. Now we have 4 group of lines.
![hough_lines_1](perspective.assets/hough_lines_1.png)
Take `vertical inner lines` for example. There some incorrect lines.
We create a horizontal line at the middle of image, called `MID_Y`, (When fixing vertical lines, create a vertical one), and cross `vertical inner lines`, those crossing points are called `mid`. If the distance between two mids smaller than 3, we treat them as a group of lines, and replace them with their average. After that, we corrected the result.
## Fitting Vanish Point
As mention above, all vertical lines in one point perspective intersect at one point. There are errors in vertical lines, so we can't solve the equations to get that.
Function `_vanish_point_value` , use to calculate the distance between a point and a group of lines, and use `scipy.optimize.brute` to brute-force solve the closest point to vertical lines, which is called `vanish point`. This surface shows the sum of distance from the point to the group of vertical line. In order to ignore wrong lines far away from vanish point, it uses logarithm.
![vanish_point_distance](perspective.assets/vanish_point_distance.png)
Still remember `mid` ? we re-link then to vanish point, and act as vertical lines. This is the 2nd correction.
## Fitting Distant Point
We intersect the corrected vertical lines and the origin horizontal lines. `distant point` and `canish point` are on the same horizontal line, so we take a point on this horizontal line, and link all intersection, get `oblique lines`. Function `_distant_point_value` calculates the distance between the `mid` of oblique lines. Also use `scipy.optimize.brute` to brute-force solve the closet point, called `distant point`.
This image draws the oblique lines. Although there are many mistakes, it do gets the correct point.
![diatant_point_links](perspective.assets/diatant_point_links.png)
## Cleansing Grid Lines
With the above process, we get grid lines like this. It's generally correct, but with mistakes.
![mid_cleanse_before](perspective.assets/mid_cleanse_before.png)
Take the `mid` of vertical lines.
```
[ 185.63733413 315.65944444 441.62998244 446.89313842 573.6301653
686.40881027 701.20376316 830.27394123 959.00511191 1087.91874026
1220.58809477]
```
We know all grid has a same width, so theoretically, `mid` is an arithmetic progression, but with wrong members and missing members. Use a linear function `y = a * x + b` to describe that. Because of mistakes and missing, the `x` in linear function may not be the number `n` in arithmetic progression. As long as mistakes less than 10, there will have `x ∈ [n - 10, n + 10]` .
Then, transform the linear function as `b = -x * a + y`, and `x ∈ [n - 10, n + 10]` . If treat `a` to be independent variable and treat `b` to be dependent variable, it's a group of lines with amount of 11 * 21. Draw them.
![mid_cleanse_lines_with_circle](perspective.assets/mid_cleanse_lines_with_circle.png)
Discover that many lines intersect at where the orange circle pointed out, we call them `coincident point`. Those incorrect `mid` from incorrect lines can't intersect there, and get deleted.
Use `scipy.optimize.brute` to brute-force solve the coordinate of the best `coincident point`.
```
[-201.33197146 129.0958336]
```
So the linear function is `y = 129.0958336 * x - 201.33197146` .
> When calculating distance to the lines, it uses this function
>
> ```
> distance = 1 / (1 + np.exp(9 / distance) / distance)
> ```
> This function makes it less effect by lines far away, encourage optimizer to choose the local minimum.
>
> ![mid_cleanse_function](perspective.assets/mid_cleanse_function.png)
>How to cleanse horizontal lines?
>
>Make any line through `distant point`, link intersections and `vanish point` . This finish a map relation from horizontal lines to vertical lines. When cleanse finished, do a reversed process.
>
>![mid_cleanse_convert](perspective.assets/mid_cleanse_convert.png)
At last, generate `mid`, and crop it with the edge of map and screen. Missing `mid` get filled now. Re-link `mid` to vanish point, and the cleansing of grid lines is finished.
Draw results:
![mid_cleanse_after](perspective.assets/mid_cleanse_after-1584008112022.png)
# Grid Cropping
In fact, shipgrils, enemies, mystery are images fixed on grid center. They are scaled because of perspective.
P. S. They are scaled only, but not perspective transform. Only red border and yellow border on the ground are perspective transformed.
![crop_basic](perspective.assets/crop_basic.png)
In `grid_predictor.py`, provides function `get_relative_image` , which do crops according to grid center, and rescale to given shape. Now we can simply use template matching.
```
from PIL import Image
from module.config.config import cfg
i = Image.open(file)
grids = Grids(i, cfg)
out = Image.new('RGB', tuple((grids.shape + 1) * 105 - 5))
for loca, grid in grids.grids.items():
image = grid.get_relative_image(
(-0.415 - 0.7, -0.62 - 0.7, -0.415, -0.62), output_shape=(100, 100))
out.paste(image, tuple(np.array(loca) * 105))
out
```
![crop_scale](perspective.assets/crop_scale.png)
## Parsing Grid Data
To be continued.

View File

@ -61,12 +61,10 @@ class CampaignRun(CampaignUI, Reward, LoginHandler):
return True return True
def campaign_name_set(self, name): def campaign_name_set(self, name):
# self.config.CAMPAIGN_NAME = name if not self.campaign.config.ENABLE_SAVE_GET_ITEMS \
# folder = self.config.SCREEN_SHOT_SAVE_FOLDER_BASE + '/' + name or not len(self.campaign.config.SCREEN_SHOT_SAVE_FOLDER_BASE.strip()):
# if not os.path.exists(folder): return False
# os.mkdir(folder) # Create folder to save drop screenshot
# self.config.SCREEN_SHOT_SAVE_FOLDER = folder
folder = self.campaign.config.SCREEN_SHOT_SAVE_FOLDER_BASE + '/' + name folder = self.campaign.config.SCREEN_SHOT_SAVE_FOLDER_BASE + '/' + name
if not os.path.exists(folder): if not os.path.exists(folder):
os.mkdir(folder) os.mkdir(folder)

View File

@ -6,7 +6,7 @@ import shutil
from gooey import Gooey, GooeyParser from gooey import Gooey, GooeyParser
from alas import AzurLaneAutoScript from alas import AzurLaneAutoScript
from module.config.dictionary import dic_true_eng_to_eng, dic_eng_to_chi from module.config.dictionary import dic_true_eng_to_eng, dic_eng_to_true_eng
from module.logger import logger, pyw_name from module.logger import logger, pyw_name
@ -79,14 +79,14 @@ def main(ini_name=''):
config = update_config_from_template(config, file=config_file) config = update_config_from_template(config, file=config_file)
event_folder = [dic_eng_to_chi.get(f, f) for f in os.listdir('./campaign') if f.startswith('event_')][::-1] event_folder = [dic_eng_to_true_eng.get(f, f) for f in os.listdir('./campaign') if f.startswith('event_')][::-1]
saved_config = {} saved_config = {}
for opt, option in config.items(): for opt, option in config.items():
for key, value in option.items(): for key, value in option.items():
key = dic_eng_to_chi.get(key, key) key = dic_eng_to_true_eng.get(key, key)
if value in dic_eng_to_chi: if value in dic_eng_to_true_eng:
value = dic_eng_to_chi.get(value, value) value = dic_eng_to_true_eng.get(value, value)
if value == 'None': if value == 'None':
value = '' value = ''

View File

@ -340,11 +340,14 @@ class AzurLaneConfig:
C124_AMMO_PICK_UP = 3 C124_AMMO_PICK_UP = 3
def create_folder(self): def create_folder(self):
self.SCREEN_SHOT_SAVE_FOLDER = self.SCREEN_SHOT_SAVE_FOLDER_BASE + '/' + self.CAMPAIGN_NAME for folder in [self.ASSETS_FOLDER, self.PERSPECTIVE_ERROR_LOG_FOLDER, self.ERROR_LOG_FOLDER]:
for folder in [self.SCREEN_SHOT_SAVE_FOLDER_BASE, self.ASSETS_FOLDER, self.SCREEN_SHOT_SAVE_FOLDER,
self.PERSPECTIVE_ERROR_LOG_FOLDER, self.ERROR_LOG_FOLDER]:
if folder and not os.path.exists(folder): if folder and not os.path.exists(folder):
os.mkdir(folder) os.mkdir(folder)
self.SCREEN_SHOT_SAVE_FOLDER = self.SCREEN_SHOT_SAVE_FOLDER_BASE + '/' + self.CAMPAIGN_NAME
if self.ENABLE_SAVE_GET_ITEMS and len(self.SCREEN_SHOT_SAVE_FOLDER_BASE.strip()):
for folder in [self.SCREEN_SHOT_SAVE_FOLDER_BASE, self.SCREEN_SHOT_SAVE_FOLDER]:
if folder and not os.path.exists(folder):
os.mkdir(folder)
def merge(self, other): def merge(self, other):
""" """

View File

@ -198,6 +198,7 @@ dic_true_eng_to_eng = {
'event_20200521_en': 'event_20200521_en', 'event_20200521_en': 'event_20200521_en',
} }
dic_eng_to_true_eng = {v: k for k, v in dic_true_eng_to_eng.items()}
dic_chi_to_eng = { dic_chi_to_eng = {
# Function # Function
@ -380,7 +381,7 @@ dic_chi_to_eng = {
'穹顶下的圣咏曲': 'event_20200521_cn', '穹顶下的圣咏曲': 'event_20200521_cn',
} }
dic_eng_to_chi = {v: k for k, v in dic_true_eng_to_eng.items()} dic_eng_to_chi = {v: k for k, v in dic_chi_to_eng.items()}
def to_bool(string): def to_bool(string):

View File

@ -18,6 +18,7 @@ class Connection:
self.serial = str(self.config.SERIAL) self.serial = str(self.config.SERIAL)
self.device = self.connect(self.serial) self.device = self.connect(self.serial)
self.disable_uiautomator2_auto_quit() self.disable_uiautomator2_auto_quit()
self.check_screen_size()
@staticmethod @staticmethod
def adb_command(cmd, serial=None): def adb_command(cmd, serial=None):
@ -61,3 +62,18 @@ class Connection:
def disable_uiautomator2_auto_quit(self, port=7912, expire=300000): def disable_uiautomator2_auto_quit(self, port=7912, expire=300000):
self.adb_command(['forward', 'tcp:%s' % port, 'tcp:%s' % port], serial=self.serial) self.adb_command(['forward', 'tcp:%s' % port, 'tcp:%s' % port], serial=self.serial)
requests.post('http://127.0.0.1:%s/newCommandTimeout' % port, data=str(expire)) requests.post('http://127.0.0.1:%s/newCommandTimeout' % port, data=str(expire))
def check_screen_size(self):
width, height = self.device.window_size()
if height > width:
width, height = height, width
logger.attr('Screen_size', f'{width}x{height}')
if width == 1280 and height == 720:
return True
else:
logger.warning(f'Not supported screen size: {width}x{height}')
logger.warning('Alas requires 1280x720')
logger.hr('Script end')
exit(1)

View File

@ -9,7 +9,7 @@ from module.logger import logger
class EnemySearchingHandler(InfoHandler): class EnemySearchingHandler(InfoHandler):
MAP_ENEMY_SEARCHING_OVERLAY_TRANSPARENCY_THRESHOLD = 0.5 # Usually (0.70, 0.80). MAP_ENEMY_SEARCHING_OVERLAY_TRANSPARENCY_THRESHOLD = 0.5 # Usually (0.70, 0.80).
MAP_ENEMY_SEARCHING_TIMEOUT_SECOND = 5 MAP_ENEMY_SEARCHING_TIMEOUT_SECOND = 5
in_stage_timer = Timer(1, count=2) in_stage_timer = Timer(1.5, count=5)
def enemy_searching_color_initial(self): def enemy_searching_color_initial(self):
MAP_ENEMY_SEARCHING.load_color(self.device.image) MAP_ENEMY_SEARCHING.load_color(self.device.image)

View File

@ -106,7 +106,7 @@ class Fleet(Camera, MapOperation, AmbushHandler):
# Wait after ambushed. # Wait after ambushed.
ambushed_retry = Timer(0.5) ambushed_retry = Timer(0.5)
# If nothing happens, click again. # If nothing happens, click again.
walk_timeout = Timer(10) walk_timeout = Timer(20)
walk_timeout.start() walk_timeout.start()
while 1: while 1:

View File

@ -5,6 +5,7 @@ import cv2
import numpy as np import numpy as np
from scipy import signal from scipy import signal
from module.base.decorator import Config
from module.base.ocr import Ocr from module.base.ocr import Ocr
from module.base.timer import Timer from module.base.timer import Timer
from module.base.utils import area_offset, get_color, random_rectangle_vector from module.base.utils import area_offset, get_color, random_rectangle_vector
@ -14,10 +15,26 @@ from module.reward.assets import *
from module.ui.page import page_reward, page_commission, CAMPAIGN_CHECK from module.ui.page import page_reward, page_commission, CAMPAIGN_CHECK
from module.ui.ui import UI from module.ui.ui import UI
dictionary = { dictionary_cn = {
'major_comm': ['Self', 'Defense Exercise', 'Research Mission', 'Prep', 'Tactical Class', 'Cargo Transport'], 'major_comm': ['自主训练', '对抗演习', '科研任务', '工具整备', '战术课程', '货物运输'],
'daily_comm': ['Daily', 'Awakening'], 'daily_comm': ['日常资源开发', '高阶战术研发'],
'extra_drill': ['Sailing', 'Defense Patrol', 'Buoy Inspection'], 'extra_drill': ['航行训练', '防卫巡逻', '海域浮标检查作业'],
'extra_part': ['委托'],
'extra_cube': ['演习'],
'extra_oil': ['油田'],
'extra_book': ['商船护卫'],
'urgent_drill': ['运输部队', '侦查部队', '主力部队', '精锐部队'],
'urgent_part': ['维拉', '', '多伦瓦', '恐班纳'],
'urgent_book': ['土豪尔', '姆波罗', '马拉基', '卡波罗', '马内', '玛丽', '', '特林'],
'urgent_box': ['装备', '物资'],
'urgent_cube': ['解救', '敌袭'],
'urgent_gem': ['要员', '度假', '巡视'],
'urgent_ship': ['观舰']
}
dictionary_en = {
'major_comm': ['Self Training', 'Defense Exercise', 'Research Mission', 'Tool Prep', 'Tactical Class', 'Cargo Transport'],
'daily_comm': ['Daily Resource Extraction', 'Awakening Tactical Research'],
'extra_drill': ['Sailing Training', 'Defense Patrol', 'Buoy Inspection'],
'extra_part': ['Commission'], 'extra_part': ['Commission'],
'extra_cube': ['Exercise'], 'extra_cube': ['Exercise'],
'extra_oil': ['Oil Extraction'], 'extra_oil': ['Oil Extraction'],
@ -33,35 +50,48 @@ dictionary = {
class Commission: class Commission:
def __init__(self, image, y): button: Button
name: str
genre: str
status: str
duration: timedelta
expire: timedelta
def __init__(self, image, y, config):
self.config = config
self.y = y self.y = y
self.stack_y = y self.stack_y = y
self.area = (188, y - 119, 1199, y) self.area = (188, y - 119, 1199, y)
self.image = image self.image = image
self.valid = True self.valid = True
self.commission_parse()
@Config.when(SERVER='en')
def commission_parse(self):
# Name # Name
area = area_offset((176, 23, 420, 50), self.area[0:2]) # This is different from CN, EN has longer names
area = area_offset((176, 23, 420, 51), self.area[0:2])
button = Button(area=area, color=(), button=area, name='COMMISSION') button = Button(area=area, color=(), button=area, name='COMMISSION')
ocr = Ocr(button, lang='cnocr', back=(74, 97, 148), use_binary=False) ocr = Ocr(button, lang='cnocr', back=(74, 97, 148), use_binary=False)
self.button = button self.button = button
self.name = ocr.ocr(image) self.name = ocr.ocr(self.image)
self.genre = self.parse_name(self.name) self.genre = self.commission_name_parse(self.name)
# Duration time # Duration time
area = area_offset((290, 74, 390, 92), self.area[0:2]) area = area_offset((290, 74, 390, 92), self.area[0:2])
button = Button(area=area, color=(), button=area, name='DURATION') button = Button(area=area, color=(), button=area, name='DURATION')
ocr = Ocr(button, lang='stage', back=(57, 85, 132)) ocr = Ocr(button, lang='stage', back=(57, 85, 132))
self.duration = self.parse_time(ocr.ocr(image)) self.duration = self.parse_time(ocr.ocr(self.image))
# Expire time # Expire time
area = area_offset((-49, 68, -45, 84), self.area[0:2]) area = area_offset((-49, 68, -45, 84), self.area[0:2])
button = Button(area=area, color=(189, 65, 66), button=area, name='IS_URGENT') button = Button(area=area, color=(189, 65, 66),
if button.appear_on(image): button=area, name='IS_URGENT')
if button.appear_on(self.image):
area = area_offset((-49, 73, 45, 91), self.area[0:2]) area = area_offset((-49, 73, 45, 91), self.area[0:2])
button = Button(area=area, color=(), button=area, name='EXPIRE') button = Button(area=area, color=(), button=area, name='EXPIRE')
ocr = Ocr(button, lang='stage', back=(189, 65, 66)) ocr = Ocr(button, lang='stage', back=(189, 65, 66))
self.expire = self.parse_time(ocr.ocr(image)) self.expire = self.parse_time(ocr.ocr(self.image))
else: else:
self.expire = None self.expire = None
@ -72,7 +102,44 @@ class Commission:
1: 'running', 1: 'running',
2: 'pending' 2: 'pending'
} }
self.status = dic[int(np.argmax(get_color(image, area)))] self.status = dic[int(np.argmax(get_color(self.image, area)))]
@Config.when(SERVER=None)
def commission_parse(self):
# Name
area = area_offset((176, 23, 420, 50), self.area[0:2])
button = Button(area=area, color=(), button=area, name='COMMISSION')
ocr = Ocr(button, lang='cnocr', back=(74, 97, 148), use_binary=False)
self.button = button
self.name = ocr.ocr(self.image)
self.genre = self.commission_name_parse(self.name)
# Duration time
area = area_offset((290, 74, 390, 92), self.area[0:2])
button = Button(area=area, color=(), button=area, name='DURATION')
ocr = Ocr(button, lang='stage', back=(57, 85, 132))
self.duration = self.parse_time(ocr.ocr(self.image))
# Expire time
area = area_offset((-49, 68, -45, 84), self.area[0:2])
button = Button(area=area, color=(189, 65, 66),
button=area, name='IS_URGENT')
if button.appear_on(self.image):
area = area_offset((-49, 73, 45, 91), self.area[0:2])
button = Button(area=area, color=(), button=area, name='EXPIRE')
ocr = Ocr(button, lang='stage', back=(189, 65, 66))
self.expire = self.parse_time(ocr.ocr(self.image))
else:
self.expire = None
# Status
area = area_offset((179, 71, 187, 93), self.area[0:2])
dic = {
0: 'finished',
1: 'running',
2: 'pending'
}
self.status = dic[int(np.argmax(get_color(self.image, area)))]
def __str__(self): def __str__(self):
if self.valid: if self.valid:
@ -124,7 +191,8 @@ class Commission:
result = [int(s) for s in result.groups()] result = [int(s) for s in result.groups()]
return timedelta(hours=result[0], minutes=result[1], seconds=result[2]) return timedelta(hours=result[0], minutes=result[1], seconds=result[2])
def parse_name(self, string): @Config.when(SERVER='en')
def commission_name_parse(self, string):
""" """
Args: Args:
string (str): Commission name, such as 'NYB要员护卫'. string (str): Commission name, such as 'NYB要员护卫'.
@ -132,7 +200,25 @@ class Commission:
Returns: Returns:
str: Commission genre, such as 'urgent_gem'. str: Commission genre, such as 'urgent_gem'.
""" """
for key, value in dictionary.items(): for key, value in dictionary_en.items():
for keyword in value:
if keyword in string:
return key
logger.warning(f'Name with unknown genre: {string}')
self.valid = False
return ''
@Config.when(SERVER=None)
def commission_name_parse(self, string):
"""
Args:
string (str): Commission name, such as 'NYB要员护卫'.
Returns:
str: Commission genre, such as 'urgent_gem'.
"""
for key, value in dictionary_cn.items():
for keyword in value: for keyword in value:
if keyword in string: if keyword in string:
return key return key
@ -148,7 +234,8 @@ class CommissionGroup:
lower = int((show[3] - show[1]) / 2 - height / 2) lower = int((show[3] - show[1]) / 2 - height / 2)
template_area = (620, lower, 1154, lower + height) template_area = (620, lower, 1154, lower + height)
def __init__(self): def __init__(self, config):
self.config = config
self.template = None self.template = None
self.swipe = 0 self.swipe = 0
self.commission = [] self.commission = []
@ -183,15 +270,18 @@ class CommissionGroup:
if self.template is None: if self.template is None:
self.template = np.array(image.crop(self.template_area)) self.template = np.array(image.crop(self.template_area))
self.swipe = 0 self.swipe = 0
res = cv2.matchTemplate(self.template, np.array(image), cv2.TM_CCOEFF_NORMED) res = cv2.matchTemplate(
self.template, np.array(image), cv2.TM_CCOEFF_NORMED)
_, similarity, _, position = cv2.minMaxLoc(res) _, similarity, _, position = cv2.minMaxLoc(res)
if similarity < 0.85: if similarity < 0.85:
logger.warning(f'Low similarity when finding swipe. Similarity: {similarity}, Position: {position}') logger.warning(
f'Low similarity when finding swipe. Similarity: {similarity}, Position: {position}')
self.swipe -= position[1] - self.template_area[1] self.swipe -= position[1] - self.template_area[1]
self.template = np.array(image.crop(self.template_area)) self.template = np.array(image.crop(self.template_area))
# Find commission position # Find commission position
color_height = np.mean(image.crop((597, 0, 619, 720)).convert('L'), axis=1) color_height = np.mean(image.crop(
(597, 0, 619, 720)).convert('L'), axis=1)
parameters = {'height': 200} parameters = {'height': 200}
peaks, _ = signal.find_peaks(color_height, **parameters) peaks, _ = signal.find_peaks(color_height, **parameters)
peaks = [y for y in peaks if y > 67 + 117] peaks = [y for y in peaks if y > 67 + 117]
@ -202,76 +292,80 @@ class CommissionGroup:
diff = np.array([c.stack_y - stack_y for c in self.commission]) diff = np.array([c.stack_y - stack_y for c in self.commission])
if np.any(np.abs(diff) < 3): if np.any(np.abs(diff) < 3):
continue continue
commission = Commission(image, y=y) commission = Commission(image, y=y, config=self.config)
commission.stack_y = stack_y commission.stack_y = stack_y
logger.info(f'Add commission: {commission}') logger.info(f'Add commission: {commission}')
self.commission.append(commission) self.commission.append(commission)
def commission_choose(daily, urgent, priority, time_limit=None):
"""
Args:
daily (CommissionGroup):
urgent (CommissionGroup):
priority (dict):
time_limit (datetime):
Returns:
CommissionGroup, CommissionGroup: Chosen daily commission, Chosen urgent commission
"""
# Count Commission
commission = daily.commission + urgent.commission
running_count = int(np.sum([1 for c in commission if c.status == 'running']))
logger.attr('Running', running_count)
if running_count >= 4:
return [], []
# Calculate priority
commission = [c for c in commission if c.valid and c.status == 'pending']
comm_priority = []
for comm in commission:
pri = priority[comm.genre]
if comm.duration <= timedelta(hours=2):
pri += priority['duration_shorter_than_2']
if comm.duration >= timedelta(hours=6):
pri += priority['duration_longer_than_6']
if comm.expire:
if comm.expire <= timedelta(hours=2):
pri += priority['expire_shorter_than_2']
if comm.expire >= timedelta(hours=6):
pri += priority['expire_longer_than_6']
comm_priority.append(pri)
# Sort
commission = list(np.array(commission)[np.argsort(comm_priority)])[::-1]
if time_limit:
commission = [comm for comm in commission if datetime.now() + comm.duration <= time_limit]
commission = commission[:4 - running_count]
daily_choose, urgent_choose = CommissionGroup(), CommissionGroup()
for comm in commission:
if comm in daily:
daily_choose.commission.append(comm)
if comm in urgent:
urgent_choose.commission.append(comm)
if daily_choose:
logger.info('Choose daily commission')
for comm in daily_choose:
logger.info(comm)
if urgent_choose:
logger.info('Choose urgent commission')
for comm in urgent_choose:
logger.info(comm)
return daily_choose, urgent_choose
class RewardCommission(UI, InfoHandler): class RewardCommission(UI, InfoHandler):
daily: CommissionGroup daily: CommissionGroup
urgent: CommissionGroup urgent: CommissionGroup
daily_choose: CommissionGroup daily_choose: CommissionGroup
urgent_choose: CommissionGroup urgent_choose: CommissionGroup
def _commission_choose(self, daily, urgent, priority, time_limit=None):
"""
Args:
daily (CommissionGroup):
urgent (CommissionGroup):
priority (dict):
time_limit (datetime):
Returns:
CommissionGroup, CommissionGroup: Chosen daily commission, Chosen urgent commission
"""
# Count Commission
commission = daily.commission + urgent.commission
running_count = int(
np.sum([1 for c in commission if c.status == 'running']))
logger.attr('Running', running_count)
if running_count >= 4:
return [], []
# Calculate priority
commission = [
c for c in commission if c.valid and c.status == 'pending']
comm_priority = []
for comm in commission:
pri = priority[comm.genre]
if comm.duration <= timedelta(hours=2):
pri += priority['duration_shorter_than_2']
if comm.duration >= timedelta(hours=6):
pri += priority['duration_longer_than_6']
if comm.expire:
if comm.expire <= timedelta(hours=2):
pri += priority['expire_shorter_than_2']
if comm.expire >= timedelta(hours=6):
pri += priority['expire_longer_than_6']
comm_priority.append(pri)
# Sort
commission = list(np.array(commission)[
np.argsort(comm_priority)])[::-1]
if time_limit:
commission = [
comm for comm in commission if datetime.now() + comm.duration <= time_limit]
commission = commission[:4 - running_count]
daily_choose, urgent_choose = CommissionGroup(
self.config), CommissionGroup(self.config)
for comm in commission:
if comm in daily:
daily_choose.commission.append(comm)
if comm in urgent:
urgent_choose.commission.append(comm)
if daily_choose:
logger.info('Choose daily commission')
for comm in daily_choose:
logger.info(comm)
if urgent_choose:
logger.info('Choose urgent commission')
for comm in urgent_choose:
logger.info(comm)
return daily_choose, urgent_choose
def _commission_ensure_mode(self, mode): def _commission_ensure_mode(self, mode):
if self.appear(COMMISSION_DAILY): if self.appear(COMMISSION_DAILY):
current = 'daily' current = 'daily'
@ -311,13 +405,15 @@ class RewardCommission(UI, InfoHandler):
def _commission_swipe(self, distance=300): def _commission_swipe(self, distance=300):
# Distance of two commission is 146px # Distance of two commission is 146px
p1, p2 = random_rectangle_vector((0, -distance), box=(620, 67, 1154, 692), random_range=(-20, -5, 20, 5)) p1, p2 = random_rectangle_vector(
self.device.drag(p1, p2, segments=2, shake=(25, 0), point_random=(0, 0, 0, 0), shake_random=(-5, 0, 5, 0)) (0, -distance), box=(620, 67, 1154, 692), random_range=(-20, -5, 20, 5))
self.device.drag(p1, p2, segments=2, shake=(25, 0),
point_random=(0, 0, 0, 0), shake_random=(-5, 0, 5, 0))
self.device.sleep(0.3) self.device.sleep(0.3)
self.device.screenshot() self.device.screenshot()
def _commission_scan_list(self): def _commission_scan_list(self):
commission = CommissionGroup() commission = CommissionGroup(self.config)
commission.merge(self.device.image) commission.merge(self.device.image)
if commission.count <= 3: if commission.count <= 3:
return commission return commission
@ -352,7 +448,7 @@ class RewardCommission(UI, InfoHandler):
self.daily = daily self.daily = daily
self.urgent = urgent self.urgent = urgent
self.daily_choose, self.urgent_choose = commission_choose( self.daily_choose, self.urgent_choose = self._commission_choose(
self.daily, self.daily,
self.urgent, self.urgent,
priority=self.config.COMMISSION_PRIORITY, priority=self.config.COMMISSION_PRIORITY,
@ -396,7 +492,7 @@ class RewardCommission(UI, InfoHandler):
logger.hr(f'Finding commission') logger.hr(f'Finding commission')
logger.info(f'Finding commission {comm}') logger.info(f'Finding commission {comm}')
commission = CommissionGroup() commission = CommissionGroup(self.config)
prev = 0 prev = 0
for _ in range(15): for _ in range(15):
commission.merge(self.device.image) commission.merge(self.device.image)