mirror of
https://github.com/LmeSzinc/StarRailCopilot.git
synced 2024-11-16 06:25:24 +00:00
598 lines
18 KiB
Python
598 lines
18 KiB
Python
import copy
|
|
import datetime
|
|
import operator
|
|
import threading
|
|
|
|
import pywebio
|
|
|
|
from module.base.decorator import cached_property, del_cached_property
|
|
from module.base.filter import Filter
|
|
from module.config.config_generated import GeneratedConfig
|
|
from module.config.config_manual import ManualConfig, OutputConfig
|
|
from module.config.config_updater import ConfigUpdater
|
|
from module.config.stored.classes import iter_attribute
|
|
from module.config.stored.stored_generated import StoredGenerated
|
|
from module.config.utils import *
|
|
from module.config.watcher import ConfigWatcher
|
|
from module.exception import RequestHumanTakeover, ScriptError
|
|
from module.logger import logger
|
|
|
|
|
|
class TaskEnd(Exception):
|
|
pass
|
|
|
|
|
|
class Function:
|
|
def __init__(self, data):
|
|
self.enable = deep_get(data, keys="Scheduler.Enable", default=False)
|
|
self.command = deep_get(data, keys="Scheduler.Command", default="Unknown")
|
|
self.next_run = deep_get(data, keys="Scheduler.NextRun", default=DEFAULT_TIME)
|
|
|
|
def __str__(self):
|
|
enable = "Enable" if self.enable else "Disable"
|
|
return f"{self.command} ({enable}, {str(self.next_run)})"
|
|
|
|
__repr__ = __str__
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, Function):
|
|
return False
|
|
|
|
if self.command == other.command and self.next_run == other.next_run:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
def name_to_function(name):
|
|
"""
|
|
Args:
|
|
name (str):
|
|
|
|
Returns:
|
|
Function:
|
|
"""
|
|
function = Function({})
|
|
function.command = name
|
|
function.enable = True
|
|
return function
|
|
|
|
|
|
class AzurLaneConfig(ConfigUpdater, ManualConfig, GeneratedConfig, ConfigWatcher):
|
|
stop_event: threading.Event = None
|
|
bound = {}
|
|
|
|
# Class property
|
|
is_hoarding_task = True
|
|
|
|
def __setattr__(self, key, value):
|
|
if key in self.bound:
|
|
path = self.bound[key]
|
|
self.modified[path] = value
|
|
if self.auto_update:
|
|
self.update()
|
|
else:
|
|
super().__setattr__(key, value)
|
|
|
|
def __init__(self, config_name, task=None):
|
|
logger.attr("Lang", self.LANG)
|
|
# This will read ./config/<config_name>.json
|
|
self.config_name = config_name
|
|
# Raw json data in yaml file.
|
|
self.data = {}
|
|
# Modified arguments. Key: Argument path in yaml file. Value: Modified value.
|
|
# All variable modifications will be record here and saved in method `save()`.
|
|
self.modified = {}
|
|
# Key: Argument name in GeneratedConfig. Value: Path in `data`.
|
|
self.bound = {}
|
|
# If write after every variable modification.
|
|
self.auto_update = True
|
|
# Force override variables
|
|
# Key: Argument name in GeneratedConfig. Value: Modified value.
|
|
self.overridden = {}
|
|
# Scheduler queue, will be updated in `get_next_task()`, list of Function objects
|
|
# pending_task: Run time has been reached, but haven't been run due to task scheduling.
|
|
# waiting_task: Run time haven't been reached, wait needed.
|
|
self.pending_task = []
|
|
self.waiting_task = []
|
|
# Task to run and bind.
|
|
# Task means the name of the function to run in AzurLaneAutoScript class.
|
|
self.task: Function
|
|
# Template config is used for dev tools
|
|
self.is_template_config = config_name.startswith("template")
|
|
|
|
if self.is_template_config:
|
|
# For dev tools
|
|
logger.info("Using template config, which is read only")
|
|
self.auto_update = False
|
|
self.task = name_to_function("template")
|
|
else:
|
|
self.load()
|
|
if task is None:
|
|
# Bind `Alas` by default which includes emulator settings.
|
|
task = name_to_function("Alas")
|
|
else:
|
|
# Bind a specific task for debug purpose.
|
|
task = name_to_function(task)
|
|
self.bind(task)
|
|
self.task = task
|
|
self.save()
|
|
|
|
def load(self):
|
|
self.data = self.read_file(self.config_name)
|
|
self.config_override()
|
|
|
|
for path, value in self.modified.items():
|
|
deep_set(self.data, keys=path, value=value)
|
|
|
|
def bind(self, func, func_list=None):
|
|
"""
|
|
Args:
|
|
func (str, Function): Function to run
|
|
func_list (list[str]): List of tasks to be bound
|
|
"""
|
|
if isinstance(func, Function):
|
|
func = func.command
|
|
# func_list: ["Alas", <task>, *func_list]
|
|
if func_list is None:
|
|
func_list = []
|
|
if func not in func_list:
|
|
func_list.insert(0, func)
|
|
if "Alas" not in func_list:
|
|
func_list.insert(0, "Alas")
|
|
logger.info(f"Bind task {func_list}")
|
|
|
|
# Bind arguments
|
|
visited = set()
|
|
self.bound.clear()
|
|
for func in func_list:
|
|
func_data = self.data.get(func, {})
|
|
for group, group_data in func_data.items():
|
|
for arg, value in group_data.items():
|
|
path = f"{group}.{arg}"
|
|
if path in visited:
|
|
continue
|
|
arg = path_to_arg(path)
|
|
super().__setattr__(arg, value)
|
|
self.bound[arg] = f"{func}.{path}"
|
|
visited.add(path)
|
|
|
|
# Override arguments
|
|
for arg, value in self.overridden.items():
|
|
super().__setattr__(arg, value)
|
|
|
|
@property
|
|
def hoarding(self):
|
|
minutes = int(
|
|
deep_get(
|
|
self.data, keys="Alas.Optimization.TaskHoardingDuration", default=0
|
|
)
|
|
)
|
|
return timedelta(minutes=max(minutes, 0))
|
|
|
|
@property
|
|
def close_game(self):
|
|
return deep_get(
|
|
self.data, keys="Alas.Optimization.CloseGameDuringWait", default=False
|
|
)
|
|
|
|
@property
|
|
def is_cloud_game(self):
|
|
return deep_get(
|
|
self.data, keys="Alas.Emulator.GameClient"
|
|
) == 'cloud_android'
|
|
|
|
@cached_property
|
|
def stored(self) -> StoredGenerated:
|
|
stored = StoredGenerated()
|
|
# Bind config
|
|
for _, value in iter_attribute(stored):
|
|
value._bind(self)
|
|
del_cached_property(value, '_stored')
|
|
return stored
|
|
|
|
def get_next_task(self):
|
|
"""
|
|
Calculate tasks, set pending_task and waiting_task
|
|
"""
|
|
pending = []
|
|
waiting = []
|
|
error = []
|
|
now = datetime.now()
|
|
if AzurLaneConfig.is_hoarding_task:
|
|
now -= self.hoarding
|
|
for func in self.data.values():
|
|
func = Function(func)
|
|
if not func.enable:
|
|
continue
|
|
if not isinstance(func.next_run, datetime):
|
|
error.append(func)
|
|
elif func.next_run < now:
|
|
pending.append(func)
|
|
else:
|
|
waiting.append(func)
|
|
|
|
f = Filter(regex=r"(.*)", attr=["command"])
|
|
f.load(self.SCHEDULER_PRIORITY)
|
|
if pending:
|
|
pending = f.apply(pending)
|
|
if waiting:
|
|
waiting = f.apply(waiting)
|
|
waiting = sorted(waiting, key=operator.attrgetter("next_run"))
|
|
if error:
|
|
pending = error + pending
|
|
|
|
self.pending_task = pending
|
|
self.waiting_task = waiting
|
|
|
|
def get_next(self):
|
|
"""
|
|
Returns:
|
|
Function: Command to run
|
|
"""
|
|
self.get_next_task()
|
|
|
|
if self.pending_task:
|
|
AzurLaneConfig.is_hoarding_task = False
|
|
logger.info(f"Pending tasks: {[f.command for f in self.pending_task]}")
|
|
task = self.pending_task[0]
|
|
logger.attr("Task", task)
|
|
return task
|
|
else:
|
|
AzurLaneConfig.is_hoarding_task = True
|
|
|
|
if self.waiting_task:
|
|
logger.info("No task pending")
|
|
task = copy.deepcopy(self.waiting_task[0])
|
|
task.next_run = (task.next_run + self.hoarding).replace(microsecond=0)
|
|
logger.attr("Task", task)
|
|
return task
|
|
else:
|
|
logger.critical("No task waiting or pending")
|
|
logger.critical("Please enable at least one task")
|
|
raise RequestHumanTakeover
|
|
|
|
def save(self, mod_name='alas'):
|
|
if not self.modified:
|
|
return False
|
|
|
|
for path, value in self.modified.items():
|
|
deep_set(self.data, keys=path, value=value)
|
|
|
|
logger.info(
|
|
f"Save config {filepath_config(self.config_name, mod_name)}, {dict_to_kv(self.modified)}"
|
|
)
|
|
# Don't use self.modified = {}, that will create a new object.
|
|
self.modified.clear()
|
|
del_cached_property(self, 'stored')
|
|
self.write_file(self.config_name, data=self.data)
|
|
|
|
def update(self):
|
|
self.load()
|
|
self.config_override()
|
|
self.bind(self.task)
|
|
self.save()
|
|
|
|
def config_override(self):
|
|
now = datetime.now().replace(microsecond=0)
|
|
limited = set()
|
|
|
|
def limit_next_run(tasks, limit):
|
|
for task in tasks:
|
|
if task in limited:
|
|
continue
|
|
limited.add(task)
|
|
next_run = deep_get(
|
|
self.data, keys=f"{task}.Scheduler.NextRun", default=None
|
|
)
|
|
if isinstance(next_run, datetime) and next_run > limit:
|
|
deep_set(self.data, keys=f"{task}.Scheduler.NextRun", value=now)
|
|
|
|
limit_next_run(['BattlePass'], limit=now + timedelta(days=40, seconds=-1))
|
|
limit_next_run(self.args.keys(), limit=now + timedelta(hours=24, seconds=-1))
|
|
|
|
def override(self, **kwargs):
|
|
"""
|
|
Override anything you want.
|
|
Variables stall remain overridden even config is reloaded from yaml file.
|
|
Note that this method is irreversible.
|
|
"""
|
|
for arg, value in kwargs.items():
|
|
self.overridden[arg] = value
|
|
super().__setattr__(arg, value)
|
|
|
|
def set_record(self, **kwargs):
|
|
"""
|
|
Args:
|
|
**kwargs: For example, `Emotion1_Value=150`
|
|
will set `Emotion1_Value=150` and `Emotion1_Record=now()`
|
|
"""
|
|
with self.multi_set():
|
|
for arg, value in kwargs.items():
|
|
record = arg.replace("Value", "Record")
|
|
self.__setattr__(arg, value)
|
|
self.__setattr__(record, datetime.now().replace(microsecond=0))
|
|
|
|
def multi_set(self):
|
|
"""
|
|
Set multiple arguments but save once.
|
|
|
|
Examples:
|
|
with self.config.multi_set():
|
|
self.config.foo1 = 1
|
|
self.config.foo2 = 2
|
|
"""
|
|
return MultiSetWrapper(main=self)
|
|
|
|
def cross_get(self, keys, default=None):
|
|
"""
|
|
Get configs from other tasks.
|
|
|
|
Args:
|
|
keys (str, list[str]): Such as `{task}.Scheduler.Enable`
|
|
default:
|
|
|
|
Returns:
|
|
Any:
|
|
"""
|
|
return deep_get(self.data, keys=keys, default=default)
|
|
|
|
def cross_set(self, keys, value):
|
|
"""
|
|
Set configs to other tasks.
|
|
|
|
Args:
|
|
keys (str, list[str]): Such as `{task}.Scheduler.Enable`
|
|
value (Any):
|
|
|
|
Returns:
|
|
Any:
|
|
"""
|
|
self.modified[keys] = value
|
|
if self.auto_update:
|
|
self.update()
|
|
|
|
def task_delay(self, success=None, server_update=None, target=None, minute=None, task=None):
|
|
"""
|
|
Set Scheduler.NextRun
|
|
Should set at least one arguments.
|
|
If multiple arguments are set, use the nearest.
|
|
|
|
Args:
|
|
success (bool):
|
|
If True, delay Scheduler.SuccessInterval
|
|
If False, delay Scheduler.FailureInterval
|
|
server_update (bool, list, str):
|
|
If True, delay to nearest Scheduler.ServerUpdate
|
|
If type is list or str, delay to such server update
|
|
target (datetime.datetime, str, list):
|
|
Delay to such time.
|
|
minute (int, float, tuple):
|
|
Delay several minutes.
|
|
task (str):
|
|
Set across task. None for current task.
|
|
"""
|
|
|
|
def ensure_delta(delay):
|
|
return timedelta(seconds=int(ensure_time(delay, precision=3) * 60))
|
|
|
|
run = []
|
|
if success is not None:
|
|
interval = (
|
|
120
|
|
if success
|
|
else 30
|
|
)
|
|
run.append(datetime.now() + ensure_delta(interval))
|
|
if server_update is not None:
|
|
if server_update is True:
|
|
server_update = self.Scheduler_ServerUpdate
|
|
run.append(get_server_next_update(server_update))
|
|
if target is not None:
|
|
target = [target] if not isinstance(target, list) else target
|
|
target = nearest_future(target)
|
|
run.append(target)
|
|
if minute is not None:
|
|
run.append(datetime.now() + ensure_delta(minute))
|
|
|
|
if len(run):
|
|
run = min(run).replace(microsecond=0)
|
|
kv = dict_to_kv(
|
|
{
|
|
"success": success,
|
|
"server_update": server_update,
|
|
"target": target,
|
|
"minute": minute,
|
|
},
|
|
allow_none=False,
|
|
)
|
|
if task is None:
|
|
task = self.task.command
|
|
logger.info(f"Delay task `{task}` to {run} ({kv})")
|
|
self.modified[f'{task}.Scheduler.NextRun'] = run
|
|
self.update()
|
|
else:
|
|
raise ScriptError(
|
|
"Missing argument in delay_next_run, should set at least one"
|
|
)
|
|
|
|
def task_call(self, task, force_call=True):
|
|
"""
|
|
Call another task to run.
|
|
|
|
That task will run when current task finished.
|
|
But it might not be run because:
|
|
- Other tasks should run first according to SCHEDULER_PRIORITY
|
|
- Task is disabled by user
|
|
|
|
Args:
|
|
task (str): Task name to call, such as `Restart`
|
|
force_call (bool):
|
|
|
|
Returns:
|
|
bool: If called.
|
|
"""
|
|
if deep_get(self.data, keys=f"{task}.Scheduler.NextRun", default=None) is None:
|
|
raise ScriptError(f"Task to call: `{task}` does not exist in user config")
|
|
|
|
if force_call or self.is_task_enabled(task):
|
|
logger.info(f"Task call: {task}")
|
|
self.modified[f"{task}.Scheduler.NextRun"] = datetime.now().replace(
|
|
microsecond=0
|
|
)
|
|
self.modified[f"{task}.Scheduler.Enable"] = True
|
|
if self.auto_update:
|
|
self.update()
|
|
return True
|
|
else:
|
|
logger.info(f"Task call: {task} (skipped because disabled by user)")
|
|
return False
|
|
|
|
@staticmethod
|
|
def task_stop(message=""):
|
|
"""
|
|
Stop current task.
|
|
|
|
Raises:
|
|
TaskEnd:
|
|
"""
|
|
if message:
|
|
raise TaskEnd(message)
|
|
else:
|
|
raise TaskEnd
|
|
|
|
def task_switched(self):
|
|
"""
|
|
Check if needs to switch task.
|
|
|
|
Raises:
|
|
bool: If task switched
|
|
"""
|
|
# Update event
|
|
if self.stop_event is not None:
|
|
if self.stop_event.is_set():
|
|
return True
|
|
prev = self.task
|
|
self.load()
|
|
new = self.get_next()
|
|
if prev == new:
|
|
logger.info(f"Continue task `{new}`")
|
|
return False
|
|
else:
|
|
logger.info(f"Switch task `{prev}` to `{new}`")
|
|
return True
|
|
|
|
def check_task_switch(self, message=""):
|
|
"""
|
|
Stop current task when task switched.
|
|
|
|
Raises:
|
|
TaskEnd:
|
|
"""
|
|
if self.task_switched():
|
|
self.task_stop(message=message)
|
|
|
|
def is_task_enabled(self, task):
|
|
return bool(self.cross_get(keys=[task, 'Scheduler', 'Enable'], default=False))
|
|
|
|
def update_daily_quests(self):
|
|
"""
|
|
Raises:
|
|
TaskEnd: Call task `DailyQuest` and stop current task
|
|
"""
|
|
if self.stored.DailyActivity.is_expired():
|
|
logger.info('DailyActivity expired, call task to update')
|
|
self.task_call('DailyQuest')
|
|
self.task_stop()
|
|
if self.stored.DailyQuest.is_expired():
|
|
logger.info('DailyQuest expired, call task to update')
|
|
self.task_call('DailyQuest')
|
|
self.task_stop()
|
|
|
|
def update_battle_pass_quests(self):
|
|
"""
|
|
Raises:
|
|
TaskEnd: Call task `BattlePass` and stop current task
|
|
"""
|
|
if self.stored.BattlePassWeeklyQuest.is_expired():
|
|
if self.stored.BattlePassLevel.is_full():
|
|
logger.info('BattlePassLevel full, no updates')
|
|
else:
|
|
logger.info('BattlePassTodayQuest expired, call task to update')
|
|
self.task_call('BattlePass')
|
|
self.task_stop()
|
|
|
|
@property
|
|
def DEVICE_SCREENSHOT_METHOD(self):
|
|
return self.Emulator_ScreenshotMethod
|
|
|
|
@property
|
|
def DEVICE_CONTROL_METHOD(self):
|
|
return self.Emulator_ControlMethod
|
|
|
|
def temporary(self, **kwargs):
|
|
"""
|
|
Cover some settings, and recover later.
|
|
|
|
Usage:
|
|
backup = self.config.cover(ENABLE_DAILY_REWARD=False)
|
|
# do_something()
|
|
backup.recover()
|
|
|
|
Args:
|
|
**kwargs:
|
|
|
|
Returns:
|
|
ConfigBackup:
|
|
"""
|
|
backup = ConfigBackup(config=self)
|
|
backup.cover(**kwargs)
|
|
return backup
|
|
|
|
|
|
pywebio.output.Output = OutputConfig
|
|
pywebio.pin.Output = OutputConfig
|
|
|
|
|
|
class ConfigBackup:
|
|
def __init__(self, config):
|
|
"""
|
|
Args:
|
|
config (AzurLaneConfig):
|
|
"""
|
|
self.config = config
|
|
self.backup = {}
|
|
self.kwargs = {}
|
|
|
|
def cover(self, **kwargs):
|
|
self.kwargs = kwargs
|
|
for key, value in kwargs.items():
|
|
self.backup[key] = self.config.__getattribute__(key)
|
|
self.config.__setattr__(key, value)
|
|
|
|
def recover(self):
|
|
for key, value in self.backup.items():
|
|
self.config.__setattr__(key, value)
|
|
|
|
|
|
class MultiSetWrapper:
|
|
def __init__(self, main):
|
|
"""
|
|
Args:
|
|
main (AzurLaneConfig):
|
|
"""
|
|
self.main = main
|
|
self.in_wrapper = False
|
|
|
|
def __enter__(self):
|
|
if self.main.auto_update:
|
|
self.main.auto_update = False
|
|
else:
|
|
self.in_wrapper = True
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
if not self.in_wrapper:
|
|
self.main.update()
|
|
self.main.auto_update = True
|