# pyright: basic
import contextlib
import copy
import os
import random
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, NamedTuple, Optional, Tuple
import yaml
from pagermaid.common.reload import reload_all
from pagermaid.enums import Client, Message
from pagermaid.listener import listener
from pagermaid.services import scheduler
from pagermaid.utils import alias_command, pip_install, logs
def install_dependencies() -> None:
    """Ask ``pip_install`` for every third-party package this plugin needs."""
    # (package name, value passed as ``alias``) pairs, installed in this order.
    requirements = (
        ("pixivpy-async", "pixivpy_async"),
        ("aiohttp-socks", "aiohttp_socks"),
    )
    for package, module_alias in requirements:
        pip_install(package, alias=module_alias)


# Runs at import time, ahead of the ``pixivpy_async`` imports that follow.
install_dependencies()
from pixivpy_async import AppPixivAPI
from pixivpy_async.error import NoTokenError
# Command prefix interpolated into the help hints shown to the user.
PREFIX = ","
# YAML file read once at import time to populate ``_config``.
CONFIG_PATH = r"data/pixiv.yml"
# YAML file from which the sub-command aliases are loaded and to which they are saved.
COMMAND_CONFIG_PATH = r"data/pixiv_command.yml"
# Plugin name: passed to ``alias_command`` and used as the listener command.
PLUGIN_NAME = "pixiv"
# Link sent by the ``help`` sub-command.
HELP_URL = r"https://www.huajitech.net/pagermaid-pixiv-plugin-help/"
# Parsed contents of CONFIG_PATH; stays empty when the file cannot be loaded.
_config: Dict[str, Any] = {}
# Shared API client, created on first use by ``get_api``.
pixiv_api: Optional[AppPixivAPI] = None
# Best-effort load of the optional plugin configuration: any failure (missing
# file, unreadable YAML, ...) simply leaves ``_config`` at its empty default.
with contextlib.suppress(Exception):
    with open(CONFIG_PATH, mode="r", encoding="utf-8") as config_file:
        _loaded_config = yaml.safe_load(config_file)
    # An empty file parses to None and arbitrary YAML need not be a mapping;
    # only adopt the result when it is a dict so later ``_config.get`` calls
    # keep working.
    if isinstance(_loaded_config, dict):
        _config = _loaded_config
def str_to_list(value: Optional[str]) -> List[str]:
    """Split a comma-separated string into its items.

    Whitespace around each item is stripped and empty items are dropped, so
    ``"image, id,"`` yields ``["image", "id"]``.

    Args:
        value: Comma-separated text, or ``None``.

    Returns:
        The items in their original order; an empty list when ``value`` is
        ``None``, empty, or contains no non-blank item.
    """
    if not value:
        return []
    stripped_items = (part.strip() for part in value.split(","))
    return [item for item in stripped_items if item]
def reversed_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """Return a new dict that maps every value of ``d`` back to its key.

    When several keys share one value, the key that comes last in iteration
    order is the one that is kept.
    """
    inverted: Dict[Any, Any] = {}
    for key, value in d.items():
        inverted[value] = key
    return inverted
# Command name returned by ``alias_command`` for this plugin; interpolated into
# the usage and help hints (presumably the user-configured alias, if any —
# confirm against pagermaid.utils.alias_command).
root_command = alias_command(PLUGIN_NAME)
# Signature of a sub-command handler: an async callable taking (client, message).
Handler = Callable[[Client, Message], Awaitable[None]]
class PluginConfig:
    """Plugin settings, resolved once when the class body is executed.

    For each setting the environment variable is consulted first, then the
    matching key of the loaded YAML configuration.
    """

    # Proxy handed to the API client; None when configured nowhere.
    proxy: Optional[str] = os.getenv("PLUGIN_PIXIV_PROXY") or _config.get("proxy")

    # Refresh token used to log in; None when configured nowhere.
    refresh_token: Optional[str] = (
        os.getenv("PLUGIN_PIXIV_REFRESH_TOKEN") or _config.get("refresh_token")
    )

    # Names of the parts included when an illustration is sent. The environment
    # variable is a comma-separated list; the last operand is the built-in default.
    message_elements: List[str] = (
        str_to_list(os.getenv("PLUGIN_PIXIV_MESSAGE_ELEMENTS"))
        or _config.get("message_elements")
        or [
            "image",
            "id",
            "title",
            "caption",
            "tags",
            "resolution",
            "upload_time",
            "author",
        ]
    )
@dataclass
class Illust:
    """Plain record with the illustration fields this plugin works with."""

    id: int
    title: str
    caption: str
    tags: List[str]
    image_urls: Dict[str, str]
    resolution: Tuple[int, int]
    upload_time: str
    author_id: str
    author_account: str
    author_name: str

    @staticmethod
    def from_response(res: Any) -> "Illust":
        """Build an ``Illust`` from one illustration object of an API response.

        Args:
            res: Object exposing ``id``, ``title``, ``caption``, ``tags``,
                ``image_urls``, ``width``, ``height``, ``create_date`` and
                ``user`` (with ``id``, ``account`` and ``name``) as attributes.

        Returns:
            A new ``Illust``. Each tag contributes its ``translated_name`` when
            that is truthy and its ``name`` otherwise; ``image_urls`` is copied
            into a plain dict.
        """
        return Illust(
            id=res.id,
            title=res.title,
            caption=res.caption,
            tags=[tag.translated_name or tag.name for tag in res.tags],
            image_urls=dict(res.image_urls),
            resolution=(res.width, res.height),
            upload_time=res.create_date,
            author_id=res.user.id,
            author_account=res.user.account,
            author_name=res.user.name,
        )
# Registration record of one sub-command: the coroutine function to run plus
# the usage and description strings used when building the help text.
HandlerInfo = NamedTuple(
    "HandlerInfo",
    [
        ("func", Handler),
        ("usage", str),
        ("description", str),
    ],
)
class CommandManager:
    """Registry of sub-commands and of the aliases configured for them.

    Aliases map a registered command name to the name under which it is
    offered instead. Every alias change is passed, in serialized form, to the
    ``config_builder`` callback so the caller can persist it.

    Validation failures raise ``AssertionError`` with a user-facing message.
    They are raised explicitly rather than through ``assert`` so the checks
    still run under ``python -O``.
    """

    _commands: "Dict[str, HandlerInfo]"
    _aliases: Dict[str, str]
    _reversed_aliases: Dict[str, str]
    _config_builder: Callable[[Dict[Any, Any]], None]

    def __init__(
        self,
        config: Optional[Dict[Any, Any]],
        config_builder: Callable[[Dict[Any, Any]], None],
    ) -> None:
        """Create a manager.

        Args:
            config: Previously serialized state (``{"aliases": {...}}``).
                ``None`` or an empty dict means "no aliases".
            config_builder: Called with the serialized state after every
                alias change.
        """
        self._commands = {}
        self._aliases = {}
        self._reversed_aliases = {}
        self._config_builder = config_builder
        self._read_config(config)

    @staticmethod
    def _ensure(condition: Any, message: str) -> None:
        """Raise ``AssertionError(message)`` unless ``condition`` is truthy."""
        if not condition:
            raise AssertionError(message)

    def _rebuild_reversed_aliases(self) -> None:
        """Recompute the alias -> command lookup from the current aliases."""
        self._reversed_aliases = {
            alias: command for command, alias in self._aliases.items()
        }

    def _read_config(self, config: Optional[Dict[Any, Any]]) -> None:
        """Load the aliases from ``config``, tolerating ``None`` and missing keys."""
        self._aliases = dict((config or {}).get("aliases") or {})
        self._rebuild_reversed_aliases()

    def _serialize(self) -> Dict[Any, Any]:
        """Return the state handed to ``config_builder`` (a snapshot copy)."""
        return {"aliases": dict(self._aliases)}

    def register(self, command: str, handler: "HandlerInfo") -> None:
        """Register ``handler`` under ``command``.

        Raises:
            AssertionError: ``command`` is already registered.
        """
        self._ensure(command not in self._commands, f"{command} 指令已存在。")
        self._commands[command] = handler

    def set_alias(self, command: str, alias: str) -> None:
        """Offer ``command`` under the name ``alias`` and persist the change.

        Raises:
            AssertionError: ``command`` is not registered, or ``alias`` is
                already a command name or an alias in use.
        """
        self._ensure(command in self._commands, f"无法找到 {command} 指令。")
        self._ensure(
            alias not in self._commands and alias not in self._aliases.values(),
            "重定向指令冲突。",
        )
        self._aliases[command] = alias
        # Keep the reverse lookup in step, otherwise command_of() goes stale.
        self._rebuild_reversed_aliases()
        self.update_config()

    def delete_alias(self, command: str) -> None:
        """Remove the alias of ``command`` and persist the change.

        Raises:
            AssertionError: ``command`` has no alias.
        """
        self._ensure(command in self._aliases, f"{command} 指令未设置重定向。")
        del self._aliases[command]
        self._rebuild_reversed_aliases()
        self.update_config()

    def get_aliases(self) -> Dict[str, str]:
        """Return a copy of the command -> alias mapping."""
        return dict(self._aliases)

    def get_available_commands(
        self, alias_only: bool = True
    ) -> "Dict[str, HandlerInfo]":
        """Return the handlers keyed by the names they can be invoked with.

        Args:
            alias_only: When true, an aliased command is listed under its
                alias only; when false, it is listed under both its own name
                and its alias.

        Returns:
            A new dict. Aliases that point at a command which is not
            registered are skipped.
        """
        if alias_only:
            available = {
                command: handler
                for command, handler in self._commands.items()
                if command not in self._aliases
            }
        else:
            available = dict(self._commands)
        for command, alias in self._aliases.items():
            # A stored alias may refer to a command that is not registered.
            if command in self._commands:
                available[alias] = self._commands[command]
        return available

    def alias_of(self, command: str) -> str:
        """Return the alias of ``command``, or ``command`` itself when it has none.

        Raises:
            AssertionError: ``command`` has no alias and is not registered.
        """
        alias = self._aliases.get(command) or (
            command if command in self._commands else None
        )
        self._ensure(alias, f"{command} 指令未设置重定向。")
        return alias

    def command_of(self, alias: str) -> str:
        """Return the command that ``alias`` stands for.

        Raises:
            AssertionError: ``alias`` is not a configured alias.
        """
        command = self._reversed_aliases.get(alias)
        self._ensure(command, f"不存在 {alias} 重定向指令。")
        return command

    def update_config(self) -> None:
        """Pass the serialized state to the ``config_builder`` callback."""
        self._config_builder(self._serialize())

    def subcommand(
        self, command: str, description: str, usage: str = ""
    ) -> "Callable[[Handler], Handler]":
        """Return a decorator that registers the decorated handler as ``command``.

        The decorated function is returned unchanged.
        """

        def decorator(func: "Handler") -> "Handler":
            self.register(command, HandlerInfo(func, usage, description))
            return func

        return decorator
def _load_command_config() -> Dict[Any, Any]:
    """Read the saved command configuration; ``{}`` when the file is missing or empty."""
    if not os.path.exists(COMMAND_CONFIG_PATH):
        return {}
    with open(COMMAND_CONFIG_PATH, mode="r", encoding="utf-8") as config_file:
        # safe_load returns None for an empty document.
        return yaml.safe_load(config_file) or {}


def _save_command_config(config: Dict[Any, Any]) -> None:
    """Write ``config`` to the command configuration file, closing it afterwards."""
    with open(COMMAND_CONFIG_PATH, mode="w", encoding="utf-8") as config_file:
        yaml.safe_dump(config, config_file)


# Shared manager holding the plugin's sub-commands and their aliases.
cmdman = CommandManager(
    config=_load_command_config(),
    config_builder=_save_command_config,
)
def generate_usage() -> str:
    """Build the usage text listing every available sub-command.

    Each sub-command contributes its invocation (in backticks) followed, on
    the next line, by its description; entries are separated by newlines.
    """
    entries = []
    for name, handler_info in cmdman.get_available_commands(True).items():
        entries.append(
            f"`{PREFIX}{root_command} {name} {handler_info.usage}`\n{handler_info.description}"
        )
    return "\n".join(entries)
def illust_sensitive_content_filter(
    illusts: List[Illust], keywords: str
) -> List[Illust]:
    """Filter illustrations by the sensitive tags ``R-18`` and ``R-18G``.

    A sensitive tag that appears as a whitespace-separated word in
    ``keywords`` becomes required; every other sensitive tag is forbidden.

    Args:
        illusts: Candidate illustrations.
        keywords: The user's keyword string.

    Returns:
        The illustrations, in their original order, that carry every required
        sensitive tag and none of the forbidden ones.
    """
    sensitive = {"R-18", "R-18G"}
    requested = sensitive & set(keywords.split())
    forbidden = sensitive - requested
    kept = []
    for illust in illusts:
        tags = set(illust.tags)
        if tags & forbidden:
            continue
        # An empty ``requested`` is a subset of anything, so nothing is required then.
        if requested <= tags:
            kept.append(illust)
    return kept
def illust_filter_by_tags(illusts: List[Illust], keywords: str) -> List[Illust]:
    """Keep the illustrations tagged with every whitespace-separated word of ``keywords``.

    Returns a new list in the original order; with no keywords, all
    illustrations are kept.
    """
    required = set(keywords.split())
    if not required:
        return list(illusts)
    return [illust for illust in illusts if required.issubset(illust.tags)]
async def get_api() -> AppPixivAPI:
    """Return the shared API client, creating it (and logging in) on first use.

    The client is built with the configured proxy. When a refresh token is
    configured the client logs in with it; otherwise an informational message
    is logged and the client is used without logging in.

    The client is stored in the module-level ``pixiv_api`` only after this
    setup has finished, so a login that raises does not leave a client that
    never logged in cached for later calls.

    Raises:
        Whatever ``AppPixivAPI.login`` raises when logging in fails.
    """
    global pixiv_api
    if pixiv_api is not None:
        return pixiv_api
    api = AppPixivAPI(proxy=PluginConfig.proxy)
    # Truthiness (not ``is None``) so an empty token is treated as "not set",
    # matching the check in the scheduled token refresh.
    if PluginConfig.refresh_token:
        await api.login(refresh_token=PluginConfig.refresh_token)
    else:
        logs.info(f"未设置 {PLUGIN_NAME} 插件登录所需的 refresh_token,将以游客身份工作。")
    pixiv_api = api
    return api
@scheduler.scheduled_job("interval", minutes=30, id="pixiv_fetch_token")
async def fetch_token() -> None:
    """Scheduled job (every 30 minutes): log in again with the refresh token.

    Does nothing when no refresh token is configured.
    """
    if PluginConfig.refresh_token:
        api = await get_api()
        if not api:
            return
        await api.login(refresh_token=PluginConfig.refresh_token)
async def send_illust(message: Message, illust: Illust) -> None:
    """Send ``illust`` in response to ``message``.

    Which parts are included is controlled by ``PluginConfig.message_elements``.
    When ``"image"` ` is enabled the ``large`` image URL is sent as a photo with
    the assembled text as its caption; otherwise only the text is sent.

    Args:
        message: The command message being answered; its reply target and
            thread id are passed on to the outgoing message.
        illust: The illustration to present.
    """
    enabled = PluginConfig.message_elements

    # Assemble the text piece by piece, in a fixed order.
    pieces = []
    if "title" in enabled:
        pieces.append(f"**{illust.title}**\n")
    if "caption" in enabled:
        pieces.append(f"__{illust.caption}__\n\n")
    if "id" in enabled:
        pieces.append(f'ID: {illust.id}\n')
    if "author" in enabled:
        pieces.append(
            f'作者: {illust.author_name} '
            f"({illust.author_account})\n"
        )
    if "tags" in enabled:
        pieces.append(f'标签: {", ".join(illust.tags)}\n')
    if "resolution" in enabled:
        pieces.append(f"分辨率: {illust.resolution[0]}x{illust.resolution[1]}\n")
    if "upload_time" in enabled:
        pieces.append(f"上传时间: {illust.upload_time}")
    text = "".join(pieces)

    # Both kinds of reply are addressed the same way.
    target = {
        "reply_to_message_id": message.reply_to_message_id,
        "message_thread_id": message.message_thread_id,
    }
    if "image" in enabled:
        await message.reply_photo(
            illust.image_urls["large"], caption=text, quote=False, **target
        )
        return
    await message.reply_text(text, **target)
async def report_error(message: Message, ex: Exception) -> None:
    """Tell the user that a sub-command failed and record the failure.

    A ``NoTokenError`` is answered with a hint to read the plugin help. Any
    other exception is logged (exception type and text, followed by the
    message object) and then shown to the user.

    Args:
        message: The command message; it is edited to show the error.
        ex: The exception raised by the sub-command handler.
    """
    if isinstance(ex, NoTokenError):
        await message.edit(f"没有配置 Token 诶,要不发送 `{PREFIX}{root_command} help` 看看帮助?")
        return
    error = f"{type(ex).__name__}: {ex}"
    # Log first: the failure is then recorded even when editing the message
    # raises as well. The exception text is logged in addition to the message
    # object, which on its own does not say what went wrong.
    logs.error(error)
    logs.error(message)
    await message.edit("呜呜呜 ~ 出错了:\n" + error)
@cmdman.subcommand(
    "search", "通过关键词(可传入多个)搜索 Pixiv 相关插图,并随机选取一张图发送", "<关键词> ... [R-18 / R-18G]"
)
async def search(_: Client, message: Message) -> None:
    """Sub-command ``search``: send one random illustration matching the keywords.

    The message arguments are used as the search query (partial tag match).
    Results are passed through the sensitive-content filter before one is
    picked at random; the command message is deleted once the illustration
    has been sent.
    """
    query = message.arguments
    if not query:
        await message.edit("没有关键词我怎么搜索?")
        return
    await message.edit("正在发送中,请耐心等待www")

    api = await get_api()
    found = await api.search_illust(query, search_target="partial_match_for_tags")
    candidates = illust_sensitive_content_filter(
        [Illust.from_response(item) for item in found.illusts], query
    )
    if candidates:
        await send_illust(message, random.choice(candidates))
        await message.safe_delete()
    else:
        await message.edit("呜呜呜 ~ 没有找到相应结果。")
@cmdman.subcommand(
    "recommend",
    "获取 Pixiv 每日推荐,可传入多个 Tag 参数筛选目标结果,并随机选取一张图发送",
    "[Tag] ... [R-18 / R-18G]",
)
async def recommend(_: Client, message: Message) -> None:
    """Sub-command ``recommend``: send one random recommended illustration.

    The message arguments, if any, are treated as tags: recommendations go
    through the sensitive-content filter and then the tag filter before one
    is picked at random. The command message is deleted once the
    illustration has been sent.
    """
    await message.edit("正在发送中,请耐心等待www")
    tag_query = message.arguments

    api = await get_api()
    recommended: Any = await api.illust_recommended()
    pool = [Illust.from_response(item) for item in recommended.illusts]
    pool = illust_sensitive_content_filter(pool, tag_query)
    pool = illust_filter_by_tags(pool, tag_query)

    if pool:
        await send_illust(message, random.choice(pool))
        await message.safe_delete()
    else:
        await message.edit("呜呜呜 ~ 没有找到相应结果。")
@cmdman.subcommand("help", "获取插件帮助")
async def help_cmd(_: Client, message: Message) -> None:
    """Sub-command ``help``: replace the command message with the help link."""
    help_text = f"{PLUGIN_NAME} 插件使用帮助: {HELP_URL}"
    await message.edit(help_text, disable_web_page_preview=True)
@cmdman.subcommand("id", "根据 ID 获取 Pixiv 相关插图", "")
async def id_cmd(_: Client, message: Message) -> None:
    """Sub-command ``id``: send the illustration with the given numeric ID.

    The message arguments must parse as an integer. When the detail response
    has no ``illust`` entry the user is told that nothing was found; otherwise
    the illustration is sent and the command message is deleted.
    """
    try:
        illust_id = int(message.arguments)
    except ValueError:
        await message.edit("你输入的不是正确的 ID 诶www")
        return
    await message.edit("正在发送中,请耐心等待www")

    api = await get_api()
    detail = await api.illust_detail(illust_id)
    payload = detail.get("illust")
    if not payload:
        await message.edit("呜呜呜 ~ 没有找到相应结果。")
        return

    await send_illust(message, Illust.from_response(payload))
    await message.safe_delete()
@cmdman.subcommand("alias", "重定向子命令", "{del <子指令>|list|set <子指令> <重定向子指令>}")
async def alias_cmd(_: Client, message: Message) -> None:
    """Sub-command ``alias``: list, set or delete sub-command aliases.

    Supported forms are ``list``, ``del <command>`` and
    ``set <command> <alias>``. After a successful ``set`` or ``del`` the
    change is announced and ``reload_all`` is awaited. Anything else is
    answered with a parameter-error hint.
    """
    if not message.arguments:
        await message.edit(f"缺少参数了,要不发送 `{PREFIX}help {root_command}` 看看帮助?")
        return

    args = message.parameter
    bad_arguments = f"参数错误。要不发送 `{PREFIX}help {root_command}` 看看帮助?"
    action = args[0]

    if action == "list":
        await message.edit(str(cmdman.get_aliases()))
        return

    # Number of parameters (action word included) that each mutating action takes.
    expected_count = {"del": 2, "set": 3}.get(action)
    if expected_count is None or len(args) != expected_count:
        await message.edit(bad_arguments)
        return

    if action == "del":
        target = args[1]
        cmdman.delete_alias(target)
        await message.edit(f"已删除 `{target}` 重定向。正在重新加载 PagerMaid-Pyro。")
    else:
        target, new_name = args[1:]
        cmdman.set_alias(target, new_name)
        await message.edit(f"已重定向 `{target}` 至 `{new_name}`。正在重新加载 PagerMaid-Pyro。")
    await reload_all()
@listener(
    command=PLUGIN_NAME,
    description=generate_usage(),
    parameters=f"{{{'|'.join(cmdman.get_available_commands(True))}}}",
)
async def message_handler(client: Client, message: Message) -> None:
    """Entry point of the plugin: dispatch to the requested sub-command.

    The first parameter selects the sub-command, by its own name or by its
    alias. The handler receives a shallow copy of the message from which the
    sub-command word has been removed (both from ``arguments`` and from
    ``parameter``). Exceptions raised by the handler are passed to
    ``report_error`` together with the original message.
    """
    try:
        name = message.parameter[0]
    except IndexError:
        name = ""

    handler_info = cmdman.get_available_commands(False).get(name)
    if not handler_info:
        await message.edit(f"我看不懂你发了什么诶。要不发送 `{PREFIX}help {root_command}` 看看?")
        return

    # Work on a copy so the original message keeps its full arguments.
    forwarded = copy.copy(message)
    # Drop the sub-command word and the single character that follows it.
    forwarded.arguments = forwarded.arguments[len(name) + 1 :]
    forwarded.parameter = forwarded.parameter[1:]
    forwarded.bind(client)

    try:
        return await handler_info.func(client, forwarded)
    except Exception as ex:
        await report_error(message, ex)