StarRailCopilot/module/config/config.py
2024-01-15 00:20:36 +08:00

598 lines
18 KiB
Python

import copy
import datetime
import operator
import threading
import pywebio
from module.base.decorator import cached_property, del_cached_property
from module.base.filter import Filter
from module.config.config_generated import GeneratedConfig
from module.config.config_manual import ManualConfig, OutputConfig
from module.config.config_updater import ConfigUpdater
from module.config.stored.classes import iter_attribute
from module.config.stored.stored_generated import StoredGenerated
from module.config.utils import *
from module.config.watcher import ConfigWatcher
from module.exception import RequestHumanTakeover, ScriptError
from module.logger import logger
class TaskEnd(Exception):
    """Raised to end the task that is currently running."""
class Function:
    """
    Scheduler view of one task, built from that task's config data.

    Attributes:
        enable: Value of `Scheduler.Enable`, False if missing.
        command: Value of `Scheduler.Command`, "Unknown" if missing.
        next_run: Value of `Scheduler.NextRun`, DEFAULT_TIME if missing.
    """

    def __init__(self, data):
        """
        Args:
            data (dict): Config data of a single task.
        """
        self.enable = deep_get(data, keys="Scheduler.Enable", default=False)
        self.command = deep_get(data, keys="Scheduler.Command", default="Unknown")
        self.next_run = deep_get(data, keys="Scheduler.NextRun", default=DEFAULT_TIME)

    def __str__(self):
        state = "Enable" if self.enable else "Disable"
        return f"{self.command} ({state}, {self.next_run!s})"

    __repr__ = __str__

    def __eq__(self, other):
        # Two functions are the same when command and next_run match;
        # `enable` is not part of the comparison.
        if not isinstance(other, Function):
            return False
        same_command = self.command == other.command
        return bool(same_command and self.next_run == other.next_run)
def name_to_function(name):
    """
    Build an enabled `Function` for a task name, without reading any config.

    Args:
        name (str): Task name, stored as the command.

    Returns:
        Function: Enabled, with `next_run` left at the `Function` default.
    """
    func = Function({})
    func.enable = True
    func.command = name
    return func
class AzurLaneConfig(ConfigUpdater, ManualConfig, GeneratedConfig, ConfigWatcher):
    # Checked by `task_switched()`: when the event is set, the task is reported
    # as switched. None means no stop event is attached.
    stop_event: threading.Event = None
    # Class-level fallback so `__setattr__` can read `self.bound` for the
    # assignments made in `__init__` before the per-instance dict exists.
    bound = {}
    # Class property, shared by all instances.
    # Read by `get_next_task()` and toggled by `get_next()`.
    is_hoarding_task = True
def __setattr__(self, key, value):
    """
    Set an attribute, routing bound arguments into the pending modifications.

    A key present in `self.bound` is not stored on the instance here; its new
    value is recorded in `self.modified` under the bound config path and, when
    `auto_update` is on, `update()` is called right away.
    Any other key is set as a normal attribute.
    """
    if key not in self.bound:
        super().__setattr__(key, value)
        return
    self.modified[self.bound[key]] = value
    if self.auto_update:
        self.update()
def __init__(self, config_name, task=None):
    """
    Args:
        config_name (str): Name of the config to use.
            Names starting with "template" are treated as read only.
        task (str): Name of the task to bind. None to bind `Alas`.
    """
    logger.attr("Lang", self.LANG)
    # This will read ./config/<config_name>.json
    self.config_name = config_name
    # Raw data read from the config file.
    self.data = {}
    # Modified arguments. Key: Argument path in the config file. Value: Modified value.
    # All variable modifications are recorded here and written out in method `save()`.
    self.modified = {}
    # Key: Argument name in GeneratedConfig. Value: Path in `data`.
    self.bound = {}
    # Whether to write after every variable modification.
    self.auto_update = True
    # Force override variables
    # Key: Argument name in GeneratedConfig. Value: Modified value.
    self.overridden = {}
    # Scheduler queue, will be updated in `get_next_task()`, list of Function objects
    # pending_task: Run time has been reached, but hasn't been run yet due to task scheduling.
    # waiting_task: Run time hasn't been reached, wait needed.
    self.pending_task = []
    self.waiting_task = []
    # Task to run and bind.
    # Task means the name of the function to run in AzurLaneAutoScript class.
    self.task: Function
    # Template config is used for dev tools
    self.is_template_config = config_name.startswith("template")

    if not self.is_template_config:
        self.load()
        # Bind `Alas` by default, which includes emulator settings.
        # A specific task name is bound for debug purposes.
        task = name_to_function("Alas" if task is None else task)
        self.bind(task)
        self.task = task
        self.save()
        return

    # For dev tools
    logger.info("Using template config, which is read only")
    self.auto_update = False
    self.task = name_to_function("template")
def load(self):
    """
    Reload `self.data` from the config file, apply `config_override()`,
    then put the unsaved modifications back on top of the loaded data.
    """
    self.data = self.read_file(self.config_name)
    self.config_override()
    for keys, pending in self.modified.items():
        deep_set(self.data, keys=keys, value=pending)
def bind(self, func, func_list=None):
    """
    Bind the arguments of the given tasks as attributes of this config.

    Tasks are processed in the order ["Alas", <task>, *func_list]. When several
    tasks define the same `<group>.<arg>`, the first task in that order wins.
    Values in `self.overridden` are applied last.

    Args:
        func (str, Function): Function to run
        func_list (list[str]): List of tasks to be bound.
            The list passed in is not modified.
    """
    if isinstance(func, Function):
        func = func.command
    # Work on a copy so the caller's list is never mutated by the inserts below.
    # func_list: ["Alas", <task>, *func_list]
    func_list = [] if func_list is None else list(func_list)
    if func not in func_list:
        func_list.insert(0, func)
    if "Alas" not in func_list:
        func_list.insert(0, "Alas")
    logger.info(f"Bind task {func_list}")

    # Bind arguments
    visited = set()
    self.bound.clear()
    for task in func_list:
        task_data = self.data.get(task, {})
        for group, group_data in task_data.items():
            for arg, value in group_data.items():
                path = f"{group}.{arg}"
                if path in visited:
                    continue
                name = path_to_arg(path)
                # Bypass our own __setattr__, which would record a modification.
                super().__setattr__(name, value)
                self.bound[name] = f"{task}.{path}"
                visited.add(path)

    # Override arguments
    for name, value in self.overridden.items():
        super().__setattr__(name, value)
@property
def hoarding(self):
    """
    Returns:
        timedelta: `Alas.Optimization.TaskHoardingDuration` in minutes,
            0 if missing, negative values are treated as 0.
    """
    raw = deep_get(
        self.data, keys="Alas.Optimization.TaskHoardingDuration", default=0
    )
    minutes = max(int(raw), 0)
    return timedelta(minutes=minutes)
@property
def close_game(self):
    """
    Returns:
        Value of `Alas.Optimization.CloseGameDuringWait`, False if missing.
    """
    setting = deep_get(
        self.data, keys="Alas.Optimization.CloseGameDuringWait", default=False
    )
    return setting
@property
def is_cloud_game(self):
    """
    Returns:
        bool: If `Alas.Emulator.GameClient` is 'cloud_android'.
    """
    client = deep_get(self.data, keys="Alas.Emulator.GameClient")
    return client == 'cloud_android'
@cached_property
def stored(self) -> StoredGenerated:
    """
    Build a `StoredGenerated` whose attributes are bound to this config.

    The result is cached; `save()` drops the cache so it is rebuilt on the
    next access.
    """
    container = StoredGenerated()
    # Bind config
    for _, item in iter_attribute(container):
        item._bind(self)
        del_cached_property(item, '_stored')
    return container
def get_next_task(self):
    """
    Calculate tasks, set pending_task and waiting_task

    Enabled tasks whose next run is earlier than now go to pending, the others
    to waiting. While `is_hoarding_task` is set, "now" is moved back by
    `self.hoarding`. Tasks whose next run is not a datetime are put in front
    of the pending list.
    """
    now = datetime.now()
    if AzurLaneConfig.is_hoarding_task:
        now -= self.hoarding

    pending, waiting, error = [], [], []
    for task_data in self.data.values():
        func = Function(task_data)
        if not func.enable:
            continue
        if not isinstance(func.next_run, datetime):
            bucket = error
        elif func.next_run < now:
            bucket = pending
        else:
            bucket = waiting
        bucket.append(func)

    # Order by SCHEDULER_PRIORITY; waiting tasks are then sorted by next run
    f = Filter(regex=r"(.*)", attr=["command"])
    f.load(self.SCHEDULER_PRIORITY)
    if pending:
        pending = f.apply(pending)
    if waiting:
        waiting = sorted(f.apply(waiting), key=operator.attrgetter("next_run"))
    if error:
        pending = error + pending

    self.pending_task = pending
    self.waiting_task = waiting
def get_next(self):
    """
    Returns:
        Function: Command to run.
            The first pending task if there is any, otherwise a copy of the
            first waiting task with `next_run` pushed back by `self.hoarding`.

    Raises:
        RequestHumanTakeover: If no task is pending or waiting.
    """
    self.get_next_task()

    if self.pending_task:
        AzurLaneConfig.is_hoarding_task = False
        logger.info(f"Pending tasks: {[f.command for f in self.pending_task]}")
        task = self.pending_task[0]
        logger.attr("Task", task)
        return task

    AzurLaneConfig.is_hoarding_task = True
    if not self.waiting_task:
        logger.critical("No task waiting or pending")
        logger.critical("Please enable at least one task")
        raise RequestHumanTakeover

    logger.info("No task pending")
    # Copy, so the entry in waiting_task keeps its original next_run
    task = copy.deepcopy(self.waiting_task[0])
    task.next_run = (task.next_run + self.hoarding).replace(microsecond=0)
    logger.attr("Task", task)
    return task
def save(self, mod_name='alas'):
    """
    Apply pending modifications to `self.data` and write the config file.

    Args:
        mod_name (str): Only used to build the file path shown in the log.

    Returns:
        False if there is nothing to save, otherwise None.
    """
    if not self.modified:
        return False

    for keys, pending in self.modified.items():
        deep_set(self.data, keys=keys, value=pending)

    logger.info(
        f"Save config {filepath_config(self.config_name, mod_name)}, {dict_to_kv(self.modified)}"
    )
    # Don't use self.modified = {}, that will create a new object.
    self.modified.clear()
    # Drop the cached `stored` so it is rebuilt on next access
    del_cached_property(self, 'stored')
    self.write_file(self.config_name, data=self.data)
def update(self):
    """
    Reload the config from file, rebind the current task and save pending
    modifications.
    """
    # Re-reads the file and puts self.modified back on top of it
    self.load()
    # load() already did this before applying self.modified,
    # run it again now that the modifications are in self.data
    self.config_override()
    self.bind(self.task)
    self.save()
def config_override(self):
    """
    Pull back `Scheduler.NextRun` values that are too far in the future.

    `BattlePass` may be scheduled up to 40 days ahead, every other task in
    `self.args` up to 24 hours. A next run beyond its limit is reset to now.
    Only `self.data` is changed here, nothing is written to file.
    """
    now = datetime.now().replace(microsecond=0)
    # Tasks that already had a limit applied, the first limit wins
    limited = set()

    def limit_next_run(tasks, limit):
        for name in tasks:
            if name in limited:
                continue
            limited.add(name)
            current = deep_get(
                self.data, keys=f"{name}.Scheduler.NextRun", default=None
            )
            if not isinstance(current, datetime):
                continue
            if current > limit:
                deep_set(self.data, keys=f"{name}.Scheduler.NextRun", value=now)

    limit_next_run(['BattlePass'], limit=now + timedelta(days=40, seconds=-1))
    limit_next_run(self.args.keys(), limit=now + timedelta(hours=24, seconds=-1))
def override(self, **kwargs):
    """
    Override anything you want.
    Variables still remain overridden even if the config is reloaded from file,
    because `bind()` re-applies everything in `self.overridden`.
    Note that this method is irreversible.

    Args:
        **kwargs: Argument name to forced value.
    """
    for name, forced in kwargs.items():
        self.overridden[name] = forced
        # Set directly, without recording a modification
        super().__setattr__(name, forced)
def set_record(self, **kwargs):
    """
    Set values together with the time they were recorded.

    Args:
        **kwargs: For example, `Emotion1_Value=150`
            will set `Emotion1_Value=150` and `Emotion1_Record=now()`
    """
    with self.multi_set():
        for value_name, value in kwargs.items():
            # Name of the matching record: "Value" replaced with "Record"
            record_name = value_name.replace("Value", "Record")
            setattr(self, value_name, value)
            setattr(self, record_name, datetime.now().replace(microsecond=0))
def multi_set(self):
    """
    Set multiple arguments but save once.

    Examples:
        with self.config.multi_set():
            self.config.foo1 = 1
            self.config.foo2 = 2

    Returns:
        MultiSetWrapper: Context manager wrapping this config.
    """
    return MultiSetWrapper(main=self)
def cross_get(self, keys, default=None):
    """
    Get configs from other tasks.

    Args:
        keys (str, list[str]): Such as `{task}.Scheduler.Enable`
        default: Returned when the keys are missing in `self.data`.

    Returns:
        Any: Value found in `self.data`.
    """
    found = deep_get(self.data, keys=keys, default=default)
    return found
def cross_set(self, keys, value):
    """
    Set configs to other tasks.

    The value is recorded in `self.modified` and, when `auto_update` is on,
    written out through `update()`.

    Args:
        keys (str, list[str]): Such as `{task}.Scheduler.Enable`
        value (Any):
    """
    if isinstance(keys, list):
        # A list is unhashable and can't be a key of self.modified,
        # convert it to the dotted path used by the other entries.
        keys = ".".join(str(key) for key in keys)
    self.modified[keys] = value
    if self.auto_update:
        self.update()
def task_delay(self, success=None, server_update=None, target=None, minute=None, task=None):
    """
    Set Scheduler.NextRun
    Should set at least one argument.
    If multiple arguments are set, use the nearest.

    Args:
        success (bool):
            If True, delay 120 minutes
            If False, delay 30 minutes
        server_update (bool, list, str):
            If True, delay to nearest Scheduler.ServerUpdate
            If type is list or str, delay to such server update
        target (datetime.datetime, str, list):
            Delay to such time.
        minute (int, float, tuple):
            Delay several minutes.
        task (str):
            Set across task. None for current task.

    Raises:
        ScriptError: If none of success, server_update, target, minute is set.
    """
    def ensure_delta(delay):
        # `delay` is in minutes
        return timedelta(seconds=int(ensure_time(delay, precision=3) * 60))

    run = []
    if success is not None:
        interval = 120 if success else 30
        run.append(datetime.now() + ensure_delta(interval))
    if server_update is not None:
        if server_update is True:
            server_update = self.Scheduler_ServerUpdate
        run.append(get_server_next_update(server_update))
    if target is not None:
        target = [target] if not isinstance(target, list) else target
        target = nearest_future(target)
        run.append(target)
    if minute is not None:
        run.append(datetime.now() + ensure_delta(minute))

    if not run:
        raise ScriptError(
            "Missing argument in task_delay, should set at least one"
        )

    # Use the nearest of all candidates
    run = min(run).replace(microsecond=0)
    kv = dict_to_kv(
        {
            "success": success,
            "server_update": server_update,
            "target": target,
            "minute": minute,
        },
        allow_none=False,
    )
    if task is None:
        task = self.task.command
    logger.info(f"Delay task `{task}` to {run} ({kv})")
    self.modified[f'{task}.Scheduler.NextRun'] = run
    # NOTE: Unlike cross_set() and task_call(), this updates regardless of auto_update
    self.update()
def task_call(self, task, force_call=True):
    """
    Call another task to run.
    That task will run when current task finished.
    But it might not be run because:
    - Other tasks should run first according to SCHEDULER_PRIORITY
    - Task is disabled by user

    Args:
        task (str): Task name to call, such as `Restart`
        force_call (bool): If True, call the task even if it is disabled.
            A called task is always set to enabled.

    Returns:
        bool: If called.

    Raises:
        ScriptError: If the task has no `Scheduler.NextRun` in user config.
    """
    if deep_get(self.data, keys=f"{task}.Scheduler.NextRun", default=None) is None:
        raise ScriptError(f"Task to call: `{task}` does not exist in user config")

    if not (force_call or self.is_task_enabled(task)):
        logger.info(f"Task call: {task} (skipped because disabled by user)")
        return False

    logger.info(f"Task call: {task}")
    now = datetime.now().replace(microsecond=0)
    self.modified[f"{task}.Scheduler.NextRun"] = now
    self.modified[f"{task}.Scheduler.Enable"] = True
    if self.auto_update:
        self.update()
    return True
@staticmethod
def task_stop(message=""):
    """
    Stop current task.

    Args:
        message (str): Passed to the exception if not empty.

    Raises:
        TaskEnd:
    """
    if not message:
        raise TaskEnd
    raise TaskEnd(message)
def task_switched(self):
    """
    Check if needs to switch task.
    The config is reloaded from file before the next task is calculated.

    Returns:
        bool: If task switched, or if the stop event is set.
    """
    # Update event
    if self.stop_event is not None and self.stop_event.is_set():
        return True

    prev = self.task
    self.load()
    new = self.get_next()
    switched = not (prev == new)
    if switched:
        logger.info(f"Switch task `{prev}` to `{new}`")
    else:
        logger.info(f"Continue task `{new}`")
    return switched
def check_task_switch(self, message=""):
    """
    Stop current task when task switched.

    Args:
        message (str): Passed on to `task_stop()`.

    Raises:
        TaskEnd:
    """
    if not self.task_switched():
        return
    self.task_stop(message=message)
def is_task_enabled(self, task):
    """
    Args:
        task (str): Task name.

    Returns:
        bool: If `{task}.Scheduler.Enable` is truthy, False if missing.
    """
    enable = self.cross_get(keys=[task, 'Scheduler', 'Enable'], default=False)
    return bool(enable)
def update_daily_quests(self):
    """
    Call task `DailyQuest` if the stored DailyActivity or DailyQuest is expired.

    Raises:
        TaskEnd: Call task `DailyQuest` and stop current task
    """
    checks = (
        ('DailyActivity', 'DailyActivity expired, call task to update'),
        ('DailyQuest', 'DailyQuest expired, call task to update'),
    )
    for name, message in checks:
        if getattr(self.stored, name).is_expired():
            logger.info(message)
            self.task_call('DailyQuest')
            self.task_stop()
def update_battle_pass_quests(self):
    """
    Call task `BattlePass` if the stored BattlePassWeeklyQuest is expired,
    unless the stored BattlePassLevel is already full.

    Raises:
        TaskEnd: Call task `BattlePass` and stop current task
    """
    if not self.stored.BattlePassWeeklyQuest.is_expired():
        return
    if self.stored.BattlePassLevel.is_full():
        logger.info('BattlePassLevel full, no updates')
        return
    # Log the quest that is actually checked above
    logger.info('BattlePassWeeklyQuest expired, call task to update')
    self.task_call('BattlePass')
    self.task_stop()
@property
def DEVICE_SCREENSHOT_METHOD(self):
    """Alias of the `Emulator_ScreenshotMethod` argument."""
    return self.Emulator_ScreenshotMethod
@property
def DEVICE_CONTROL_METHOD(self):
    """Alias of the `Emulator_ControlMethod` argument."""
    return self.Emulator_ControlMethod
def temporary(self, **kwargs):
    """
    Cover some settings, and recover later.

    Usage:
        backup = self.config.temporary(ENABLE_DAILY_REWARD=False)
        # do_something()
        backup.recover()

    Args:
        **kwargs: Attribute name to temporary value.

    Returns:
        ConfigBackup: Holds the previous values, call `recover()` to restore them.
    """
    backup = ConfigBackup(config=self)
    backup.cover(**kwargs)
    return backup
# Module-level side effect on import: rebind the name `Output` in
# `pywebio.output` and `pywebio.pin` to the project's OutputConfig.
# NOTE(review): the reason for this patch is not visible in this file — confirm.
pywebio.output.Output = OutputConfig
pywebio.pin.Output = OutputConfig
class ConfigBackup:
    """
    Remembers config attributes before they are covered,
    so they can be restored with `recover()`.
    """

    def __init__(self, config):
        """
        Args:
            config (AzurLaneConfig):
        """
        self.config = config
        # Attribute name -> value before it was covered
        self.backup = {}
        # Keyword arguments of the latest cover() call
        self.kwargs = {}

    def cover(self, **kwargs):
        """
        Args:
            **kwargs: Attribute name to temporary value.
        """
        self.kwargs = kwargs
        for name in kwargs:
            # Read the old value right before replacing it
            self.backup[name] = self.config.__getattribute__(name)
            setattr(self.config, name, kwargs[name])

    def recover(self):
        """
        Set every covered attribute back to its backed up value.
        """
        for name, previous in self.backup.items():
            setattr(self.config, name, previous)
class MultiSetWrapper:
    """
    Context manager that turns off `auto_update` of a config while inside,
    and calls `update()` once on exit.

    When `auto_update` is already off on enter (nested use, or a config that
    never auto updates), the wrapper changes nothing on enter or exit.
    """

    def __init__(self, main):
        """
        Args:
            main (AzurLaneConfig):
        """
        self.main = main
        # True if auto_update was already off when entering
        self.in_wrapper = False

    def __enter__(self):
        if self.main.auto_update:
            self.main.auto_update = False
        else:
            self.in_wrapper = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.in_wrapper:
            return
        try:
            self.main.update()
        finally:
            # Restore even if update() raises, otherwise the config
            # would stay with auto_update turned off.
            self.main.auto_update = True