StarRailCopilot/module/webui/app.py

1351 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import argparse
import queue
import threading
import time
from datetime import datetime
from functools import partial
from typing import Dict, List, Optional
from pywebio import config as webconfig
from pywebio.output import (
Output,
clear,
close_popup,
popup,
put_button,
put_buttons,
put_collapse,
put_column,
put_error,
put_html,
put_link,
put_loading,
put_markdown,
put_row,
put_scope,
put_table,
put_text,
put_warning,
toast,
use_scope,
)
from pywebio.pin import pin, pin_on_change
from pywebio.session import go_app, info, local, register_thread, run_js, set_env
import module.webui.lang as lang
from module.config.config import AzurLaneConfig, Function
from module.config.utils import (
alas_instance,
alas_template,
deep_get,
deep_iter,
deep_set,
dict_to_kv,
filepath_args,
filepath_config,
read_file,
)
from module.logger import logger
from module.webui.base import Frame
from module.webui.fake import (
get_config_mod,
load_config,
)
from module.webui.fastapi import asgi_app
from module.webui.lang import _t, t
from module.webui.pin import put_input, put_select
from module.webui.process_manager import ProcessManager
from module.webui.remote_access import RemoteAccess
from module.webui.setting import State
from module.webui.updater import updater
from module.webui.utils import (
Icon,
Switch,
TaskHandler,
add_css,
filepath_css,
get_alas_config_listen_path,
get_localstorage,
get_window_visibility_state,
login,
parse_pin_value,
raise_exception,
re_fullmatch,
to_pin_value,
)
from module.webui.widgets import (
BinarySwitchButton,
RichLog,
T_Output_Kwargs,
put_icon_buttons,
put_loading_text,
put_none,
put_output,
)
task_handler = TaskHandler()
class AlasGUI(Frame):
ALAS_MENU: Dict[str, Dict[str, List[str]]]
ALAS_ARGS: Dict[str, Dict[str, Dict[str, Dict[str, str]]]]
ALAS_STORED: Dict[str, Dict[str, Dict[str, str]]]
theme = "default"
def initial(self) -> None:
self.ALAS_MENU = read_file(filepath_args("menu", self.alas_mod))
self.ALAS_ARGS = read_file(filepath_args("args", self.alas_mod))
self.ALAS_STORED = read_file(filepath_args("stored", self.alas_mod))
self._init_alas_config_watcher()
def __init__(self) -> None:
super().__init__()
# modified keys, return values of pin_wait_change()
self.modified_config_queue = queue.Queue()
# alas config name
self.alas_name = ""
self.alas_mod = "alas"
self.alas_config = AzurLaneConfig("template")
self.alas_config_hidden = set()
self.initial()
@use_scope("aside", clear=True)
def set_aside(self) -> None:
# TODO: update put_icon_buttons()
put_icon_buttons(
Icon.DEVELOP,
buttons=[
{"label": t("Gui.Aside.Home"), "value": "Home", "color": "aside"}
],
onclick=[self.ui_develop],
),
for name in alas_instance():
put_icon_buttons(
Icon.RUN,
buttons=[{"label": name, "value": name, "color": "aside"}],
onclick=self.ui_alas,
)
put_icon_buttons(
Icon.ADD,
buttons=[
{"label": t("Gui.Aside.AddAlas"), "value": "AddAlas", "color": "aside"}
],
onclick=[self.ui_add_alas],
),
@use_scope("header_status")
def set_status(self, state: int) -> None:
"""
Args:
state (int):
1 (running)
2 (not running)
3 (warning, stop unexpectedly)
4 (stop for update)
0 (hide)
-1 (*state not changed)
"""
if state == -1:
return
clear()
if state == 1:
put_loading_text(t("Gui.Status.Running"), color="success")
elif state == 2:
put_loading_text(t("Gui.Status.Inactive"), color="secondary", fill=True)
elif state == 3:
put_loading_text(t("Gui.Status.Warning"), shape="grow", color="warning")
elif state == 4:
put_loading_text(t("Gui.Status.Updating"), shape="grow", color="success")
@classmethod
def set_theme(cls, theme="default") -> None:
cls.theme = theme
State.deploy_config.Theme = theme
State.theme = theme
webconfig(theme=theme)
@use_scope("menu", clear=True)
def alas_set_menu(self) -> None:
"""
Set menu
"""
put_buttons(
[{
"label": t("Gui.MenuAlas.Overview"),
"value": "Overview",
"color": "menu",
}],
onclick=[self.alas_overview],
).style(f"--menu-Overview--")
for menu, task_data in self.ALAS_MENU.items():
if task_data.get("page") == "tool":
_onclick = self.alas_daemon_overview
else:
_onclick = self.alas_set_group
if task_data.get("menu") == "collapse":
task_btn_list = [
put_buttons(
[{
"label": t(f"Task.{task}.name"),
"value": task,
"color": "menu",
}],
onclick=_onclick,
).style(f"--menu-{task}--")
for task in task_data.get("tasks", [])
]
put_collapse(title=t(f"Menu.{menu}.name"), content=task_btn_list)
else:
title = t(f"Menu.{menu}.name")
put_html('<div class="hr-task-group-box">'
'<span class="hr-task-group-line"></span>'
f'<span class="hr-task-group-text">{title}</span>'
'<span class="hr-task-group-line"></span>'
'</div>'
)
for task in task_data.get("tasks", []):
put_buttons(
[{
"label": t(f"Task.{task}.name"),
"value": task,
"color": "menu",
}],
onclick=_onclick,
).style(f"--menu-{task}--").style(f"padding-left: 0.75rem")
self.alas_overview()
@use_scope("content", clear=True)
def alas_set_group(self, task: str) -> None:
"""
Set arg groups from dict
"""
self.init_menu(name=task)
self.set_title(t(f"Task.{task}.name"))
put_scope("_groups", [put_none(), put_scope("groups"), put_scope("navigator")])
task_help: str = t(f"Task.{task}.help")
if task_help:
put_scope(
"group__info",
scope="groups",
content=[put_text(task_help).style("font-size: 1rem")],
)
config = self.alas_config.read_file(self.alas_name)
self.alas_config_hidden = self.alas_config.get_hidden_args(config)
for group, arg_dict in deep_iter(self.ALAS_ARGS[task], depth=1):
if self.set_group(group, arg_dict, config, task):
self.set_navigator(group)
@use_scope("groups")
def set_group(self, group, arg_dict, config, task):
group_name = group[0]
output_list: List[Output] = []
for arg, arg_dict in deep_iter(arg_dict, depth=1):
output_kwargs: T_Output_Kwargs = arg_dict.copy()
# Skip hide
display: Optional[str] = output_kwargs.pop("display", None)
if display == "hide":
continue
# Disable
elif display == "disabled":
output_kwargs["disabled"] = True
# Output type
output_kwargs["widget_type"] = output_kwargs.pop("type")
arg_name = arg[0] # [arg_name,]
# Internal pin widget name
output_kwargs["name"] = f"{task}_{group_name}_{arg_name}"
# Display title
output_kwargs["title"] = t(f"{group_name}.{arg_name}.name")
# Get value from config
value = deep_get(
config, [task, group_name, arg_name], output_kwargs["value"]
)
# idk
value = str(value) if isinstance(value, datetime) else value
# Default value
output_kwargs["value"] = value
# Options
output_kwargs["options"] = options = output_kwargs.pop("option", [])
# Options label
options_label = []
for opt in options:
options_label.append(t(f"{group_name}.{arg_name}.{opt}"))
output_kwargs["options_label"] = options_label
# Help
arg_help = t(f"{group_name}.{arg_name}.help")
if arg_help == "" or not arg_help:
arg_help = None
output_kwargs["help"] = arg_help
# Invalid feedback
output_kwargs["invalid_feedback"] = t("Gui.Text.InvalidFeedBack", value)
o = put_output(output_kwargs)
if o is not None:
# output will inherit current scope when created, override here
o.spec["scope"] = f"#pywebio-scope-group_{group_name}"
# Add hidden-arg
if f"{task}.{group_name}.{arg_name}" in self.alas_config_hidden:
o.style("display:none")
output_list.append(o)
if not output_list:
return 0
with use_scope(f"group_{group_name}"):
put_text(t(f"{group_name}._info.name"))
group_help = t(f"{group_name}._info.help")
if group_help != "":
put_text(group_help)
put_html('<hr class="hr-group">')
for output in output_list:
output.show()
return len(output_list)
@use_scope("navigator")
def set_navigator(self, group):
js = f"""
$("#pywebio-scope-groups").scrollTop(
$("#pywebio-scope-group_{group[0]}").position().top
+ $("#pywebio-scope-groups").scrollTop() - 59
)
"""
put_button(
label=t(f"{group[0]}._info.name"),
onclick=lambda: run_js(js),
color="navigator",
)
def set_dashboard(self, arg, arg_dict, config):
i18n = arg_dict.get('i18n')
if i18n:
name = t(i18n)
else:
name = arg
color = arg_dict.get("color", "#777777")
nodata = t("Gui.Dashboard.NoData")
def set_value(dic):
if "total" in dic.get("attrs", []) and config.get("total") is not None:
return [
put_text(config.get("value", nodata)).style("--dashboard-value--"),
put_text(f' / {config.get("total", "")}').style("--dashboard-time--"),
]
elif "comment" in dic.get("attrs", []) and config.get("comment") is not None:
return [
put_text(config.get("value", nodata)).style("--dashboard-value--"),
put_text(f' {config.get("comment", "")}').style("--dashboard-time--"),
]
else:
return [
put_text(config.get("value", nodata)).style("--dashboard-value--"),
]
with use_scope(f"dashboard-row-{arg}", clear=True):
put_html(f'<div><div class="dashboard-icon" style="background-color:{color}"></div>'),
put_scope(f"dashboard-content-{arg}", [
put_scope(f"dashboard-value-{arg}", set_value(arg_dict)),
put_scope(f"dashboard-time-{arg}", [
put_text(f"{name} - {lang.readable_time(config.get('time', ''))}").style("--dashboard-time--"),
])
])
@use_scope("content", clear=True)
def alas_overview(self) -> None:
self.init_menu(name="Overview")
self.set_title(t(f"Gui.MenuAlas.Overview"))
put_scope("overview", [put_scope("schedulers"), put_scope("logs")])
with use_scope("schedulers"):
put_scope(
"scheduler-bar",
[
put_text(t("Gui.Overview.Scheduler")).style(
"font-size: 1.25rem; margin: auto .5rem auto;"
),
put_scope("scheduler_btn"),
],
)
put_scope(
"running",
[
put_text(t("Gui.Overview.Running")),
put_html('<hr class="hr-group">'),
put_scope("running_tasks"),
],
)
put_scope(
"pending",
[
put_text(t("Gui.Overview.Pending")),
put_html('<hr class="hr-group">'),
put_scope("pending_tasks"),
],
)
put_scope(
"waiting",
[
put_text(t("Gui.Overview.Waiting")),
put_html('<hr class="hr-group">'),
put_scope("waiting_tasks"),
],
)
switch_scheduler = BinarySwitchButton(
label_on=t("Gui.Button.Stop"),
label_off=t("Gui.Button.Start"),
onclick_on=lambda: self.alas.stop(),
onclick_off=lambda: self.alas.start(None, updater.event),
get_state=lambda: self.alas.alive,
color_on="off",
color_off="on",
scope="scheduler_btn",
)
log = RichLog("log")
with use_scope("logs"):
put_scope("log-bar", [
put_scope("log-title", [
put_text(t("Gui.Overview.Log")).style("font-size: 1.25rem; margin: auto .5rem auto;"),
put_scope("log-title-btns", [
put_scope("log_scroll_btn"),
]),
]),
put_html('<hr class="hr-group">'),
put_scope("dashboard", [
# Empty dashboard, values will be updated in alas_update_overview_task()
put_scope(f"dashboard-row-{arg}", [])
for arg in self.ALAS_STORED.keys() if deep_get(self.ALAS_STORED, keys=[arg, "order"], default=0)
# Empty content to left-align last row
] + [put_html("<i></i>")] * min(len(self.ALAS_STORED), 4))
])
put_scope("log", [put_html("")])
log.console.width = log.get_width()
switch_log_scroll = BinarySwitchButton(
label_on=t("Gui.Button.ScrollON"),
label_off=t("Gui.Button.ScrollOFF"),
onclick_on=lambda: log.set_scroll(False),
onclick_off=lambda: log.set_scroll(True),
get_state=lambda: log.keep_bottom,
color_on="on",
color_off="off",
scope="log_scroll_btn",
)
self.task_handler.add(switch_scheduler.g(), 1, True)
self.task_handler.add(switch_log_scroll.g(), 1, True)
self.task_handler.add(self.alas_update_overview_task, 10, True)
self.task_handler.add(log.put_log(self.alas), 0.25, True)
def _init_alas_config_watcher(self) -> None:
def put_queue(path, value):
self.modified_config_queue.put({"name": path, "value": value})
for path in get_alas_config_listen_path(self.ALAS_ARGS):
pin_on_change(
name="_".join(path), onchange=partial(put_queue, ".".join(path))
)
logger.info("Init config watcher done.")
def _alas_thread_update_config(self) -> None:
modified = {}
while self.alive:
try:
d = self.modified_config_queue.get(timeout=10)
config_name = self.alas_name
config_updater = self.alas_config
except queue.Empty:
continue
modified[d["name"]] = d["value"]
while True:
try:
d = self.modified_config_queue.get(timeout=1)
modified[d["name"]] = d["value"]
except queue.Empty:
self._save_config(modified, config_name, config_updater)
modified.clear()
break
def _save_config(
self,
modified: Dict[str, str],
config_name: str,
config_updater: AzurLaneConfig = State.config_updater,
) -> None:
try:
valid = []
invalid = []
config = config_updater.read_file(config_name)
for k, v in modified.copy().items():
valuetype = deep_get(self.ALAS_ARGS, k + ".valuetype")
v = parse_pin_value(v, valuetype)
validate = deep_get(self.ALAS_ARGS, k + ".validate")
if not len(str(v)):
default = deep_get(self.ALAS_ARGS, k + ".value")
modified[k] = default
deep_set(config, k, default)
valid.append(k)
pin["_".join(k.split("."))] = default
elif not validate or re_fullmatch(validate, v):
deep_set(config, k, v)
modified[k] = v
valid.append(k)
for set_key, set_value in config_updater.save_callback(k, v):
modified[set_key] = set_value
deep_set(config, set_key, set_value)
valid.append(set_key)
pin["_".join(set_key.split("."))] = to_pin_value(set_value)
else:
modified.pop(k)
invalid.append(k)
logger.warning(f"Invalid value {v} for key {k}, skip saving.")
self.pin_remove_invalid_mark(valid)
self.pin_set_invalid_mark(invalid)
new_hidden_args = config_updater.get_hidden_args(config)
for k in new_hidden_args - self.alas_config_hidden:
self.pin_set_hidden_arg(k, type_=deep_get(self.ALAS_ARGS, f"{k}.type"))
for k in self.alas_config_hidden - new_hidden_args:
self.pin_remove_hidden_arg(k, type_=deep_get(self.ALAS_ARGS, f"{k}.type"))
self.alas_config_hidden = new_hidden_args
if modified:
toast(
t("Gui.Toast.ConfigSaved"),
duration=1,
position="right",
color="success",
)
logger.info(
f"Save config {filepath_config(config_name)}, {dict_to_kv(modified)}"
)
config_updater.write_file(config_name, config)
except Exception as e:
logger.exception(e)
def alas_update_overview_task(self) -> None:
if not self.visible:
return
self.alas_config.load()
self.alas_config.get_next_task()
alive = self.alas.alive
if len(self.alas_config.pending_task) >= 1:
if self.alas.alive:
running = self.alas_config.pending_task[:1]
pending = self.alas_config.pending_task[1:]
else:
running = []
pending = self.alas_config.pending_task[:]
else:
running = []
pending = []
waiting = self.alas_config.waiting_task
def put_task(func: Function):
with use_scope(f"overview-task_{func.command}"):
put_column(
[
put_text(t(f"Task.{func.command}.name")).style("--arg-title--"),
put_text(str(func.next_run)).style("--arg-help--"),
],
size="auto auto",
)
put_button(
label=t("Gui.Button.Setting"),
onclick=lambda: self.alas_set_group(func.command),
color="off",
)
if self.scope_expired_then_add("pending_task", [
alive,
self.alas_config.pending_task
]):
clear("running_tasks")
clear("pending_tasks")
clear("waiting_tasks")
with use_scope("running_tasks"):
if running:
for task in running:
put_task(task)
else:
put_text(t("Gui.Overview.NoTask")).style("--overview-notask-text--")
with use_scope("pending_tasks"):
if pending:
for task in pending:
put_task(task)
else:
put_text(t("Gui.Overview.NoTask")).style("--overview-notask-text--")
with use_scope("waiting_tasks"):
if waiting:
for task in waiting:
put_task(task)
else:
put_text(t("Gui.Overview.NoTask")).style("--overview-notask-text--")
for arg, arg_dict in self.ALAS_STORED.items():
# Skip order=0
if not arg_dict.get("order", 0):
continue
path = arg_dict["path"]
if self.scope_expired_then_add(f"dashboard-time-value-{arg}", [
deep_get(self.alas_config.data, keys=f"{path}.value"),
lang.readable_time(deep_get(self.alas_config.data, keys=f"{path}.time")),
]):
self.set_dashboard(arg, arg_dict, deep_get(self.alas_config.data, keys=path, default={}))
@use_scope("content", clear=True)
def alas_daemon_overview(self, task: str) -> None:
self.init_menu(name=task)
self.set_title(t(f"Task.{task}.name"))
log = RichLog("log")
if self.is_mobile:
put_scope(
"daemon-overview",
[
put_scope("scheduler-bar"),
put_scope("groups"),
put_scope("daemon-log-bar"),
put_scope("log", [put_html("")]),
],
)
else:
put_scope(
"daemon-overview",
[
put_none(),
put_scope(
"_daemon",
[
put_scope(
"_daemon_upper",
[put_scope("scheduler-bar"), put_scope("daemon-log-bar")],
),
put_scope("groups"),
put_scope("log", [put_html("")]),
],
),
put_none(),
],
)
log.console.width = log.get_width()
with use_scope("scheduler-bar"):
put_text(t("Gui.Overview.Scheduler")).style(
"font-size: 1.25rem; margin: auto .5rem auto;"
)
put_scope("scheduler_btn")
switch_scheduler = BinarySwitchButton(
label_on=t("Gui.Button.Stop"),
label_off=t("Gui.Button.Start"),
onclick_on=lambda: self.alas.stop(),
onclick_off=lambda: self.alas.start(task),
get_state=lambda: self.alas.alive,
color_on="off",
color_off="on",
scope="scheduler_btn",
)
with use_scope("daemon-log-bar"):
with use_scope("log-title"):
put_text(t("Gui.Overview.Log")).style(
"font-size: 1.25rem; margin: auto .5rem auto;"
)
put_scope(
"log-bar-btns",
[
put_scope("log_scroll_btn"),
],
)
switch_log_scroll = BinarySwitchButton(
label_on=t("Gui.Button.ScrollON"),
label_off=t("Gui.Button.ScrollOFF"),
onclick_on=lambda: log.set_scroll(False),
onclick_off=lambda: log.set_scroll(True),
get_state=lambda: log.keep_bottom,
color_on="on",
color_off="off",
scope="log_scroll_btn",
)
config = self.alas_config.read_file(self.alas_name)
for group, arg_dict in deep_iter(self.ALAS_ARGS[task], depth=1):
if group[0] == "Storage":
continue
self.set_group(group, arg_dict, config, task)
run_js("""
$("#pywebio-scope-log").css(
"grid-row-start",
-2 - $("#pywebio-scope-_daemon").children().filter(
function(){
return $(this).css("display") === "none";
}
).length
);
$("#pywebio-scope-log").css(
"grid-row-end",
-1
);
""")
self.task_handler.add(switch_scheduler.g(), 1, True)
self.task_handler.add(switch_log_scroll.g(), 1, True)
self.task_handler.add(log.put_log(self.alas), 0.25, True)
@use_scope("menu", clear=True)
def dev_set_menu(self) -> None:
self.init_menu(collapse_menu=False, name="Develop")
put_button(
label=t("Gui.MenuDevelop.HomePage"),
onclick=self.show,
color="menu",
).style(f"--menu-HomePage--")
# put_button(
# label=t("Gui.MenuDevelop.Translate"),
# onclick=self.dev_translate,
# color="menu",
# ).style(f"--menu-Translate--")
put_button(
label=t("Gui.MenuDevelop.Update"),
onclick=self.dev_update,
color="menu",
).style(f"--menu-Update--")
put_button(
label=t("Gui.MenuDevelop.Remote"),
onclick=self.dev_remote,
color="menu",
).style(f"--menu-Remote--")
put_button(
label=t("Gui.MenuDevelop.Utils"),
onclick=self.dev_utils,
color="menu",
).style(f"--menu-Utils--")
def dev_translate(self) -> None:
go_app("translate", new_window=True)
lang.TRANSLATE_MODE = True
self.show()
@use_scope("content", clear=True)
def dev_update(self) -> None:
self.init_menu(name="Update")
self.set_title(t("Gui.MenuDevelop.Update"))
if State.restart_event is None:
put_warning(t("Gui.Update.DisabledWarn"))
put_row(
content=[put_scope("updater_loading"), None, put_scope("updater_state")],
size="auto .25rem 1fr",
)
put_scope("updater_btn")
put_scope("updater_info")
def update_table():
with use_scope("updater_info", clear=True):
local_commit = updater.get_commit(short_sha1=True)
upstream_commit = updater.get_commit(
f"origin/{updater.Branch}", short_sha1=True
)
put_table(
[
[t("Gui.Update.Local"), *local_commit],
[t("Gui.Update.Upstream"), *upstream_commit],
],
header=[
"",
"SHA1",
t("Gui.Update.Author"),
t("Gui.Update.Time"),
t("Gui.Update.Message"),
],
)
with use_scope("updater_detail", clear=True):
put_text(t("Gui.Update.DetailedHistory"))
history = updater.get_commit(
f"origin/{updater.Branch}", n=20, short_sha1=True
)
put_table(
[commit for commit in history],
header=[
"SHA1",
t("Gui.Update.Author"),
t("Gui.Update.Time"),
t("Gui.Update.Message"),
],
)
def u(state):
if state == -1:
return
clear("updater_loading")
clear("updater_state")
clear("updater_btn")
if state == 0:
put_loading("border", "secondary", "updater_loading").style(
"--loading-border-fill--"
)
put_text(t("Gui.Update.UpToDate"), scope="updater_state")
put_button(
t("Gui.Button.CheckUpdate"),
onclick=updater.check_update,
color="info",
scope="updater_btn",
)
update_table()
elif state == 1:
put_loading("grow", "success", "updater_loading").style(
"--loading-grow--"
)
put_text(t("Gui.Update.HaveUpdate"), scope="updater_state")
put_button(
t("Gui.Button.ClickToUpdate"),
onclick=updater.run_update,
color="success",
scope="updater_btn",
)
update_table()
elif state == "checking":
put_loading("border", "primary", "updater_loading").style(
"--loading-border--"
)
put_text(t("Gui.Update.UpdateChecking"), scope="updater_state")
elif state == "failed":
put_loading("grow", "danger", "updater_loading").style(
"--loading-grow--"
)
put_text(t("Gui.Update.UpdateFailed"), scope="updater_state")
put_button(
t("Gui.Button.RetryUpdate"),
onclick=updater.run_update,
color="primary",
scope="updater_btn",
)
elif state == "start":
put_loading("border", "primary", "updater_loading").style(
"--loading-border--"
)
put_text(t("Gui.Update.UpdateStart"), scope="updater_state")
put_button(
t("Gui.Button.CancelUpdate"),
onclick=updater.cancel,
color="danger",
scope="updater_btn",
)
elif state == "wait":
put_loading("border", "primary", "updater_loading").style(
"--loading-border--"
)
put_text(t("Gui.Update.UpdateWait"), scope="updater_state")
put_button(
t("Gui.Button.CancelUpdate"),
onclick=updater.cancel,
color="danger",
scope="updater_btn",
)
elif state == "run update":
put_loading("border", "primary", "updater_loading").style(
"--loading-border--"
)
put_text(t("Gui.Update.UpdateRun"), scope="updater_state")
put_button(
t("Gui.Button.CancelUpdate"),
onclick=updater.cancel,
color="danger",
scope="updater_btn",
disabled=True,
)
elif state == "reload":
put_loading("grow", "success", "updater_loading").style(
"--loading-grow--"
)
put_text(t("Gui.Update.UpdateSuccess"), scope="updater_state")
update_table()
elif state == "finish":
put_loading("grow", "success", "updater_loading").style(
"--loading-grow--"
)
put_text(t("Gui.Update.UpdateFinish"), scope="updater_state")
update_table()
elif state == "cancel":
put_loading("border", "danger", "updater_loading").style(
"--loading-border--"
)
put_text(t("Gui.Update.UpdateCancel"), scope="updater_state")
put_button(
t("Gui.Button.CancelUpdate"),
onclick=updater.cancel,
color="danger",
scope="updater_btn",
disabled=True,
)
else:
put_text(
"Something went wrong, please contact develops",
scope="updater_state",
)
put_text(f"state: {state}", scope="updater_state")
updater_switch = Switch(
status=u, get_state=lambda: updater.state, name="updater"
)
update_table()
self.task_handler.add(updater_switch.g(), delay=0.5, pending_delete=True)
updater.check_update()
@use_scope("content", clear=True)
def dev_utils(self) -> None:
self.init_menu(name="Utils")
self.set_title(t("Gui.MenuDevelop.Utils"))
put_button(label="Raise exception", onclick=raise_exception)
def _force_restart():
if State.restart_event is not None:
toast("Alas will restart in 3 seconds", duration=0, color="error")
clearup()
State.restart_event.set()
else:
toast("Reload not enabled", color="error")
put_button(label="Force restart", onclick=_force_restart)
@use_scope("content", clear=True)
def dev_remote(self) -> None:
self.init_menu(name="Remote")
self.set_title(t("Gui.MenuDevelop.Remote"))
put_row(
content=[put_scope("remote_loading"), None, put_scope("remote_state")],
size="auto .25rem 1fr",
)
put_scope("remote_info")
def u(state):
if state == -1:
return
clear("remote_loading")
clear("remote_state")
clear("remote_info")
if state in (1, 2):
put_loading("grow", "success", "remote_loading").style(
"--loading-grow--"
)
put_text(t("Gui.Remote.Running"), scope="remote_state")
put_text(t("Gui.Remote.EntryPoint"), scope="remote_info")
entrypoint = RemoteAccess.get_entry_point()
if entrypoint:
if State.electron: # Prevent click into url in electron client
put_text(entrypoint, scope="remote_info").style(
"text-decoration-line: underline"
)
else:
put_link(name=entrypoint, url=entrypoint, scope="remote_info")
else:
put_text("Loading...", scope="remote_info")
elif state in (0, 3):
put_loading("border", "secondary", "remote_loading").style(
"--loading-border-fill--"
)
if (
State.deploy_config.EnableRemoteAccess
and State.deploy_config.Password
):
put_text(t("Gui.Remote.NotRunning"), scope="remote_state")
else:
put_text(t("Gui.Remote.NotEnable"), scope="remote_state")
put_text(t("Gui.Remote.ConfigureHint"), scope="remote_info")
url = "http://app.azurlane.cloud" + (
"" if State.deploy_config.Language.startswith("zh") else "/en.html"
)
put_html(
f'<a href="{url}" target="_blank">{url}</a>', scope="remote_info"
)
if state == 3:
put_warning(
t("Gui.Remote.SSHNotInstall"),
closable=False,
scope="remote_info",
)
remote_switch = Switch(
status=u, get_state=RemoteAccess.get_state, name="remote"
)
self.task_handler.add(remote_switch.g(), delay=1, pending_delete=True)
def ui_develop(self) -> None:
if not self.is_mobile:
self.show()
return
self.init_aside(name="Home")
self.set_title(t("Gui.Aside.Home"))
self.dev_set_menu()
self.alas_name = ""
if hasattr(self, "alas"):
del self.alas
self.state_switch.switch()
def ui_alas(self, config_name: str) -> None:
if config_name == self.alas_name:
self.expand_menu()
return
self.init_aside(name=config_name)
clear("content")
self.alas_name = config_name
self.alas_mod = get_config_mod(config_name)
self.alas = ProcessManager.get_manager(config_name)
self.alas_config = load_config(config_name)
self.state_switch.switch()
self.initial()
self.alas_set_menu()
def ui_add_alas(self) -> None:
with popup(t("Gui.AddAlas.PopupTitle")) as s:
def get_unused_name():
all_name = alas_instance()
for i in range(2, 100):
if f"src{i}" not in all_name:
return f"src{i}"
else:
return ""
def add():
name = pin["AddAlas_name"]
origin = pin["AddAlas_copyfrom"]
if name in alas_instance():
err = "Gui.AddAlas.FileExist"
elif set(name) & set(".\\/:*?\"'<>|"):
err = "Gui.AddAlas.InvalidChar"
elif name.lower().startswith("template"):
err = "Gui.AddAlas.InvalidPrefixTemplate"
else:
err = ""
if err:
clear(s)
put(name, origin)
put_error(t(err), scope=s)
return
r = load_config(origin).read_file(origin)
State.config_updater.write_file(name, r, get_config_mod(origin))
self.set_aside()
self.active_button("aside", self.alas_name)
close_popup()
def put(name=None, origin=None):
put_input(
name="AddAlas_name",
label=t("Gui.AddAlas.NewName"),
value=name or get_unused_name(),
scope=s,
),
put_select(
name="AddAlas_copyfrom",
label=t("Gui.AddAlas.CopyFrom"),
options=alas_template() + alas_instance(),
value=origin or "template-src",
scope=s,
),
put_button(label=t("Gui.AddAlas.Confirm"), onclick=add, scope=s)
put()
def show(self) -> None:
self._show()
self.set_aside()
self.init_aside(name="Home")
self.dev_set_menu()
self.init_menu(name="HomePage")
self.alas_name = ""
if hasattr(self, "alas"):
del self.alas
self.set_status(0)
def set_language(l):
lang.set_language(l)
self.show()
def set_theme(t):
self.set_theme(t)
run_js("location.reload()")
with use_scope("content"):
put_text("Select your language / 选择语言").style("text-align: center")
put_buttons(
[
{"label": "简体中文", "value": "zh-CN"},
{"label": "繁體中文", "value": "zh-TW"},
{"label": "English", "value": "en-US"},
{"label": "日本語", "value": "ja-JP"},
{"label": "Español", "value": "es-ES"},
],
onclick=lambda l: set_language(l),
).style("text-align: center")
put_text("Change theme / 更改主题").style("text-align: center")
put_buttons(
[
{"label": "Light", "value": "default", "color": "light"},
{"label": "Dark", "value": "dark", "color": "dark"},
],
onclick=lambda t: set_theme(t),
).style("text-align: center")
# show something
put_markdown(
"""
SRC is a free open source software, if you paid for SRC from any channel, please refund.
SRC 是一款免费开源软件如果你在任何渠道付费购买了SRC请退款。
Project repository 项目地址:`https://github.com/LmeSzinc/StarRailCopilot`
"""
).style("text-align: center")
if lang.TRANSLATE_MODE:
lang.reload()
def _disable():
lang.TRANSLATE_MODE = False
self.show()
toast(
_t("Gui.Toast.DisableTranslateMode"),
duration=0,
position="right",
onclick=_disable,
)
def run(self) -> None:
# setup gui
set_env(title="SRC", output_animation=False)
add_css(filepath_css("alas"))
if self.is_mobile:
add_css(filepath_css("alas-mobile"))
else:
add_css(filepath_css("alas-pc"))
if self.theme == "dark":
add_css(filepath_css("dark-alas"))
else:
add_css(filepath_css("light-alas"))
# Auto refresh when lost connection
# [For develop] Disable by run `reload=0` in console
run_js(
"""
reload = 1;
WebIO._state.CurrentSession.on_session_close(
()=>{
setTimeout(
()=>{
if (reload == 1){
location.reload();
}
}, 4000
)
}
);
"""
)
aside = get_localstorage("aside")
self.show()
# init config watcher
self._init_alas_config_watcher()
# save config
_thread_save_config = threading.Thread(target=self._alas_thread_update_config)
register_thread(_thread_save_config)
_thread_save_config.start()
visibility_state_switch = Switch(
status={
True: [
lambda: self.__setattr__("visible", True),
lambda: self.alas_update_overview_task()
if self.page == "Overview"
else 0,
lambda: self.task_handler._task.__setattr__("delay", 15),
],
False: [
lambda: self.__setattr__("visible", False),
lambda: self.task_handler._task.__setattr__("delay", 1),
],
},
get_state=get_window_visibility_state,
name="visibility_state",
)
self.state_switch = Switch(
status=self.set_status,
get_state=lambda: getattr(getattr(self, "alas", -1), "state", 0),
name="state",
)
def goto_update():
self.ui_develop()
self.dev_update()
update_switch = Switch(
status={
1: lambda: toast(
t("Gui.Toast.ClickToUpdate"),
duration=0,
position="right",
color="success",
onclick=goto_update,
)
},
get_state=lambda: updater.state,
name="update_state",
)
self.task_handler.add(self.state_switch.g(), 2)
self.task_handler.add(visibility_state_switch.g(), 15)
self.task_handler.add(update_switch.g(), 1)
self.task_handler.start()
# Return to previous page
if aside not in ["Home", None]:
self.ui_alas(aside)
def debug():
"""For interactive python.
$ python3
>>> from module.webui.app import *
>>> debug()
>>>
"""
startup()
AlasGUI().run()
def startup():
State.init()
lang.reload()
updater.event = State.manager.Event()
if updater.delay > 0:
task_handler.add(updater.check_update, updater.delay)
task_handler.add(updater.schedule_update(), 86400)
task_handler.start()
# if State.deploy_config.DiscordRichPresence:
# init_discord_rpc()
# if State.deploy_config.StartOcrServer:
# start_ocr_server_process(State.deploy_config.OcrServerPort)
if (
State.deploy_config.EnableRemoteAccess
and State.deploy_config.Password is not None
):
task_handler.add(RemoteAccess.keep_ssh_alive(), 60)
def clearup():
"""
Notice: Ensure run it before uvicorn reload app,
all process will NOT EXIT after close electron app.
"""
logger.info("Start clearup")
RemoteAccess.kill_ssh_process()
# close_discord_rpc()
# stop_ocr_server_process()
for alas in ProcessManager._processes.values():
alas.stop()
State.clearup()
task_handler.stop()
logger.info("Alas closed.")
def app():
parser = argparse.ArgumentParser(description="Alas web service")
parser.add_argument(
"-k", "--key", type=str, help="Password of alas. No password by default"
)
parser.add_argument(
"--cdn",
action="store_true",
help="Use jsdelivr cdn for pywebio static files (css, js). Self host cdn by default.",
)
parser.add_argument(
"--run",
nargs="+",
type=str,
help="Run alas by config names on startup",
)
args, _ = parser.parse_known_args()
# Apply config
AlasGUI.set_theme(theme=State.deploy_config.Theme)
lang.LANG = State.deploy_config.Language
key = args.key or State.deploy_config.Password
cdn = args.cdn if args.cdn else State.deploy_config.CDN
runs = None
if args.run:
runs = args.run
elif State.deploy_config.Run:
# TODO: refactor poor_yaml_read() to support list
tmp = State.deploy_config.Run.split(",")
runs = [l.strip(" ['\"]") for l in tmp if len(l)]
instances: List[str] = runs
logger.hr("Webui configs")
logger.attr("Theme", State.deploy_config.Theme)
logger.attr("Language", lang.LANG)
logger.attr("Password", True if key else False)
logger.attr("CDN", cdn)
def index():
if key is not None and not login(key):
logger.warning(f"{info.user_ip} login failed.")
time.sleep(1.5)
run_js("location.reload();")
return
gui = AlasGUI()
local.gui = gui
gui.run()
app = asgi_app(
applications=[index],
cdn=cdn,
static_dir=None,
debug=True,
on_startup=[
startup,
lambda: ProcessManager.restart_processes(
instances=instances, ev=updater.event
),
],
on_shutdown=[clearup],
)
return app