import argparse import queue import threading import time from datetime import datetime from functools import partial from typing import Dict, List, Optional from pywebio import config as webconfig from pywebio.output import ( Output, clear, close_popup, popup, put_button, put_buttons, put_collapse, put_column, put_error, put_html, put_link, put_loading, put_markdown, put_row, put_scope, put_table, put_text, put_warning, toast, use_scope, ) from pywebio.pin import pin, pin_on_change from pywebio.session import go_app, info, local, register_thread, run_js, set_env import module.webui.lang as lang from module.config.config import AzurLaneConfig, Function from module.config.utils import ( alas_instance, alas_template, deep_get, deep_iter, deep_set, dict_to_kv, filepath_args, filepath_config, read_file, ) from module.logger import logger from module.webui.base import Frame from module.webui.fake import ( get_config_mod, load_config, ) from module.webui.fastapi import asgi_app from module.webui.lang import _t, t from module.webui.pin import put_input, put_select from module.webui.process_manager import ProcessManager from module.webui.remote_access import RemoteAccess from module.webui.setting import State from module.webui.updater import updater from module.webui.utils import ( Icon, Switch, TaskHandler, add_css, filepath_css, get_alas_config_listen_path, get_localstorage, get_window_visibility_state, login, parse_pin_value, raise_exception, re_fullmatch, ) from module.webui.widgets import ( BinarySwitchButton, RichLog, T_Output_Kwargs, put_icon_buttons, put_loading_text, put_none, put_output, ) task_handler = TaskHandler() class AlasGUI(Frame): ALAS_MENU: Dict[str, Dict[str, List[str]]] ALAS_ARGS: Dict[str, Dict[str, Dict[str, Dict[str, str]]]] ALAS_STORED: Dict[str, Dict[str, Dict[str, str]]] theme = "default" def initial(self) -> None: self.ALAS_MENU = read_file(filepath_args("menu", self.alas_mod)) self.ALAS_ARGS = read_file(filepath_args("args", self.alas_mod)) self.ALAS_STORED = read_file(filepath_args("stored", self.alas_mod)) self._init_alas_config_watcher() def __init__(self) -> None: super().__init__() # modified keys, return values of pin_wait_change() self.modified_config_queue = queue.Queue() # alas config name self.alas_name = "" self.alas_mod = "alas" self.alas_config = AzurLaneConfig("template") self.initial() @use_scope("aside", clear=True) def set_aside(self) -> None: # TODO: update put_icon_buttons() put_icon_buttons( Icon.DEVELOP, buttons=[ {"label": t("Gui.Aside.Home"), "value": "Home", "color": "aside"} ], onclick=[self.ui_develop], ), for name in alas_instance(): put_icon_buttons( Icon.RUN, buttons=[{"label": name, "value": name, "color": "aside"}], onclick=self.ui_alas, ) put_icon_buttons( Icon.ADD, buttons=[ {"label": t("Gui.Aside.AddAlas"), "value": "AddAlas", "color": "aside"} ], onclick=[self.ui_add_alas], ), @use_scope("header_status") def set_status(self, state: int) -> None: """ Args: state (int): 1 (running) 2 (not running) 3 (warning, stop unexpectedly) 4 (stop for update) 0 (hide) -1 (*state not changed) """ if state == -1: return clear() if state == 1: put_loading_text(t("Gui.Status.Running"), color="success") elif state == 2: put_loading_text(t("Gui.Status.Inactive"), color="secondary", fill=True) elif state == 3: put_loading_text(t("Gui.Status.Warning"), shape="grow", color="warning") elif state == 4: put_loading_text(t("Gui.Status.Updating"), shape="grow", color="success") @classmethod def set_theme(cls, theme="default") -> None: cls.theme = theme State.deploy_config.Theme = theme State.theme = theme webconfig(theme=theme) @use_scope("menu", clear=True) def alas_set_menu(self) -> None: """ Set menu """ put_buttons( [{ "label": t("Gui.MenuAlas.Overview"), "value": "Overview", "color": "menu", }], onclick=[self.alas_overview], ).style(f"--menu-Overview--") for menu, task_data in self.ALAS_MENU.items(): if task_data.get("page") == "tool": _onclick = self.alas_daemon_overview else: _onclick = self.alas_set_group if task_data.get("menu") == "collapse": task_btn_list = [ put_buttons( [{ "label": t(f"Task.{task}.name"), "value": task, "color": "menu", }], onclick=_onclick, ).style(f"--menu-{task}--") for task in task_data.get("tasks", []) ] put_collapse(title=t(f"Menu.{menu}.name"), content=task_btn_list) else: title = t(f"Menu.{menu}.name") put_html('
' '' f'{title}' '' '
' ) for task in task_data.get("tasks", []): put_buttons( [{ "label": t(f"Task.{task}.name"), "value": task, "color": "menu", }], onclick=_onclick, ).style(f"--menu-{task}--").style(f"padding-left: 0.75rem") self.alas_overview() @use_scope("content", clear=True) def alas_set_group(self, task: str) -> None: """ Set arg groups from dict """ self.init_menu(name=task) self.set_title(t(f"Task.{task}.name")) put_scope("_groups", [put_none(), put_scope("groups"), put_scope("navigator")]) task_help: str = t(f"Task.{task}.help") if task_help: put_scope( "group__info", scope="groups", content=[put_text(task_help).style("font-size: 1rem")], ) config = self.alas_config.read_file(self.alas_name) for group, arg_dict in deep_iter(self.ALAS_ARGS[task], depth=1): if self.set_group(group, arg_dict, config, task): self.set_navigator(group) @use_scope("groups") def set_group(self, group, arg_dict, config, task): group_name = group[0] output_list: List[Output] = [] for arg, arg_dict in deep_iter(arg_dict, depth=1): output_kwargs: T_Output_Kwargs = arg_dict.copy() # Skip hide display: Optional[str] = output_kwargs.pop("display", None) if display == "hide": continue # Disable elif display == "disabled": output_kwargs["disabled"] = True # Output type output_kwargs["widget_type"] = output_kwargs.pop("type") arg_name = arg[0] # [arg_name,] # Internal pin widget name output_kwargs["name"] = f"{task}_{group_name}_{arg_name}" # Display title output_kwargs["title"] = t(f"{group_name}.{arg_name}.name") # Get value from config value = deep_get( config, [task, group_name, arg_name], output_kwargs["value"] ) # idk value = str(value) if isinstance(value, datetime) else value # Default value output_kwargs["value"] = value # Options output_kwargs["options"] = options = output_kwargs.pop("option", []) # Options label options_label = [] for opt in options: options_label.append(t(f"{group_name}.{arg_name}.{opt}")) output_kwargs["options_label"] = options_label # Help arg_help = t(f"{group_name}.{arg_name}.help") if arg_help == "" or not arg_help: arg_help = None output_kwargs["help"] = arg_help # Invalid feedback output_kwargs["invalid_feedback"] = t("Gui.Text.InvalidFeedBack", value) o = put_output(output_kwargs) if o is not None: # output will inherit current scope when created, override here o.spec["scope"] = f"#pywebio-scope-group_{group_name}" output_list.append(o) if not output_list: return 0 with use_scope(f"group_{group_name}"): put_text(t(f"{group_name}._info.name")) group_help = t(f"{group_name}._info.help") if group_help != "": put_text(group_help) put_html('
') for output in output_list: output.show() return len(output_list) @use_scope("navigator") def set_navigator(self, group): js = f""" $("#pywebio-scope-groups").scrollTop( $("#pywebio-scope-group_{group[0]}").position().top + $("#pywebio-scope-groups").scrollTop() - 59 ) """ put_button( label=t(f"{group[0]}._info.name"), onclick=lambda: run_js(js), color="navigator", ) def set_dashboard(self, arg, arg_dict, config): i18n = arg_dict.get('i18n') if i18n: name = t(i18n) else: name = arg color = arg_dict.get("color", "#777777") nodata = t("Gui.Dashboard.NoData") def set_value(dic): if "total" in dic.get("attrs", []) and config.get("total") is not None: return [ put_text(config.get("value", nodata)).style("--dashboard-value--"), put_text(f' / {config.get("total", "")}').style("--dashboard-time--"), ] else: return [ put_text(config.get("value", nodata)).style("--dashboard-value--"), ] with use_scope(f"dashboard-row-{arg}", clear=True): put_html(f'
'), put_scope(f"dashboard-content-{arg}", [ put_scope(f"dashboard-value-{arg}", set_value(arg_dict)), put_scope(f"dashboard-time-{arg}", [ put_text(f"{name} - {lang.readable_time(config.get('time', ''))}").style("--dashboard-time--"), ]) ]) @use_scope("content", clear=True) def alas_overview(self) -> None: self.init_menu(name="Overview") self.set_title(t(f"Gui.MenuAlas.Overview")) put_scope("overview", [put_scope("schedulers"), put_scope("logs")]) with use_scope("schedulers"): put_scope( "scheduler-bar", [ put_text(t("Gui.Overview.Scheduler")).style( "font-size: 1.25rem; margin: auto .5rem auto;" ), put_scope("scheduler_btn"), ], ) put_scope( "running", [ put_text(t("Gui.Overview.Running")), put_html('
'), put_scope("running_tasks"), ], ) put_scope( "pending", [ put_text(t("Gui.Overview.Pending")), put_html('
'), put_scope("pending_tasks"), ], ) put_scope( "waiting", [ put_text(t("Gui.Overview.Waiting")), put_html('
'), put_scope("waiting_tasks"), ], ) switch_scheduler = BinarySwitchButton( label_on=t("Gui.Button.Stop"), label_off=t("Gui.Button.Start"), onclick_on=lambda: self.alas.stop(), onclick_off=lambda: self.alas.start(None, updater.event), get_state=lambda: self.alas.alive, color_on="off", color_off="on", scope="scheduler_btn", ) log = RichLog("log") with use_scope("logs"): put_scope("log-bar", [ put_scope("log-title", [ put_text(t("Gui.Overview.Log")).style("font-size: 1.25rem; margin: auto .5rem auto;"), put_scope("log-title-btns", [ put_scope("log_scroll_btn"), ]), ]), put_html('
'), put_scope("dashboard", [ # Empty dashboard, values will be updated in alas_update_overview_task() put_scope(f"dashboard-row-{arg}", []) for arg in self.ALAS_STORED.keys() if deep_get(self.ALAS_STORED, keys=[arg, "order"], default=0) # Empty content to left-align last row ] + [put_html("")] * min(len(self.ALAS_STORED), 4)) ]) put_scope("log", [put_html("")]) log.console.width = log.get_width() switch_log_scroll = BinarySwitchButton( label_on=t("Gui.Button.ScrollON"), label_off=t("Gui.Button.ScrollOFF"), onclick_on=lambda: log.set_scroll(False), onclick_off=lambda: log.set_scroll(True), get_state=lambda: log.keep_bottom, color_on="on", color_off="off", scope="log_scroll_btn", ) self.task_handler.add(switch_scheduler.g(), 1, True) self.task_handler.add(switch_log_scroll.g(), 1, True) self.task_handler.add(self.alas_update_overview_task, 10, True) self.task_handler.add(log.put_log(self.alas), 0.25, True) def _init_alas_config_watcher(self) -> None: def put_queue(path, value): self.modified_config_queue.put({"name": path, "value": value}) for path in get_alas_config_listen_path(self.ALAS_ARGS): pin_on_change( name="_".join(path), onchange=partial(put_queue, ".".join(path)) ) logger.info("Init config watcher done.") def _alas_thread_update_config(self) -> None: modified = {} while self.alive: try: d = self.modified_config_queue.get(timeout=10) config_name = self.alas_name config_updater = self.alas_config except queue.Empty: continue modified[d["name"]] = d["value"] while True: try: d = self.modified_config_queue.get(timeout=1) modified[d["name"]] = d["value"] except queue.Empty: self._save_config(modified, config_name, config_updater) modified.clear() break def _save_config( self, modified: Dict[str, str], config_name: str, config_updater: AzurLaneConfig = State.config_updater, ) -> None: try: valid = [] invalid = [] config = config_updater.read_file(config_name) for k, v in modified.copy().items(): valuetype = deep_get(self.ALAS_ARGS, k + ".valuetype") v = parse_pin_value(v, valuetype) validate = deep_get(self.ALAS_ARGS, k + ".validate") if not len(str(v)): default = deep_get(self.ALAS_ARGS, k + ".value") modified[k] = default deep_set(config, k, default) valid.append(k) pin["_".join(k.split("."))] = default elif not validate or re_fullmatch(validate, v): deep_set(config, k, v) modified[k] = v valid.append(k) for set_key, set_value in config_updater.save_callback(k, v): logger.info([set_key, set_value, pin["_".join(set_key.split("."))]]) modified[set_key] = set_value deep_set(config, set_key, set_value) valid.append(set_key) pin["_".join(set_key.split("."))] = set_value else: modified.pop(k) invalid.append(k) logger.warning(f"Invalid value {v} for key {k}, skip saving.") self.pin_remove_invalid_mark(valid) self.pin_set_invalid_mark(invalid) if modified: toast( t("Gui.Toast.ConfigSaved"), duration=1, position="right", color="success", ) logger.info( f"Save config {filepath_config(config_name)}, {dict_to_kv(modified)}" ) config_updater.write_file(config_name, config) except Exception as e: logger.exception(e) def alas_update_overview_task(self) -> None: if not self.visible: return self.alas_config.load() self.alas_config.get_next_task() alive = self.alas.alive if len(self.alas_config.pending_task) >= 1: if self.alas.alive: running = self.alas_config.pending_task[:1] pending = self.alas_config.pending_task[1:] else: running = [] pending = self.alas_config.pending_task[:] else: running = [] pending = [] waiting = self.alas_config.waiting_task def put_task(func: Function): with use_scope(f"overview-task_{func.command}"): put_column( [ put_text(t(f"Task.{func.command}.name")).style("--arg-title--"), put_text(str(func.next_run)).style("--arg-help--"), ], size="auto auto", ) put_button( label=t("Gui.Button.Setting"), onclick=lambda: self.alas_set_group(func.command), color="off", ) if self.scope_expired_then_add("pending_task", [ alive, self.alas_config.pending_task ]): clear("running_tasks") clear("pending_tasks") clear("waiting_tasks") with use_scope("running_tasks"): if running: for task in running: put_task(task) else: put_text(t("Gui.Overview.NoTask")).style("--overview-notask-text--") with use_scope("pending_tasks"): if pending: for task in pending: put_task(task) else: put_text(t("Gui.Overview.NoTask")).style("--overview-notask-text--") with use_scope("waiting_tasks"): if waiting: for task in waiting: put_task(task) else: put_text(t("Gui.Overview.NoTask")).style("--overview-notask-text--") for arg, arg_dict in self.ALAS_STORED.items(): # Skip order=0 if not arg_dict.get("order", 0): continue path = arg_dict["path"] if self.scope_expired_then_add(f"dashboard-time-value-{arg}", [ deep_get(self.alas_config.data, keys=f"{path}.value"), lang.readable_time(deep_get(self.alas_config.data, keys=f"{path}.time")), ]): self.set_dashboard(arg, arg_dict, deep_get(self.alas_config.data, keys=path, default={})) @use_scope("content", clear=True) def alas_daemon_overview(self, task: str) -> None: self.init_menu(name=task) self.set_title(t(f"Task.{task}.name")) log = RichLog("log") if self.is_mobile: put_scope( "daemon-overview", [ put_scope("scheduler-bar"), put_scope("groups"), put_scope("log-bar"), put_scope("log", [put_html("")]), ], ) else: put_scope( "daemon-overview", [ put_none(), put_scope( "_daemon", [ put_scope( "_daemon_upper", [put_scope("scheduler-bar"), put_scope("log-bar")], ), put_scope("groups"), put_scope("log", [put_html("")]), ], ), put_none(), ], ) log.console.width = log.get_width() with use_scope("scheduler-bar"): put_text(t("Gui.Overview.Scheduler")).style( "font-size: 1.25rem; margin: auto .5rem auto;" ) put_scope("scheduler_btn") switch_scheduler = BinarySwitchButton( label_on=t("Gui.Button.Stop"), label_off=t("Gui.Button.Start"), onclick_on=lambda: self.alas.stop(), onclick_off=lambda: self.alas.start(task), get_state=lambda: self.alas.alive, color_on="off", color_off="on", scope="scheduler_btn", ) with use_scope("log-bar"): put_text(t("Gui.Overview.Log")).style( "font-size: 1.25rem; margin: auto .5rem auto;" ) put_scope( "log-bar-btns", [ put_scope("log_scroll_btn"), ], ) switch_log_scroll = BinarySwitchButton( label_on=t("Gui.Button.ScrollON"), label_off=t("Gui.Button.ScrollOFF"), onclick_on=lambda: log.set_scroll(False), onclick_off=lambda: log.set_scroll(True), get_state=lambda: log.keep_bottom, color_on="on", color_off="off", scope="log_scroll_btn", ) config = self.alas_config.read_file(self.alas_name) for group, arg_dict in deep_iter(self.ALAS_ARGS[task], depth=1): if group[0] == "Storage": continue self.set_group(group, arg_dict, config, task) run_js(""" $("#pywebio-scope-log").css( "grid-row-start", -2 - $("#pywebio-scope-_daemon").children().filter( function(){ return $(this).css("display") === "none"; } ).length ); $("#pywebio-scope-log").css( "grid-row-end", -1 ); """) self.task_handler.add(switch_scheduler.g(), 1, True) self.task_handler.add(switch_log_scroll.g(), 1, True) self.task_handler.add(log.put_log(self.alas), 0.25, True) @use_scope("menu", clear=True) def dev_set_menu(self) -> None: self.init_menu(collapse_menu=False, name="Develop") put_button( label=t("Gui.MenuDevelop.HomePage"), onclick=self.show, color="menu", ).style(f"--menu-HomePage--") # put_button( # label=t("Gui.MenuDevelop.Translate"), # onclick=self.dev_translate, # color="menu", # ).style(f"--menu-Translate--") put_button( label=t("Gui.MenuDevelop.Update"), onclick=self.dev_update, color="menu", ).style(f"--menu-Update--") put_button( label=t("Gui.MenuDevelop.Remote"), onclick=self.dev_remote, color="menu", ).style(f"--menu-Remote--") put_button( label=t("Gui.MenuDevelop.Utils"), onclick=self.dev_utils, color="menu", ).style(f"--menu-Utils--") def dev_translate(self) -> None: go_app("translate", new_window=True) lang.TRANSLATE_MODE = True self.show() @use_scope("content", clear=True) def dev_update(self) -> None: self.init_menu(name="Update") self.set_title(t("Gui.MenuDevelop.Update")) if State.restart_event is None: put_warning(t("Gui.Update.DisabledWarn")) put_row( content=[put_scope("updater_loading"), None, put_scope("updater_state")], size="auto .25rem 1fr", ) put_scope("updater_btn") put_scope("updater_info") def update_table(): with use_scope("updater_info", clear=True): local_commit = updater.get_commit(short_sha1=True) upstream_commit = updater.get_commit( f"origin/{updater.Branch}", short_sha1=True ) put_table( [ [t("Gui.Update.Local"), *local_commit], [t("Gui.Update.Upstream"), *upstream_commit], ], header=[ "", "SHA1", t("Gui.Update.Author"), t("Gui.Update.Time"), t("Gui.Update.Message"), ], ) with use_scope("updater_detail", clear=True): put_text(t("Gui.Update.DetailedHistory")) history = updater.get_commit( f"origin/{updater.Branch}", n=20, short_sha1=True ) put_table( [commit for commit in history], header=[ "SHA1", t("Gui.Update.Author"), t("Gui.Update.Time"), t("Gui.Update.Message"), ], ) def u(state): if state == -1: return clear("updater_loading") clear("updater_state") clear("updater_btn") if state == 0: put_loading("border", "secondary", "updater_loading").style( "--loading-border-fill--" ) put_text(t("Gui.Update.UpToDate"), scope="updater_state") put_button( t("Gui.Button.CheckUpdate"), onclick=updater.check_update, color="info", scope="updater_btn", ) update_table() elif state == 1: put_loading("grow", "success", "updater_loading").style( "--loading-grow--" ) put_text(t("Gui.Update.HaveUpdate"), scope="updater_state") put_button( t("Gui.Button.ClickToUpdate"), onclick=updater.run_update, color="success", scope="updater_btn", ) update_table() elif state == "checking": put_loading("border", "primary", "updater_loading").style( "--loading-border--" ) put_text(t("Gui.Update.UpdateChecking"), scope="updater_state") elif state == "failed": put_loading("grow", "danger", "updater_loading").style( "--loading-grow--" ) put_text(t("Gui.Update.UpdateFailed"), scope="updater_state") put_button( t("Gui.Button.RetryUpdate"), onclick=updater.run_update, color="primary", scope="updater_btn", ) elif state == "start": put_loading("border", "primary", "updater_loading").style( "--loading-border--" ) put_text(t("Gui.Update.UpdateStart"), scope="updater_state") put_button( t("Gui.Button.CancelUpdate"), onclick=updater.cancel, color="danger", scope="updater_btn", ) elif state == "wait": put_loading("border", "primary", "updater_loading").style( "--loading-border--" ) put_text(t("Gui.Update.UpdateWait"), scope="updater_state") put_button( t("Gui.Button.CancelUpdate"), onclick=updater.cancel, color="danger", scope="updater_btn", ) elif state == "run update": put_loading("border", "primary", "updater_loading").style( "--loading-border--" ) put_text(t("Gui.Update.UpdateRun"), scope="updater_state") put_button( t("Gui.Button.CancelUpdate"), onclick=updater.cancel, color="danger", scope="updater_btn", disabled=True, ) elif state == "reload": put_loading("grow", "success", "updater_loading").style( "--loading-grow--" ) put_text(t("Gui.Update.UpdateSuccess"), scope="updater_state") update_table() elif state == "finish": put_loading("grow", "success", "updater_loading").style( "--loading-grow--" ) put_text(t("Gui.Update.UpdateFinish"), scope="updater_state") update_table() elif state == "cancel": put_loading("border", "danger", "updater_loading").style( "--loading-border--" ) put_text(t("Gui.Update.UpdateCancel"), scope="updater_state") put_button( t("Gui.Button.CancelUpdate"), onclick=updater.cancel, color="danger", scope="updater_btn", disabled=True, ) else: put_text( "Something went wrong, please contact develops", scope="updater_state", ) put_text(f"state: {state}", scope="updater_state") updater_switch = Switch( status=u, get_state=lambda: updater.state, name="updater" ) update_table() self.task_handler.add(updater_switch.g(), delay=0.5, pending_delete=True) updater.check_update() @use_scope("content", clear=True) def dev_utils(self) -> None: self.init_menu(name="Utils") self.set_title(t("Gui.MenuDevelop.Utils")) put_button(label="Raise exception", onclick=raise_exception) def _force_restart(): if State.restart_event is not None: toast("Alas will restart in 3 seconds", duration=0, color="error") clearup() State.restart_event.set() else: toast("Reload not enabled", color="error") put_button(label="Force restart", onclick=_force_restart) @use_scope("content", clear=True) def dev_remote(self) -> None: self.init_menu(name="Remote") self.set_title(t("Gui.MenuDevelop.Remote")) put_row( content=[put_scope("remote_loading"), None, put_scope("remote_state")], size="auto .25rem 1fr", ) put_scope("remote_info") def u(state): if state == -1: return clear("remote_loading") clear("remote_state") clear("remote_info") if state in (1, 2): put_loading("grow", "success", "remote_loading").style( "--loading-grow--" ) put_text(t("Gui.Remote.Running"), scope="remote_state") put_text(t("Gui.Remote.EntryPoint"), scope="remote_info") entrypoint = RemoteAccess.get_entry_point() if entrypoint: if State.electron: # Prevent click into url in electron client put_text(entrypoint, scope="remote_info").style( "text-decoration-line: underline" ) else: put_link(name=entrypoint, url=entrypoint, scope="remote_info") else: put_text("Loading...", scope="remote_info") elif state in (0, 3): put_loading("border", "secondary", "remote_loading").style( "--loading-border-fill--" ) if ( State.deploy_config.EnableRemoteAccess and State.deploy_config.Password ): put_text(t("Gui.Remote.NotRunning"), scope="remote_state") else: put_text(t("Gui.Remote.NotEnable"), scope="remote_state") put_text(t("Gui.Remote.ConfigureHint"), scope="remote_info") url = "http://app.azurlane.cloud" + ( "" if State.deploy_config.Language.startswith("zh") else "/en.html" ) put_html( f'{url}', scope="remote_info" ) if state == 3: put_warning( t("Gui.Remote.SSHNotInstall"), closable=False, scope="remote_info", ) remote_switch = Switch( status=u, get_state=RemoteAccess.get_state, name="remote" ) self.task_handler.add(remote_switch.g(), delay=1, pending_delete=True) def ui_develop(self) -> None: if not self.is_mobile: self.show() return self.init_aside(name="Home") self.set_title(t("Gui.Aside.Home")) self.dev_set_menu() self.alas_name = "" if hasattr(self, "alas"): del self.alas self.state_switch.switch() def ui_alas(self, config_name: str) -> None: if config_name == self.alas_name: self.expand_menu() return self.init_aside(name=config_name) clear("content") self.alas_name = config_name self.alas_mod = get_config_mod(config_name) self.alas = ProcessManager.get_manager(config_name) self.alas_config = load_config(config_name) self.state_switch.switch() self.initial() self.alas_set_menu() def ui_add_alas(self) -> None: with popup(t("Gui.AddAlas.PopupTitle")) as s: def get_unused_name(): all_name = alas_instance() for i in range(2, 100): if f"src{i}" not in all_name: return f"src{i}" else: return "" def add(): name = pin["AddAlas_name"] origin = pin["AddAlas_copyfrom"] if name in alas_instance(): err = "Gui.AddAlas.FileExist" elif set(name) & set(".\\/:*?\"'<>|"): err = "Gui.AddAlas.InvalidChar" elif name.lower().startswith("template"): err = "Gui.AddAlas.InvalidPrefixTemplate" else: err = "" if err: clear(s) put(name, origin) put_error(t(err), scope=s) return r = load_config(origin).read_file(origin) State.config_updater.write_file(name, r, get_config_mod(origin)) self.set_aside() self.active_button("aside", self.alas_name) close_popup() def put(name=None, origin=None): put_input( name="AddAlas_name", label=t("Gui.AddAlas.NewName"), value=name or get_unused_name(), scope=s, ), put_select( name="AddAlas_copyfrom", label=t("Gui.AddAlas.CopyFrom"), options=alas_template() + alas_instance(), value=origin or "template-src", scope=s, ), put_button(label=t("Gui.AddAlas.Confirm"), onclick=add, scope=s) put() def show(self) -> None: self._show() self.set_aside() self.init_aside(name="Home") self.dev_set_menu() self.init_menu(name="HomePage") self.alas_name = "" if hasattr(self, "alas"): del self.alas self.set_status(0) def set_language(l): lang.set_language(l) self.show() def set_theme(t): self.set_theme(t) run_js("location.reload()") with use_scope("content"): put_text("Select your language / 选择语言").style("text-align: center") put_buttons( [ {"label": "简体中文", "value": "zh-CN"}, {"label": "繁體中文", "value": "zh-TW"}, {"label": "English", "value": "en-US"}, {"label": "日本語", "value": "ja-JP"}, {"label": "Español", "value": "es-ES"}, ], onclick=lambda l: set_language(l), ).style("text-align: center") put_text("Change theme / 更改主题").style("text-align: center") put_buttons( [ {"label": "Light", "value": "default", "color": "light"}, {"label": "Dark", "value": "dark", "color": "dark"}, ], onclick=lambda t: set_theme(t), ).style("text-align: center") # show something put_markdown( """ SRC is a free open source software, if you paid for SRC from any channel, please refund. SRC 是一款免费开源软件,如果你在任何渠道付费购买了SRC,请退款。 Project repository 项目地址:`https://github.com/LmeSzinc/StarRailCopilot` """ ).style("text-align: center") if lang.TRANSLATE_MODE: lang.reload() def _disable(): lang.TRANSLATE_MODE = False self.show() toast( _t("Gui.Toast.DisableTranslateMode"), duration=0, position="right", onclick=_disable, ) def run(self) -> None: # setup gui set_env(title="SRC", output_animation=False) add_css(filepath_css("alas")) if self.is_mobile: add_css(filepath_css("alas-mobile")) else: add_css(filepath_css("alas-pc")) if self.theme == "dark": add_css(filepath_css("dark-alas")) else: add_css(filepath_css("light-alas")) # Auto refresh when lost connection # [For develop] Disable by run `reload=0` in console run_js( """ reload = 1; WebIO._state.CurrentSession.on_session_close( ()=>{ setTimeout( ()=>{ if (reload == 1){ location.reload(); } }, 4000 ) } ); """ ) aside = get_localstorage("aside") self.show() # init config watcher self._init_alas_config_watcher() # save config _thread_save_config = threading.Thread(target=self._alas_thread_update_config) register_thread(_thread_save_config) _thread_save_config.start() visibility_state_switch = Switch( status={ True: [ lambda: self.__setattr__("visible", True), lambda: self.alas_update_overview_task() if self.page == "Overview" else 0, lambda: self.task_handler._task.__setattr__("delay", 15), ], False: [ lambda: self.__setattr__("visible", False), lambda: self.task_handler._task.__setattr__("delay", 1), ], }, get_state=get_window_visibility_state, name="visibility_state", ) self.state_switch = Switch( status=self.set_status, get_state=lambda: getattr(getattr(self, "alas", -1), "state", 0), name="state", ) def goto_update(): self.ui_develop() self.dev_update() update_switch = Switch( status={ 1: lambda: toast( t("Gui.Toast.ClickToUpdate"), duration=0, position="right", color="success", onclick=goto_update, ) }, get_state=lambda: updater.state, name="update_state", ) self.task_handler.add(self.state_switch.g(), 2) self.task_handler.add(visibility_state_switch.g(), 15) self.task_handler.add(update_switch.g(), 1) self.task_handler.start() # Return to previous page if aside not in ["Home", None]: self.ui_alas(aside) def debug(): """For interactive python. $ python3 >>> from module.webui.app import * >>> debug() >>> """ startup() AlasGUI().run() def startup(): State.init() lang.reload() updater.event = State.manager.Event() if updater.delay > 0: task_handler.add(updater.check_update, updater.delay) task_handler.add(updater.schedule_update(), 86400) task_handler.start() # if State.deploy_config.DiscordRichPresence: # init_discord_rpc() # if State.deploy_config.StartOcrServer: # start_ocr_server_process(State.deploy_config.OcrServerPort) if ( State.deploy_config.EnableRemoteAccess and State.deploy_config.Password is not None ): task_handler.add(RemoteAccess.keep_ssh_alive(), 60) def clearup(): """ Notice: Ensure run it before uvicorn reload app, all process will NOT EXIT after close electron app. """ logger.info("Start clearup") RemoteAccess.kill_ssh_process() # close_discord_rpc() # stop_ocr_server_process() for alas in ProcessManager._processes.values(): alas.stop() State.clearup() task_handler.stop() logger.info("Alas closed.") def app(): parser = argparse.ArgumentParser(description="Alas web service") parser.add_argument( "-k", "--key", type=str, help="Password of alas. No password by default" ) parser.add_argument( "--cdn", action="store_true", help="Use jsdelivr cdn for pywebio static files (css, js). Self host cdn by default.", ) parser.add_argument( "--run", nargs="+", type=str, help="Run alas by config names on startup", ) args, _ = parser.parse_known_args() # Apply config AlasGUI.set_theme(theme=State.deploy_config.Theme) lang.LANG = State.deploy_config.Language key = args.key or State.deploy_config.Password cdn = args.cdn if args.cdn else State.deploy_config.CDN runs = None if args.run: runs = args.run elif State.deploy_config.Run: # TODO: refactor poor_yaml_read() to support list tmp = State.deploy_config.Run.split(",") runs = [l.strip(" ['\"]") for l in tmp if len(l)] instances: List[str] = runs logger.hr("Webui configs") logger.attr("Theme", State.deploy_config.Theme) logger.attr("Language", lang.LANG) logger.attr("Password", True if key else False) logger.attr("CDN", cdn) def index(): if key is not None and not login(key): logger.warning(f"{info.user_ip} login failed.") time.sleep(1.5) run_js("location.reload();") return gui = AlasGUI() local.gui = gui gui.run() app = asgi_app( applications=[index], cdn=cdn, static_dir=None, debug=True, on_startup=[ startup, lambda: ProcessManager.restart_processes( instances=instances, ev=updater.event ), ], on_shutdown=[clearup], ) return app