mirror of
https://github.com/LmeSzinc/StarRailCopilot.git
synced 2024-11-22 16:40:28 +00:00
556 lines
15 KiB
Python
556 lines
15 KiB
Python
import datetime
|
|
import operator
|
|
import re
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
from queue import Queue
|
|
from typing import Callable, Generator, List
|
|
|
|
import pywebio
|
|
from module.config.utils import deep_iter
|
|
from module.logger import logger
|
|
from module.webui.setting import State
|
|
from pywebio.input import PASSWORD, input
|
|
from pywebio.output import PopupSize, popup, put_html, toast
|
|
from pywebio.session import eval_js
|
|
from pywebio.session import info as session_info
|
|
from pywebio.session import register_thread, run_js
|
|
from rich.console import Console, ConsoleOptions
|
|
from rich.terminal_theme import TerminalTheme
|
|
|
|
|
|
RE_DATETIME = (
|
|
r"\d{4}\-(0\d|1[0-2])\-([0-2]\d|[3][0-1]) "
|
|
r"([0-1]\d|[2][0-3]):([0-5]\d):([0-5]\d)"
|
|
)
|
|
|
|
|
|
TRACEBACK_CODE_FORMAT = """\
|
|
<code class="rich-traceback">
|
|
<pre class="rich-traceback-code">{code}</pre>
|
|
</code>
|
|
"""
|
|
|
|
LOG_CODE_FORMAT = "{code}"
|
|
|
|
DARK_TERMINAL_THEME = TerminalTheme(
|
|
(30, 30, 30), # Background
|
|
(204, 204, 204), # Foreground
|
|
[
|
|
(0, 0, 0), # Black
|
|
(205, 49, 49), # Red
|
|
(13, 188, 121), # Green
|
|
(229, 229, 16), # Yellow
|
|
(36, 114, 200), # Blue
|
|
(188, 63, 188), # Purple / Magenta
|
|
(17, 168, 205), # Cyan
|
|
(229, 229, 229), # White
|
|
],
|
|
[ # Bright
|
|
(102, 102, 102), # Black
|
|
(241, 76, 76), # Red
|
|
(35, 209, 139), # Green
|
|
(245, 245, 67), # Yellow
|
|
(59, 142, 234), # Blue
|
|
(214, 112, 214), # Purple / Magenta
|
|
(41, 184, 219), # Cyan
|
|
(229, 229, 229), # White
|
|
],
|
|
)
|
|
|
|
LIGHT_TERMINAL_THEME = TerminalTheme(
|
|
(255, 255, 255), # Background
|
|
(97, 97, 97), # Foreground
|
|
[
|
|
(0, 0, 0), # Black
|
|
(205, 49, 49), # Red
|
|
(0, 188, 0), # Green
|
|
(148, 152, 0), # Yellow
|
|
(4, 81, 165), # Blue
|
|
(188, 5, 188), # Purple / Magenta
|
|
(5, 152, 188), # Cyan
|
|
(85, 85, 85), # White
|
|
],
|
|
[ # Bright
|
|
(102, 102, 102), # Black
|
|
(205, 49, 49), # Red
|
|
(20, 206, 20), # Green
|
|
(181, 186, 0), # Yellow
|
|
(4, 81, 165), # Blue
|
|
(188, 5, 188), # Purple / Magenta
|
|
(5, 152, 188), # Cyan
|
|
(165, 165, 165), # White
|
|
],
|
|
)
|
|
|
|
|
|
class QueueHandler:
|
|
def __init__(self, q: Queue) -> None:
|
|
self.queue = q
|
|
|
|
def write(self, s: str):
|
|
self.queue.put(s)
|
|
|
|
|
|
class Task:
|
|
def __init__(
|
|
self, g: Generator, delay: float, next_run: float = None, name: str = None
|
|
) -> None:
|
|
self.g = g
|
|
g.send(None)
|
|
self.delay = delay
|
|
self.next_run = next_run if next_run else time.time()
|
|
self.name = name if name is not None else self.g.__name__
|
|
|
|
def __str__(self) -> str:
|
|
return f"<{self.name} (delay={self.delay})>"
|
|
|
|
def __next__(self) -> None:
|
|
return next(self.g)
|
|
|
|
def send(self, obj) -> None:
|
|
return self.g.send(obj)
|
|
|
|
__repr__ = __str__
|
|
|
|
|
|
class TaskHandler:
|
|
def __init__(self) -> None:
|
|
# List of background running task
|
|
self.tasks: List[Task] = []
|
|
# List of task name to be removed
|
|
self.pending_remove_tasks: List[Task] = []
|
|
# Running task
|
|
self._task = None
|
|
# Task running thread
|
|
self._thread: threading.Thread = None
|
|
self._alive = False
|
|
self._lock = threading.Lock()
|
|
|
|
def add(self, func, delay: float, pending_delete: bool = False) -> None:
|
|
"""
|
|
Add a task running background.
|
|
Another way of `self.add_task()`.
|
|
func: Callable or Generator
|
|
"""
|
|
if isinstance(func, Callable):
|
|
g = get_generator(func)
|
|
elif isinstance(func, Generator):
|
|
g = func
|
|
self.add_task(Task(g, delay), pending_delete=pending_delete)
|
|
|
|
def add_task(self, task: Task, pending_delete: bool = False) -> None:
|
|
"""
|
|
Add a task running background.
|
|
"""
|
|
if task in self.tasks:
|
|
logger.warning(f"Task {task} already in tasks list.")
|
|
return
|
|
logger.info(f"Add task {task}")
|
|
with self._lock:
|
|
self.tasks.append(task)
|
|
if pending_delete:
|
|
self.pending_remove_tasks.append(task)
|
|
|
|
def _remove_task(self, task: Task) -> None:
|
|
if task in self.tasks:
|
|
self.tasks.remove(task)
|
|
logger.info(f"Task {task} removed.")
|
|
else:
|
|
logger.warning(
|
|
f"Failed to remove task {task}. Current tasks list: {self.tasks}"
|
|
)
|
|
|
|
def remove_task(self, task: Task, nowait: bool = False) -> None:
|
|
"""
|
|
Remove a task in `self.tasks`.
|
|
Args:
|
|
task:
|
|
nowait: if True, remove it right now,
|
|
otherwise remove when call `self.remove_pending_task`
|
|
"""
|
|
if nowait:
|
|
with self._lock:
|
|
self._remove_task(task)
|
|
else:
|
|
self.pending_remove_tasks.append(task)
|
|
|
|
def remove_pending_task(self) -> None:
|
|
"""
|
|
Remove all pending remove tasks.
|
|
"""
|
|
with self._lock:
|
|
for task in self.pending_remove_tasks:
|
|
self._remove_task(task)
|
|
self.pending_remove_tasks = []
|
|
|
|
def remove_current_task(self) -> None:
|
|
self.remove_task(self._task, nowait=True)
|
|
|
|
def get_task(self, name) -> Task:
|
|
with self._lock:
|
|
for task in self.tasks:
|
|
if task.name == name:
|
|
return task
|
|
return None
|
|
|
|
def loop(self) -> None:
|
|
"""
|
|
Start task loop.
|
|
You **should** run this function in an individual thread.
|
|
"""
|
|
self._alive = True
|
|
while self._alive:
|
|
if self.tasks:
|
|
with self._lock:
|
|
self.tasks.sort(key=operator.attrgetter("next_run"))
|
|
task = self.tasks[0]
|
|
if task.next_run < time.time():
|
|
start_time = time.time()
|
|
try:
|
|
self._task = task
|
|
# logger.debug(f'Start task {task.g.__name__}')
|
|
task.send(self)
|
|
# logger.debug(f'End task {task.g.__name__}')
|
|
except Exception as e:
|
|
logger.exception(e)
|
|
self.remove_task(task, nowait=True)
|
|
finally:
|
|
self._task = None
|
|
end_time = time.time()
|
|
task.next_run += task.delay
|
|
with self._lock:
|
|
for task in self.tasks:
|
|
task.next_run += end_time - start_time
|
|
else:
|
|
time.sleep(0.05)
|
|
else:
|
|
time.sleep(0.5)
|
|
logger.info("End of task handler loop")
|
|
|
|
def _get_thread(self) -> threading.Thread:
|
|
thread = threading.Thread(target=self.loop, daemon=True)
|
|
return thread
|
|
|
|
def start(self) -> None:
|
|
"""
|
|
Start task handler.
|
|
"""
|
|
logger.info("Start task handler")
|
|
if self._thread is not None and self._thread.is_alive():
|
|
logger.warning("Task handler already running!")
|
|
return
|
|
self._thread = self._get_thread()
|
|
self._thread.start()
|
|
|
|
def stop(self) -> None:
|
|
self.remove_pending_task()
|
|
self._alive = False
|
|
self._thread.join(timeout=2)
|
|
if not self._thread.is_alive():
|
|
logger.info("Finish task handler")
|
|
else:
|
|
logger.warning("Task handler does not stop within 2 seconds")
|
|
|
|
|
|
class WebIOTaskHandler(TaskHandler):
|
|
def _get_thread(self) -> threading.Thread:
|
|
thread = super()._get_thread()
|
|
register_thread(thread)
|
|
return thread
|
|
|
|
|
|
class Switch:
|
|
def __init__(self, status, get_state, name=None):
|
|
"""
|
|
Args:
|
|
status
|
|
(dict):A dict describes each state.
|
|
{
|
|
0: {
|
|
'func': (Callable)
|
|
},
|
|
1: {
|
|
'func'
|
|
'args': (Optional, tuple)
|
|
'kwargs': (Optional, dict)
|
|
},
|
|
2: [
|
|
func1,
|
|
{
|
|
'func': func2
|
|
'args': args2
|
|
}
|
|
]
|
|
-1: []
|
|
}
|
|
(Callable):current state will pass into this function
|
|
lambda state: do_update(state=state)
|
|
get_state:
|
|
(Callable):
|
|
return current state
|
|
(Generator):
|
|
yield current state, do nothing when state not in status
|
|
name:
|
|
"""
|
|
self._lock = threading.Lock()
|
|
self.name = name
|
|
self.status = status
|
|
self.get_state = get_state
|
|
if isinstance(get_state, Generator):
|
|
self._generator = get_state
|
|
elif isinstance(get_state, Callable):
|
|
self._generator = self._get_state()
|
|
|
|
@staticmethod
|
|
def get_state():
|
|
pass
|
|
|
|
def _get_state(self):
|
|
"""
|
|
Predefined generator when `get_state` is an callable
|
|
Customize it if you have multiple criteria on state
|
|
"""
|
|
_status = self.get_state()
|
|
yield _status
|
|
while True:
|
|
status = self.get_state()
|
|
if _status != status:
|
|
_status = status
|
|
yield _status
|
|
continue
|
|
yield -1
|
|
|
|
def switch(self):
|
|
with self._lock:
|
|
r = next(self._generator)
|
|
if callable(self.status):
|
|
self.status(r)
|
|
elif r in self.status:
|
|
f = self.status[r]
|
|
if isinstance(f, (dict, Callable)):
|
|
f = [f]
|
|
for d in f:
|
|
if isinstance(d, Callable):
|
|
d = {"func": d}
|
|
func = d["func"]
|
|
args = d.get("args", tuple())
|
|
kwargs = d.get("kwargs", dict())
|
|
func(*args, **kwargs)
|
|
|
|
def g(self) -> Generator:
|
|
g = get_generator(self.switch)
|
|
if self.name:
|
|
name = self.name
|
|
else:
|
|
name = self.get_state.__name__
|
|
g.__name__ = f"Switch_{name}_refresh"
|
|
return g
|
|
|
|
|
|
def get_generator(func: Callable):
|
|
def _g():
|
|
yield
|
|
while True:
|
|
yield func()
|
|
|
|
g = _g()
|
|
g.__name__ = func.__name__
|
|
return g
|
|
|
|
|
|
def filepath_css(filename):
|
|
return f"./assets/gui/css/{filename}.css"
|
|
|
|
|
|
def filepath_icon(filename):
|
|
return f"./assets/gui/icon/{filename}.svg"
|
|
|
|
|
|
def add_css(filepath):
|
|
with open(filepath, "r") as f:
|
|
css = f.read().replace("\n", "")
|
|
run_js(f"""$('head').append('<style>{css}</style>')""")
|
|
|
|
|
|
def _read(path):
|
|
with open(path, "r") as f:
|
|
return f.read()
|
|
|
|
|
|
class Icon:
|
|
"""
|
|
Storage html of icon.
|
|
"""
|
|
|
|
ALAS = _read(filepath_icon("alas"))
|
|
SETTING = _read(filepath_icon("setting"))
|
|
RUN = _read(filepath_icon("run"))
|
|
DEVELOP = _read(filepath_icon("develop"))
|
|
ADD = _read(filepath_icon("add"))
|
|
|
|
|
|
str2type = {
|
|
"str": str,
|
|
"float": float,
|
|
"int": int,
|
|
"bool": bool,
|
|
"ignore": lambda x: x,
|
|
}
|
|
|
|
|
|
def parse_pin_value(val, valuetype: str = None):
|
|
"""
|
|
input, textarea return str
|
|
select return its option (str or int)
|
|
checkbox return [] or [True] (define in put_checkbox_)
|
|
"""
|
|
if isinstance(val, list):
|
|
if len(val) == 0:
|
|
return False
|
|
else:
|
|
return True
|
|
elif valuetype:
|
|
return str2type[valuetype](val)
|
|
elif isinstance(val, (int, float)):
|
|
return val
|
|
else:
|
|
try:
|
|
v = float(val)
|
|
except ValueError:
|
|
return val
|
|
if v.is_integer():
|
|
return int(v)
|
|
else:
|
|
return v
|
|
|
|
|
|
def login(password):
|
|
if get_localstorage("password") == str(password):
|
|
return True
|
|
pwd = input(label="Please login below.", type=PASSWORD, placeholder="PASSWORD")
|
|
if str(pwd) == str(password):
|
|
set_localstorage("password", str(pwd))
|
|
return True
|
|
else:
|
|
toast("Wrong password!", color="error")
|
|
return False
|
|
|
|
|
|
def get_window_visibility_state():
|
|
ret = eval_js("document.visibilityState")
|
|
return False if ret == "hidden" else True
|
|
|
|
|
|
# https://pywebio.readthedocs.io/zh_CN/latest/cookbook.html#cookie-and-localstorage-manipulation
|
|
def set_localstorage(key, value):
|
|
return run_js("localStorage.setItem(key, value)", key=key, value=value)
|
|
|
|
|
|
def get_localstorage(key):
|
|
return eval_js("localStorage.getItem(key)", key=key)
|
|
|
|
|
|
def re_fullmatch(pattern, string):
|
|
if pattern == "datetime":
|
|
pattern = RE_DATETIME
|
|
# elif:
|
|
return re.fullmatch(pattern=pattern, string=string)
|
|
|
|
|
|
def get_next_time(t: datetime.time):
|
|
now = datetime.datetime.today().time()
|
|
second = (
|
|
(t.hour - now.hour) * 3600
|
|
+ (t.minute - now.minute) * 60
|
|
+ (t.second - now.second)
|
|
)
|
|
if second < 0:
|
|
second += 86400
|
|
return second
|
|
|
|
|
|
def on_task_exception(self):
|
|
logger.exception("An internal error occurred in the application")
|
|
toast_msg = (
|
|
"应用发生内部错误"
|
|
if "zh" in session_info.user_language
|
|
else "An internal error occurred in the application"
|
|
)
|
|
|
|
e_type, e_value, e_tb = sys.exc_info()
|
|
lines = traceback.format_exception(e_type, e_value, e_tb)
|
|
traceback_msg = "".join(lines)
|
|
|
|
traceback_console = Console(
|
|
color_system="truecolor", tab_size=2, record=True, width=90
|
|
)
|
|
with traceback_console.capture(): # prevent logging to stdout again
|
|
traceback_console.print_exception(
|
|
word_wrap=True, extra_lines=1, show_locals=True
|
|
)
|
|
|
|
if State.theme == "dark":
|
|
theme = DARK_TERMINAL_THEME
|
|
else:
|
|
theme = LIGHT_TERMINAL_THEME
|
|
|
|
html = traceback_console.export_html(
|
|
theme=theme, code_format=TRACEBACK_CODE_FORMAT, inline_styles=True
|
|
)
|
|
try:
|
|
popup(title=toast_msg, content=put_html(html), size=PopupSize.LARGE)
|
|
run_js(
|
|
"console.error(traceback_msg)",
|
|
traceback_msg="Internal Server Error\n" + traceback_msg,
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# Monkey patch
|
|
pywebio.session.base.Session.on_task_exception = on_task_exception
|
|
|
|
|
|
def raise_exception(x=3):
|
|
"""
|
|
For testing purpose
|
|
"""
|
|
if x > 0:
|
|
raise_exception(x - 1)
|
|
else:
|
|
raise Exception("quq")
|
|
|
|
|
|
def get_alas_config_listen_path(args):
|
|
for path, d in deep_iter(args, depth=3):
|
|
if d.get("display") in ["readonly", "hide"]:
|
|
continue
|
|
yield path
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
def gen(x):
|
|
n = 0
|
|
while True:
|
|
n += x
|
|
print(n)
|
|
yield n
|
|
|
|
th = TaskHandler()
|
|
th.start()
|
|
|
|
t1 = Task(gen(1), delay=1)
|
|
t2 = Task(gen(-2), delay=3)
|
|
|
|
th.add_task(t1)
|
|
th.add_task(t2)
|
|
|
|
time.sleep(5)
|
|
th.remove_task(t2, nowait=True)
|
|
time.sleep(5)
|
|
th.stop()
|