Add: Parse planner results
BIN
assets/share/planner/result/CALCULATE_TITLE.png
Normal file
After Width: | Height: | Size: 7.7 KiB |
BIN
assets/share/planner/result/DETAIL_TITLE.png
Normal file
After Width: | Height: | Size: 8.5 KiB |
BIN
assets/share/planner/result/MATERIAL_TITLE.png
Normal file
After Width: | Height: | Size: 8.2 KiB |
BIN
assets/share/planner/result/OCR_RESULT.png
Normal file
After Width: | Height: | Size: 7.1 KiB |
BIN
assets/share/planner/result/RESULT_CHECK.SEARCH.png
Normal file
After Width: | Height: | Size: 6.8 KiB |
BIN
assets/share/planner/result/RESULT_CHECK.png
Normal file
After Width: | Height: | Size: 7.4 KiB |
BIN
assets/share/planner/result/RESULT_SCROLL.png
Normal file
After Width: | Height: | Size: 8.0 KiB |
65
tasks/planner/assets/assets_planner_result.py
Normal file
@ -0,0 +1,65 @@
|
||||
from module.base.button import Button, ButtonWrapper
|
||||
|
||||
# This file was auto-generated, do not modify it manually. To generate:
|
||||
# ``` python -m dev_tools.button_extract ```
|
||||
|
||||
CALCULATE_TITLE = ButtonWrapper(
|
||||
name='CALCULATE_TITLE',
|
||||
share=Button(
|
||||
file='./assets/share/planner/result/CALCULATE_TITLE.png',
|
||||
area=(264, 187, 346, 207),
|
||||
search=(244, 167, 366, 227),
|
||||
color=(141, 143, 161),
|
||||
button=(264, 187, 346, 207),
|
||||
),
|
||||
)
|
||||
DETAIL_TITLE = ButtonWrapper(
|
||||
name='DETAIL_TITLE',
|
||||
share=Button(
|
||||
file='./assets/share/planner/result/DETAIL_TITLE.png',
|
||||
area=(263, 402, 346, 422),
|
||||
search=(243, 382, 366, 442),
|
||||
color=(144, 146, 164),
|
||||
button=(263, 402, 346, 422),
|
||||
),
|
||||
)
|
||||
MATERIAL_TITLE = ButtonWrapper(
|
||||
name='MATERIAL_TITLE',
|
||||
share=Button(
|
||||
file='./assets/share/planner/result/MATERIAL_TITLE.png',
|
||||
area=(263, 194, 346, 214),
|
||||
search=(243, 174, 366, 234),
|
||||
color=(135, 136, 156),
|
||||
button=(263, 194, 346, 214),
|
||||
),
|
||||
)
|
||||
OCR_RESULT = ButtonWrapper(
|
||||
name='OCR_RESULT',
|
||||
share=Button(
|
||||
file='./assets/share/planner/result/OCR_RESULT.png',
|
||||
area=(331, 104, 1201, 592),
|
||||
search=(311, 84, 1221, 612),
|
||||
color=(254, 254, 254),
|
||||
button=(331, 104, 1201, 592),
|
||||
),
|
||||
)
|
||||
RESULT_CHECK = ButtonWrapper(
|
||||
name='RESULT_CHECK',
|
||||
share=Button(
|
||||
file='./assets/share/planner/result/RESULT_CHECK.png',
|
||||
area=(263, 128, 324, 148),
|
||||
search=(255, 103, 355, 593),
|
||||
color=(132, 119, 92),
|
||||
button=(263, 128, 324, 148),
|
||||
),
|
||||
)
|
||||
RESULT_SCROLL = ButtonWrapper(
|
||||
name='RESULT_SCROLL',
|
||||
share=Button(
|
||||
file='./assets/share/planner/result/RESULT_SCROLL.png',
|
||||
area=(1238, 104, 1245, 592),
|
||||
search=(1218, 84, 1265, 612),
|
||||
color=(33, 44, 62),
|
||||
button=(1238, 104, 1245, 592),
|
||||
),
|
||||
)
|
202
tasks/planner/result.py
Normal file
@ -0,0 +1,202 @@
|
||||
import cv2
|
||||
from pponnxcr.predict_system import BoxedResult
|
||||
from pydantic import BaseModel
|
||||
|
||||
from module.base.utils import area_center, area_in_area
|
||||
from module.logger import logger
|
||||
from module.ocr.ocr import OcrWhiteLetterOnComplexBackground
|
||||
from module.ui.scroll import AdaptiveScroll
|
||||
from tasks.daily.synthesize import SynthesizeUI
|
||||
from tasks.planner.assets.assets_planner_result import *
|
||||
from tasks.planner.keywords import ITEM_CLASSES
|
||||
from tasks.planner.keywords.classes import ItemBase, ItemCurrency
|
||||
|
||||
CALCULATE_TITLE.load_search(RESULT_CHECK.search)
|
||||
MATERIAL_TITLE.load_search(RESULT_CHECK.search)
|
||||
DETAIL_TITLE.load_search(RESULT_CHECK.search)
|
||||
|
||||
|
||||
class PlannerResultRow(BaseModel):
|
||||
item: ItemBase
|
||||
total: int
|
||||
synthesize: int
|
||||
demand: int
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.item == other.item
|
||||
|
||||
|
||||
class OcrPlannerResult(OcrWhiteLetterOnComplexBackground):
|
||||
def __init__(self):
|
||||
# Planner currently CN only
|
||||
super().__init__(OCR_RESULT, lang='cn')
|
||||
self.limited_area = OCR_RESULT.area
|
||||
self.limit_y = 720
|
||||
|
||||
def filter_detected(self, result: BoxedResult) -> bool:
|
||||
if not area_in_area(result.box, self.limited_area, threshold=0):
|
||||
return False
|
||||
if area_center(result.box)[1] > self.limit_y:
|
||||
return False
|
||||
return True
|
||||
|
||||
def detect_and_ocr(self, image, *args, **kwargs):
|
||||
# Remove rows below DETAIL_TITLE
|
||||
if DETAIL_TITLE.match_template(image):
|
||||
self.limit_y = DETAIL_TITLE.button[3]
|
||||
else:
|
||||
self.limit_y = 720
|
||||
return super().detect_and_ocr(image, *args, **kwargs)
|
||||
|
||||
def pre_process(self, image):
|
||||
r, g, b = cv2.split(image)
|
||||
cv2.max(r, g, dst=r)
|
||||
cv2.max(r, b, dst=r)
|
||||
image = cv2.merge([r, r, r])
|
||||
return image
|
||||
|
||||
|
||||
class PlannerResult(SynthesizeUI):
|
||||
def is_in_planner_result(self):
|
||||
if self.appear(RESULT_CHECK):
|
||||
return True
|
||||
if self.appear(CALCULATE_TITLE):
|
||||
return True
|
||||
if self.appear(MATERIAL_TITLE):
|
||||
return True
|
||||
if self.appear(DETAIL_TITLE):
|
||||
return True
|
||||
return False
|
||||
|
||||
def parse_planner_result_page(self) -> list[PlannerResultRow]:
|
||||
"""
|
||||
Pages:
|
||||
in: planner result
|
||||
"""
|
||||
ocr = OcrPlannerResult()
|
||||
results = ocr.detect_and_ocr(self.device.image)
|
||||
|
||||
x_total = 842
|
||||
x_synthesize = 965
|
||||
x_demand = 1129
|
||||
|
||||
def x_match(result: BoxedResult, x):
|
||||
rx = area_center(result.box)[0]
|
||||
return x - 50 <= rx <= x + 50
|
||||
|
||||
def y_match(result: BoxedResult, y):
|
||||
rx = area_center(result.box)[1]
|
||||
return y - 15 <= rx <= y + 15
|
||||
|
||||
# Split columns
|
||||
list_item = [r for r in results
|
||||
if not r.ocr_text.isdigit() and ocr._match_result(r.ocr_text, keyword_classes=ITEM_CLASSES)]
|
||||
list_number = [r for r in results if r.ocr_text.isdigit()]
|
||||
list_total = [r for r in list_number if x_match(r, x_total)]
|
||||
list_synthesize = [r for r in list_number if x_match(r, x_synthesize)]
|
||||
list_demand = [r for r in list_number if x_match(r, x_demand)]
|
||||
|
||||
# Structure
|
||||
out: list[PlannerResultRow] = []
|
||||
for item in list_item:
|
||||
y_item = area_center(item.box)[1]
|
||||
total = -1
|
||||
for number in list_total:
|
||||
if y_match(number, y_item):
|
||||
total = int(number.ocr_text)
|
||||
break
|
||||
synthesize = -1
|
||||
for number in list_synthesize:
|
||||
if y_match(number, y_item):
|
||||
synthesize = int(number.ocr_text)
|
||||
break
|
||||
demand = -1
|
||||
for number in list_demand:
|
||||
if y_match(number, y_item):
|
||||
demand = int(number.ocr_text)
|
||||
break
|
||||
item = ocr._match_result(item.ocr_text, keyword_classes=ITEM_CLASSES)
|
||||
row = PlannerResultRow(
|
||||
item=item,
|
||||
total=total,
|
||||
synthesize=synthesize,
|
||||
demand=demand
|
||||
)
|
||||
# Validate item
|
||||
# print(row)
|
||||
if row.total <= 0:
|
||||
logger.warning(f'Planner row with total <= 0, {row}')
|
||||
continue
|
||||
if row.synthesize < 0:
|
||||
# Credits always have synthesize="-"
|
||||
if row.item.__class__ != ItemCurrency:
|
||||
logger.warning(f'Planner row with synthesize < 0, {row}')
|
||||
continue
|
||||
if row.demand < 0:
|
||||
logger.warning(f'Planner row with demand < 0, {row}')
|
||||
continue
|
||||
# Add
|
||||
out.append(row)
|
||||
|
||||
logger.info(f'parse_planner_result_page: {out}')
|
||||
return out
|
||||
|
||||
def parse_planner_result(self, skip_first_screenshot=True) -> list[PlannerResultRow]:
|
||||
"""
|
||||
Pages:
|
||||
in: planner result
|
||||
"""
|
||||
logger.hr('Parse planner result')
|
||||
scroll = AdaptiveScroll(RESULT_SCROLL.button, name=RESULT_SCROLL.name)
|
||||
scroll.drag_threshold = 0.1
|
||||
scroll.edge_threshold = 0.1
|
||||
scroll.parameters = {
|
||||
'height': 50,
|
||||
'prominence': 15,
|
||||
'width': 5,
|
||||
}
|
||||
if not skip_first_screenshot:
|
||||
self.device.screenshot()
|
||||
skip_first_screenshot = False
|
||||
if not scroll.at_top(main=self):
|
||||
scroll.set_top(main=self)
|
||||
|
||||
out = []
|
||||
while 1:
|
||||
if skip_first_screenshot:
|
||||
skip_first_screenshot = False
|
||||
else:
|
||||
self.device.screenshot()
|
||||
|
||||
# Skip first page
|
||||
if self.appear(CALCULATE_TITLE):
|
||||
scroll.next_page(main=self, page=0.75)
|
||||
continue
|
||||
|
||||
# Parse
|
||||
rows = self.parse_planner_result_page()
|
||||
for row in rows:
|
||||
if row not in out:
|
||||
out.append(row)
|
||||
logger.attr('PlannerResult', len(rows))
|
||||
|
||||
# Scroll
|
||||
if scroll.at_bottom(main=self):
|
||||
logger.info('Reached scroll end, stop')
|
||||
break
|
||||
elif self.appear(DETAIL_TITLE):
|
||||
logger.info('Reached DETAIL_TITLE, stop')
|
||||
break
|
||||
else:
|
||||
scroll.next_page(main=self, page=0.8)
|
||||
|
||||
logger.hr('Planner Result')
|
||||
for row in out:
|
||||
logger.info(f'Item: {row.item.name}, {row.total}, {row.synthesize}, {row.demand}')
|
||||
return out
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
self = PlannerResult('src')
|
||||
self.device.screenshot()
|
||||
self.parse_planner_result()
|