pixiv 优化命令系统

This commit is contained in:
Ricky8955555 2023-04-15 17:13:01 +08:00 committed by GitHub
parent bef10530b0
commit 75bf8aea6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -29,18 +29,15 @@ from pixivpy_async.error import NoTokenError
PREFIX = ","
CONFIG_PATH = r"data/pixiv.yml"
INTERACTIVE_CONFIG_PATH = r"data/pixiv_interactive.yml"
COMMAND_CONFIG_PATH = r"data/pixiv_command.yml"
PLUGIN_NAME = "pixiv"
HELP_URL = r"https://www.huajitech.net/pagermaid-pixiv-plugin-help/"
_config: Dict[str, Any] = {}
_interactive_config: Dict[str, Any] = {}
pixiv_api: Optional[AppPixivAPI] = None
with contextlib.suppress(Exception):
with open(CONFIG_PATH, mode="r") as config_file:
_config = yaml.safe_load(config_file)
with open(INTERACTIVE_CONFIG_PATH, mode="r") as interactive_config_file:
_interactive_config = yaml.safe_load(interactive_config_file)
def str_to_list(value: Optional[str]) -> List[str]:
@ -51,7 +48,7 @@ def reversed_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
return {v: k for k, v in d.items()}
root_command = lambda: alias_command(PLUGIN_NAME)
root_command = alias_command(PLUGIN_NAME)
Handler = Callable[[Client, Message], Awaitable[None]]
@ -80,17 +77,6 @@ class PluginConfig:
)
class InteractiveConfig:
aliases: Dict[str, str] = _interactive_config.get("aliases") or {}
reversed_aliases: Dict[str, str] = reversed_dict(aliases)
@staticmethod
def apply() -> None:
_interactive_config["aliases"] = InteractiveConfig.aliases
with open(INTERACTIVE_CONFIG_PATH, mode="w") as interactive_config_file:
yaml.safe_dump(_interactive_config, interactive_config_file)
@dataclass
class Illust:
id: int
@ -126,31 +112,99 @@ class HandlerInfo(NamedTuple):
description: str
subcommands: Dict[str, HandlerInfo] = {}
class CommandManager:
_commands: Dict[str, HandlerInfo]
_aliases: Dict[str, str]
_reversed_aliases: Dict[str, str]
_config_builder: Callable[[Dict[Any, Any]], None]
def __init__(
self, config: Dict[Any, Any], config_builder: Callable[[Dict[Any, Any]], None]
) -> None:
self._commands = {}
self._aliases = {}
self._reversed_aliases = {}
self._config_builder = config_builder
self._read_config(config)
def _read_config(self, config: Dict[Any, Any]) -> None:
self._aliases = config.get("aliases") or {}
self._reversed_aliases = reversed_dict(self._aliases)
def _serialize(self) -> Dict[Any, Any]:
return {"aliases": self._aliases}
def register(self, command: str, handler: HandlerInfo):
assert command not in self._commands, f"无法找到 {command} 指令。"
self._commands[command] = handler
def set_alias(self, command: str, alias: str) -> None:
assert command in self._commands, f"无法找到 {command} 指令。"
assert (
alias not in self._commands and alias not in self._aliases.values()
), "重定向指令冲突。"
self._aliases[command] = alias
self.update_config()
def delete_alias(self, command: str) -> None:
assert command in self._aliases, f"{command} 指令未设置重定向。"
del self._aliases[command]
self.update_config()
def get_aliases(self) -> Dict[str, str]:
return dict(self._aliases)
def get_available_commands(self, alias_only: bool = True) -> Dict[str, HandlerInfo]:
aliases = {
alias: self._commands[command] for command, alias in self._aliases.items()
}
return (
{
command: handler
for command, handler in self._commands.items()
if command not in self._aliases
}
if alias_only
else dict(self._commands)
) | aliases
def alias_of(self, command: str) -> str:
alias = self._aliases.get(command) or (
command if command in self._commands else None
)
assert alias, f"{command} 指令未设置重定向。"
return alias
def command_of(self, alias: str) -> str:
command = self._reversed_aliases.get(alias)
assert command, f"不存在 {alias} 重定向指令。"
return command
def update_config(self):
self._config_builder(self._serialize())
def subcommand(
com: str, description: str, usage: str = ""
self, command: str, description: str, usage: str = ""
) -> Callable[[Handler], Handler]:
def decorator(func: Handler):
subcommands[com] = HandlerInfo(func, usage, description)
self.register(command, HandlerInfo(func, usage, description))
return func
return decorator
def subcommand_alias(com: str) -> str:
return InteractiveConfig.aliases.get(com) or com
def subcommand_from_alias(com: str) -> str:
return InteractiveConfig.reversed_aliases.get(com) or com
cmdman = CommandManager(
config=yaml.safe_load(open(COMMAND_CONFIG_PATH, mode="r"))
if os.path.exists(COMMAND_CONFIG_PATH)
else {},
config_builder=lambda d: yaml.safe_dump(d, open(COMMAND_CONFIG_PATH, mode="w")),
)
def generate_usage() -> str:
return "\n".join(
f"`{PREFIX}{root_command()} {subcommand_alias(com)} {info.usage}`\n{info.description}"
for com, info in subcommands.items()
f"`{PREFIX}{root_command} {command} {info.usage}`\n{info.description}"
for command, info in cmdman.get_available_commands(True).items()
)
@ -245,22 +299,22 @@ async def send_illust(message: Message, illust: Illust) -> None:
async def report_error(message: Message, ex: Exception) -> None:
if isinstance(ex, NoTokenError):
await message.edit(f"没有配置 Token 诶,要不发送 `{PREFIX}{root_command()} help` 看看帮助?")
await message.edit(f"没有配置 Token 诶,要不发送 `{PREFIX}{root_command} help` 看看帮助?")
else:
error = f"{type(ex).__name__}: {ex}"
await message.edit("呜呜呜 ~ 出错了:\n" + error)
logs.error(message)
@subcommand(
@cmdman.subcommand(
"search", "通过关键词(可传入多个)搜索 Pixiv 相关插图,并随机选取一张图发送", "<关键词> ... [R-18 / R-18G]"
)
async def search(_: Client, message: Message) -> None:
await message.edit("正在发送中请耐心等待www")
keywords = message.arguments
if not keywords:
await message.edit("没有关键词我怎么搜索?")
return
await message.edit("正在发送中请耐心等待www")
api = await get_api()
response = await api.search_illust(
keywords, search_target="partial_match_for_tags"
@ -275,7 +329,7 @@ async def search(_: Client, message: Message) -> None:
await message.safe_delete()
@subcommand(
@cmdman.subcommand(
"recommend",
"获取 Pixiv 每日推荐,可传入多个 Tag 参数筛选目标结果,并随机选取一张图发送",
"[Tag] ... [R-18 / R-18G]",
@ -297,69 +351,58 @@ async def recommend(_: Client, message: Message) -> None:
await message.safe_delete()
@subcommand("help", "获取插件帮助")
@cmdman.subcommand("help", "获取插件帮助")
async def help_cmd(_: Client, message: Message) -> None:
await message.edit(
f"{PLUGIN_NAME} 插件使用帮助: {HELP_URL}", disable_web_page_preview=True
)
@subcommand("alias", "重定向子命令", "{del <子指令>|list|set <子指令> <重定向子指令>}")
@cmdman.subcommand("alias", "重定向子命令", "{del <子指令>|list|set <子指令> <重定向子指令>}")
async def alias_cmd(_: Client, message: Message) -> None:
if not message.arguments:
await message.edit(f"缺少参数了,要不发送 `{PREFIX}help {root_command()}` 看看帮助?")
await message.edit(f"缺少参数了,要不发送 `{PREFIX}help {root_command}` 看看帮助?")
return
operation = message.parameter[0]
if operation == "del":
if len(message.parameter) != 2:
await message.edit(f"参数错误。要不发送 `{PREFIX}help {root_command()}` 看看帮助?")
await message.edit(f"参数错误。要不发送 `{PREFIX}help {root_command}` 看看帮助?")
return
com = message.parameter[1]
if com not in subcommands:
await message.edit("未知子指令。")
elif com not in InteractiveConfig.aliases:
await message.edit("该子指令未重定向。")
else:
del InteractiveConfig.aliases[com]
InteractiveConfig.apply()
await message.edit(f"已删除 {com} 重定向。正在重新加载 PagerMaid-Pyro。")
command = message.parameter[1]
cmdman.delete_alias(command)
await message.edit(f"已删除 `{command}` 重定向。正在重新加载 PagerMaid-Pyro。")
await reload_all()
elif operation == "list":
await message.edit(str(InteractiveConfig.aliases))
await message.edit(str(cmdman.get_aliases()))
elif operation == "set":
if len(message.parameter) != 3:
await message.edit(f"参数错误。要不发送 `{PREFIX}help {root_command()}` 看看帮助?")
await message.edit(f"参数错误。要不发送 `{PREFIX}help {root_command}` 看看帮助?")
return
com, alias = message.parameter[1:]
if com not in subcommands:
await message.edit("未知子指令。")
if alias in InteractiveConfig.reversed_aliases or alias in subcommands:
await message.edit("重定向冲突。")
else:
InteractiveConfig.aliases[com] = alias
InteractiveConfig.apply()
await message.edit(f"已将 {com} 重定向至 {alias}。正在重新加载 PagerMaid-Pyro。")
command, alias = message.parameter[1:]
cmdman.set_alias(command, alias)
await message.edit(f"已重定向 `{command}` 至 `{alias}`。正在重新加载 PagerMaid-Pyro。")
await reload_all()
else:
await message.edit(f"参数错误。要不发送 `{PREFIX}help {root_command()}` 看看帮助?")
await message.edit(f"参数错误。要不发送 `{PREFIX}help {root_command}` 看看帮助?")
@listener(
command=PLUGIN_NAME,
description=generate_usage(),
parameters=f"{{{'|'.join(subcommands.keys())}}}",
parameters=f"{{{'|'.join(cmdman.get_available_commands(True).keys())}}}",
)
async def message_handler(client: Client, message: Message) -> None:
try:
com: str = message.parameter[0]
command = message.parameter[0]
except IndexError:
com = ""
info = subcommands.get(subcommand_from_alias(com))
command = ""
commands = cmdman.get_available_commands(False)
info = commands.get(command)
if not info:
await message.edit(f"我看不懂你发了什么诶。要不发送 `{PREFIX}help {root_command()}` 看看?")
await message.edit(f"我看不懂你发了什么诶。要不发送 `{PREFIX}help {root_command}` 看看?")
return
new_message = copy.copy(message)
new_message.arguments = new_message.arguments[len(com) + 1 :]
new_message.arguments = new_message.arguments[len(command) + 1 :]
new_message.parameter = new_message.parameter[1:]
new_message.bind(client)
try: