diff --git a/pixiv/main.py b/pixiv/main.py
index 55a2e51..8c7ec48 100644
--- a/pixiv/main.py
+++ b/pixiv/main.py
@@ -9,12 +9,12 @@ from typing import (Any, Awaitable, Callable, Dict, List, NamedTuple, Optional,
Tuple)
import yaml
-
from pagermaid import logs
+from pagermaid.common.reload import reload_all
from pagermaid.enums import Client, Message
from pagermaid.listener import listener
from pagermaid.services import scheduler
-from pagermaid.utils import pip_install
+from pagermaid.utils import alias_command, pip_install
def install_dependencies() -> None:
@@ -25,24 +25,35 @@ def install_dependencies() -> None:
install_dependencies()
from pixivpy_async import AppPixivAPI
+from pixivpy_async.error import NoTokenError
-VERSION = "1.00"
PREFIX = ","
CONFIG_PATH = r"data/pixiv.yml"
+INTERACTIVE_CONFIG_PATH = r"data/pixiv_interactive.yml"
PLUGIN_NAME = "pixiv"
HELP_URL = r"https://www.huajitech.net/pagermaid-pixiv-plugin-help/"
_config: Dict[str, Any] = {}
+_interactive_config: Dict[str, Any] = {}
pixiv_api: Optional[AppPixivAPI] = None
with contextlib.suppress(Exception):
with open(CONFIG_PATH, mode="r") as config_file:
- _config: Dict[str, Any] = yaml.safe_load(config_file)
+ _config = yaml.safe_load(config_file)
+ with open(INTERACTIVE_CONFIG_PATH, mode="r") as interactive_config_file:
+ _interactive_config = yaml.safe_load(interactive_config_file)
def str_to_list(value: Optional[str]) -> List[str]:
return value.split(",") if value else []
+def reversed_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
+ return {v: k for k, v in d.items()}
+
+
+root_command = lambda: alias_command(PLUGIN_NAME)
+
+
Handler = Callable[[Client, Message], Awaitable[None]]
@@ -54,21 +65,32 @@ class PluginConfig:
) or _config.get("refresh_token")
message_elements: List[str] = (
- str_to_list(os.environ.get("PLUGIN_PIXIV_MESSAGE_ELEMENTS"))
- or _config.get("message_elements")
- or [
- "image",
- "id",
- "title",
- "caption",
- "tags",
- "resolution",
- "upload_time",
- "author",
- ]
+ str_to_list(os.environ.get("PLUGIN_PIXIV_MESSAGE_ELEMENTS"))
+ or _config.get("message_elements")
+ or [
+ "image",
+ "id",
+ "title",
+ "caption",
+ "tags",
+ "resolution",
+ "upload_time",
+ "author",
+ ]
)
+class InteractiveConfig:
+ aliases: Dict[str, str] = _interactive_config.get("aliases") or {}
+ reversed_aliases: Dict[str, str] = reversed_dict(aliases)
+
+ @staticmethod
+ def apply() -> None:
+ _interactive_config["aliases"] = InteractiveConfig.aliases
+ with open(INTERACTIVE_CONFIG_PATH, mode="w") as interactive_config_file:
+ yaml.safe_dump(_interactive_config, interactive_config_file)
+
+
@dataclass
class Illust:
id: int
@@ -104,28 +126,36 @@ class HandlerInfo(NamedTuple):
description: str
-command_map: Dict[str, HandlerInfo] = {}
+subcommands: Dict[str, HandlerInfo] = {}
-def command(
- com: str, description: str, usage: str = ""
+def subcommand(
+ com: str, description: str, usage: str = ""
) -> Callable[[Handler], Handler]:
def decorator(func: Handler):
- command_map[com] = HandlerInfo(func, usage, description)
+ subcommands[com] = HandlerInfo(func, usage, description)
return func
return decorator
+def subcommand_alias(com: str) -> str:
+ return InteractiveConfig.aliases.get(com) or com
+
+
+def subcommand_from_alias(com: str) -> str:
+ return InteractiveConfig.reversed_aliases.get(com) or com
+
+
def generate_usage() -> str:
return "\n".join(
- f"`{PREFIX}{PLUGIN_NAME} {com} {info.usage}`\n{info.description}"
- for com, info in command_map.items()
+ f"`{PREFIX}{root_command()} {subcommand_alias(com)} {info.usage}`\n{info.description}"
+ for com, info in subcommands.items()
)
def illust_sensitive_content_filter(
- illusts: List[Illust], keywords: str
+ illusts: List[Illust], keywords: str
) -> List[Illust]:
excluded = ["R-18", "R-18G"]
needed = set(keywords.split()).intersection(excluded)
@@ -141,7 +171,11 @@ def illust_sensitive_content_filter(
def illust_filter_by_tags(illusts: List[Illust], keywords: str) -> List[Illust]:
needed = set(keywords.split())
- return [illust for illust in illusts if (needed.intersection(illust.tags) if needed else True)]
+ return [
+ illust
+ for illust in illusts
+ if (needed.intersection(illust.tags) if needed else True)
+ ]
async def get_api() -> AppPixivAPI:
@@ -167,52 +201,66 @@ async def fetch_token() -> None:
await api.login(refresh_token=PluginConfig.refresh_token)
-async def send_illust(client: Client, chat_id: int, illust: Illust) -> None:
+async def send_illust(message: Message, illust: Illust) -> None:
elements = PluginConfig.message_elements
caption = (
- (f"**{illust.title}**\n" if "title" in elements else "")
- + (f"__{illust.caption}__\n\n" if "caption" in elements else "")
- + (
- f'ID: {illust.id}\n'
- if "id" in elements
- else ""
- )
- + (
- f'作者: {illust.author_name} '
- f'({illust.author_account})\n'
- if "author" in elements
- else ""
- )
- + (f'标签: {", ".join(illust.tags)}\n' if "tags" in elements else "")
- + (
- f"分辨率: {illust.resolution[0]}x{illust.resolution[1]}\n"
- if "resolution" in elements
- else ""
- )
- + (f"上传时间: {illust.upload_time}" if "upload_time" in elements else "")
+ (f"**{illust.title}**\n" if "title" in elements else "")
+ + (f"__{illust.caption}__\n\n" if "caption" in elements else "")
+ + (
+ f'ID: {illust.id}\n'
+ if "id" in elements
+ else ""
+ )
+ + (
+ f'作者: {illust.author_name} '
+ f"({illust.author_account})\n"
+ if "author" in elements
+ else ""
+ )
+ + (f'标签: {", ".join(illust.tags)}\n' if "tags" in elements else "")
+ + (
+ f"分辨率: {illust.resolution[0]}x{illust.resolution[1]}\n"
+ if "resolution" in elements
+ else ""
+ )
+ + (f"上传时间: {illust.upload_time}" if "upload_time" in elements else "")
)
if "image" in elements:
- await client.send_photo(chat_id, illust.image_urls["large"], caption)
+ await message.reply_photo(
+ illust.image_urls["large"],
+ caption=caption,
+ quote=False,
+ reply_to_message_id=message.reply_to_message_id
+ or message.reply_to_top_message_id,
+ )
else:
- await client.send_message(chat_id, caption)
+ await message.reply_text(
+ caption,
+ reply_to_message_id=message.reply_to_message_id
+ or message.reply_to_top_message_id,
+ )
-async def report_error(origin_message: Message, ex: Exception) -> None:
- message = f"{type(ex).__name__}: {ex}"
- await origin_message.edit("呜呜呜 ~ 出错了:\n" + message)
- logs.error(message)
+async def report_error(message: Message, ex: Exception) -> None:
+ if isinstance(ex, NoTokenError):
+ await message.edit(f"没有配置 Token 诶,要不发送 `{PREFIX}{root_command()} help` 看看帮助?")
+ else:
+ error = f"{type(ex).__name__}: {ex}"
+ await message.edit("呜呜呜 ~ 出错了:\n" + error)
+ logs.error(message)
-@command("search", "通过关键词(可传入多个)搜索 Pixiv 相关插图,并随机选取一张图发送", "<关键词> ... [R-18 / R-18G]")
-async def search(client: Client, message: Message) -> None:
+@subcommand(
+ "search", "通过关键词(可传入多个)搜索 Pixiv 相关插图,并随机选取一张图发送", "<关键词> ... [R-18 / R-18G]"
+)
+async def search(_: Client, message: Message) -> None:
+ await message.edit("正在发送中,请耐心等待www")
keywords = message.arguments
-
if not keywords:
await message.edit("没有关键词我怎么搜索?")
return
-
api = await get_api()
response = await api.search_illust(
keywords, search_target="partial_match_for_tags"
@@ -223,16 +271,17 @@ async def search(client: Client, message: Message) -> None:
await message.edit("呜呜呜 ~ 没有找到相应结果。")
return
illust = random.choice(filtered_illusts)
- await send_illust(client, message.chat.id, illust)
+ await send_illust(message, illust)
await message.safe_delete()
-@command(
+@subcommand(
"recommend",
"获取 Pixiv 每日推荐,可传入多个 Tag 参数筛选目标结果,并随机选取一张图发送",
"[Tag] ... [R-18 / R-18G]",
)
-async def recommend(client: Client, message: Message) -> None:
+async def recommend(_: Client, message: Message) -> None:
+ await message.edit("正在发送中,请耐心等待www")
keywords = message.arguments
api = await get_api()
response: Any = await api.illust_recommended()
@@ -244,33 +293,73 @@ async def recommend(client: Client, message: Message) -> None:
await message.edit("呜呜呜 ~ 没有找到相应结果。")
return
illust = random.choice(filtered_illusts)
- await send_illust(client, message.chat.id, illust)
+ await send_illust(message, illust)
await message.safe_delete()
-@command("help", "获取插件帮助")
+@subcommand("help", "获取插件帮助")
async def help_cmd(_: Client, message: Message) -> None:
await message.edit(
f"{PLUGIN_NAME} 插件使用帮助: {HELP_URL}", disable_web_page_preview=True
)
+@subcommand("alias", "重定向子命令", "{del <子指令>|list|set <子指令> <重定向子指令>}")
+async def alias_cmd(_: Client, message: Message) -> None:
+ if not message.arguments:
+ await message.edit(f"缺少参数了,要不发送 `{PREFIX}help {root_command()}` 看看帮助?")
+ return
+ operation = message.parameter[0]
+ if operation == "del":
+ if len(message.parameter) != 2:
+ await message.edit(f"参数错误。要不发送 `{PREFIX}help {root_command()}` 看看帮助?")
+ return
+ com = message.parameter[1]
+ if com not in subcommands:
+ await message.edit("未知子指令。")
+ elif com not in InteractiveConfig.aliases:
+ await message.edit("该子指令未重定向。")
+ else:
+ del InteractiveConfig.aliases[com]
+ InteractiveConfig.apply()
+ await message.edit(f"已删除 {com} 重定向。正在重新加载 PagerMaid-Pyro。")
+ await reload_all()
+ elif operation == "list":
+ await message.edit(str(InteractiveConfig.aliases))
+ elif operation == "set":
+ if len(message.parameter) != 3:
+ await message.edit(f"参数错误。要不发送 `{PREFIX}help {root_command()}` 看看帮助?")
+ return
+ com, alias = message.parameter[1:]
+ if com not in subcommands:
+ await message.edit("未知子指令。")
+ if alias in InteractiveConfig.reversed_aliases or alias in subcommands:
+ await message.edit("重定向冲突。")
+ else:
+ InteractiveConfig.aliases[com] = alias
+ InteractiveConfig.apply()
+ await message.edit(f"已将 {com} 重定向至 {alias}。正在重新加载 PagerMaid-Pyro。")
+ await reload_all()
+ else:
+ await message.edit(f"参数错误。要不发送 `{PREFIX}help {root_command()}` 看看帮助?")
+
+
@listener(
command=PLUGIN_NAME,
description=generate_usage(),
- parameters=f"{{{'|'.join(command_map.keys())}}}",
+ parameters=f"{{{'|'.join(subcommands.keys())}}}",
)
async def message_handler(client: Client, message: Message) -> None:
try:
com: str = message.parameter[0]
except IndexError:
com = ""
- info = command_map.get(com)
+ info = subcommands.get(subcommand_from_alias(com))
if not info:
- await message.edit(f"我看不懂你发了什么诶。要不发送 `{PREFIX}help {PLUGIN_NAME}` 看看?")
+ await message.edit(f"我看不懂你发了什么诶。要不发送 `{PREFIX}help {root_command()}` 看看?")
return
new_message = copy.copy(message)
- new_message.arguments = new_message.arguments[len(com) + 1:]
+ new_message.arguments = new_message.arguments[len(com) + 1 :]
new_message.parameter = new_message.parameter[1:]
new_message.bind(client)
try: