PagerMaid_Plugins_Pyro/pixiv/main.py

412 lines
13 KiB
Python
Raw Normal View History

# pyright: basic
import contextlib
import copy
import os
import random
from dataclasses import dataclass
from typing import (Any, Awaitable, Callable, Dict, List, NamedTuple, Optional,
Tuple)
import yaml
from pagermaid import logs
from pagermaid.common.reload import reload_all
from pagermaid.enums import Client, Message
from pagermaid.listener import listener
2023-01-27 04:18:12 +00:00
from pagermaid.services import scheduler
from pagermaid.utils import alias_command, pip_install
def install_dependencies() -> None:
    """Ask PagerMaid's ``pip_install`` helper for pixivpy-async and aiohttp-socks."""
    # (package name, value passed as ``alias``) — presumably the importable
    # module name; verify against pagermaid.utils.pip_install.
    requirements = (
        ("pixivpy-async", "pixivpy_async"),
        ("aiohttp-socks", "aiohttp_socks"),
    )
    for package, module_name in requirements:
        pip_install(package, alias=module_name)


# Runs at import time, ahead of the pixivpy_async imports that follow.
install_dependencies()
from pixivpy_async import AppPixivAPI
from pixivpy_async.error import NoTokenError
# Prefix placed in front of commands in the hints shown to the user.
PREFIX = ","
# YAML file the plugin settings (proxy, refresh_token, message_elements) are read from.
CONFIG_PATH = r"data/pixiv.yml"
# YAML file in which the sub-command aliases are persisted.
COMMAND_CONFIG_PATH = r"data/pixiv_command.yml"
PLUGIN_NAME = "pixiv"
HELP_URL = r"https://www.huajitech.net/pagermaid-pixiv-plugin-help/"

# Settings parsed from CONFIG_PATH; stays empty when the file is missing or unusable.
_config: Dict[str, Any] = {}
# Shared API client, created lazily by get_api().
pixiv_api: Optional[AppPixivAPI] = None

# Best effort: the file is optional (every setting can also come from the
# environment), so any failure to read or parse it is deliberately ignored.
with contextlib.suppress(Exception):
    with open(CONFIG_PATH, mode="r") as config_file:
        _loaded_config = yaml.safe_load(config_file)
    # An empty document parses to None and a scalar/list document is not a
    # mapping; keeping the empty default avoids `_config.get(...)` failing later.
    if isinstance(_loaded_config, dict):
        _config = _loaded_config
def str_to_list(value: Optional[str]) -> List[str]:
    """Split a comma-separated string into its items.

    Whitespace around each item is removed and empty items are dropped, so
    ``"image, id,"`` yields ``["image", "id"]``.

    Args:
        value: The raw string, or ``None``.

    Returns:
        The list of items; empty when *value* is ``None``, empty, or contains
        only separators and whitespace.
    """
    if not value:
        return []
    stripped = (item.strip() for item in value.split(","))
    return [item for item in stripped if item]
def reversed_dict(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """Return a new dict mapping each value of *d* back to its key.

    When several keys share a value, the key that comes last in iteration
    order is the one kept.
    """
    return dict(zip(d.values(), d.keys()))
2023-04-15 09:13:01 +00:00
# Command name used in the hints shown to the user, obtained from PagerMaid's
# alias_command (presumably the user's alias for "pixiv" when one is
# configured — verify against pagermaid.utils).
root_command = alias_command(PLUGIN_NAME)
# Signature shared by all sub-command handlers: called with (client, message)
# and awaited for no result.
Handler = Callable[[Client, Message], Awaitable[None]]
class PluginConfig:
    """Plugin settings, resolved once when this class body is executed.

    Every setting prefers its ``PLUGIN_PIXIV_*`` environment variable and falls
    back to the value parsed from the YAML configuration when the variable is
    unset or empty.
    """

    proxy: Optional[str] = (
        os.environ.get("PLUGIN_PIXIV_PROXY") or _config.get("proxy")
    )
    refresh_token: Optional[str] = (
        os.environ.get("PLUGIN_PIXIV_REFRESH_TOKEN") or _config.get("refresh_token")
    )
    # Names of the parts a sent illustration message is made of; when neither
    # source provides a value, every part is enabled.
    message_elements: List[str] = (
        str_to_list(os.environ.get("PLUGIN_PIXIV_MESSAGE_ELEMENTS"))
        or _config.get("message_elements")
        or ["image", "id", "title", "caption", "tags", "resolution", "upload_time", "author"]
    )
@dataclass
class Illust:
    """Plain record of the illustration fields this plugin works with."""

    id: int
    title: str
    caption: str
    # Display names of the tags (see from_response for how they are chosen).
    tags: List[str]
    # Copy of the response's image_urls mapping (read elsewhere via the "large" key).
    image_urls: Dict[str, str]
    # (width, height)
    resolution: Tuple[int, int]
    upload_time: str
    author_id: str
    author_account: str
    author_name: str

    @staticmethod
    def from_response(res: Any) -> "Illust":
        """Build an ``Illust`` from one illustration object of an API response.

        For every tag the translated name is used when it is truthy, otherwise
        the tag's plain name.
        """
        tag_names: List[str] = []
        for tag in res.tags:
            tag_names.append(tag.translated_name or tag.name)
        return Illust(
            id=res.id,
            title=res.title,
            caption=res.caption,
            tags=tag_names,
            image_urls=dict(res.image_urls),
            resolution=(res.width, res.height),
            upload_time=res.create_date,
            author_id=res.user.id,
            author_account=res.user.account,
            author_name=res.user.name,
        )
class HandlerInfo(NamedTuple):
    """A registered sub-command: its handler plus the texts shown in the usage listing."""

    # Coroutine function awaited with (client, message) when the sub-command runs.
    func: Handler
    # Argument synopsis printed after the sub-command name in the usage line.
    usage: str
    # Explanation printed on the line below the usage line.
    description: str
2023-04-15 09:13:01 +00:00
class CommandManager:
    """Registry of the plugin's sub-commands and their user-defined aliases.

    An alias renames a sub-command. The alias table is loaded from the config
    mapping given at construction time and handed to ``config_builder`` after
    every change so the caller can persist it.

    Validation failures raise ``AssertionError`` explicitly (instead of using
    ``assert`` statements) so the checks are still performed under ``python -O``.
    """

    # Sub-command name -> handler information.
    _commands: Dict[str, "HandlerInfo"]
    # Sub-command name -> alias.
    _aliases: Dict[str, str]
    # Alias -> sub-command name; always rebuilt from ``_aliases``.
    _reversed_aliases: Dict[str, str]
    # Callback receiving the serialized state whenever it changes.
    _config_builder: Callable[[Dict[Any, Any]], None]

    def __init__(
        self, config: Dict[Any, Any], config_builder: Callable[[Dict[Any, Any]], None]
    ) -> None:
        """Create a manager with no commands and the aliases found in *config*.

        Args:
            config: Mapping that may carry an ``"aliases"`` mapping; anything
                else (including ``None``) is treated as "no aliases".
            config_builder: Called with the serialized state after each change.
        """
        self._commands = {}
        self._aliases = {}
        self._reversed_aliases = {}
        self._config_builder = config_builder
        self._read_config(config)

    def _read_config(self, config: Dict[Any, Any]) -> None:
        """Load the alias table from *config*, tolerating missing or malformed data."""
        aliases = config.get("aliases") if isinstance(config, dict) else None
        self._aliases = aliases if isinstance(aliases, dict) else {}
        self._sync_reversed_aliases()

    def _sync_reversed_aliases(self) -> None:
        """Rebuild the alias -> command lookup so it never lags behind ``_aliases``."""
        self._reversed_aliases = {
            alias: command for command, alias in self._aliases.items()
        }

    def _serialize(self) -> Dict[Any, Any]:
        """Return the state in the shape passed to ``config_builder``."""
        return {"aliases": self._aliases}

    def register(self, command: str, handler: "HandlerInfo"):
        """Register *handler* under *command*.

        Raises:
            AssertionError: If *command* is already registered.
        """
        if command in self._commands:
            raise AssertionError(f"{command} 指令已存在。")
        self._commands[command] = handler

    def set_alias(self, command: str, alias: str) -> None:
        """Make *alias* the name of *command* and report the new state.

        Raises:
            AssertionError: If *command* is not registered, or *alias* equals a
                registered command name or an alias that is already in use.
        """
        if command not in self._commands:
            raise AssertionError(f"无法找到 {command} 指令。")
        if alias in self._commands or alias in self._aliases.values():
            raise AssertionError("重定向指令冲突。")
        self._aliases[command] = alias
        self._sync_reversed_aliases()
        self.update_config()

    def delete_alias(self, command: str) -> None:
        """Remove the alias of *command* and report the new state.

        Raises:
            AssertionError: If *command* has no alias.
        """
        if command not in self._aliases:
            raise AssertionError(f"{command} 指令未设置重定向。")
        del self._aliases[command]
        self._sync_reversed_aliases()
        self.update_config()

    def get_aliases(self) -> Dict[str, str]:
        """Return a copy of the command -> alias table."""
        return dict(self._aliases)

    def get_available_commands(self, alias_only: bool = True) -> Dict[str, "HandlerInfo"]:
        """Return the names a sub-command can be invoked by, mapped to their handlers.

        Args:
            alias_only: When true, an aliased command appears under its alias
                only; when false, it appears under both its original name and
                its alias.

        Aliases that point at a command which is not registered (for example a
        stale entry in the loaded config) are skipped.
        """
        aliased = {
            alias: self._commands[command]
            for command, alias in self._aliases.items()
            if command in self._commands
        }
        if alias_only:
            visible = {
                command: handler
                for command, handler in self._commands.items()
                if command not in self._aliases
            }
        else:
            visible = dict(self._commands)
        return visible | aliased

    def alias_of(self, command: str) -> str:
        """Return the alias of *command*, or *command* itself when it has none.

        Raises:
            AssertionError: If *command* has no alias and is not registered.
        """
        alias = self._aliases.get(command) or (
            command if command in self._commands else None
        )
        if not alias:
            raise AssertionError(f"{command} 指令未设置重定向。")
        return alias

    def command_of(self, alias: str) -> str:
        """Return the command that *alias* stands for.

        Raises:
            AssertionError: If *alias* is not a known alias.
        """
        command = self._reversed_aliases.get(alias)
        if not command:
            raise AssertionError(f"不存在 {alias} 重定向指令。")
        return command

    def update_config(self):
        """Hand the serialized state to the ``config_builder`` callback."""
        self._config_builder(self._serialize())

    def subcommand(
        self, command: str, description: str, usage: str = ""
    ) -> Callable[["Handler"], "Handler"]:
        """Return a decorator registering the decorated handler as *command*.

        The decorated function is returned unchanged.
        """

        def decorator(func: "Handler") -> "Handler":
            self.register(command, HandlerInfo(func, usage, description))
            return func

        return decorator
2023-04-15 09:13:01 +00:00
def _load_command_config() -> Dict[Any, Any]:
    """Read the persisted alias configuration, or ``{}`` when there is none."""
    if not os.path.exists(COMMAND_CONFIG_PATH):
        return {}
    with open(COMMAND_CONFIG_PATH, mode="r") as config_file:
        # An empty file parses to None; normalise it to an empty mapping.
        return yaml.safe_load(config_file) or {}


def _save_command_config(config: Dict[Any, Any]) -> None:
    """Write *config* to the alias configuration file, closing it afterwards."""
    with open(COMMAND_CONFIG_PATH, mode="w") as config_file:
        yaml.safe_dump(config, config_file)


# Module-wide registry of the plugin's sub-commands; aliases are loaded from
# and written back to COMMAND_CONFIG_PATH.
cmdman = CommandManager(
    config=_load_command_config(),
    config_builder=_save_command_config,
)
def generate_usage() -> str:
    """Render the usage text listing every currently reachable sub-command.

    Each entry is a back-quoted invocation line followed by the description
    on the next line; entries are joined with newlines.
    """
    entries = []
    for command, info in cmdman.get_available_commands(True).items():
        entries.append(
            f"`{PREFIX}{root_command} {command} {info.usage}`\n{info.description}"
        )
    return "\n".join(entries)
def illust_sensitive_content_filter(
    illusts: List[Illust], keywords: str
) -> List[Illust]:
    """Filter *illusts* by the ``R-18`` / ``R-18G`` tags.

    A rating tag that appears as a word in *keywords* counts as requested.
    Illustrations carrying a rating tag that was not requested are dropped;
    when at least one rating was requested, only illustrations carrying a
    requested rating tag are kept.
    """
    ratings = {"R-18", "R-18G"}
    requested = ratings & set(keywords.split())
    forbidden = ratings - requested

    kept = []
    for illust in illusts:
        if forbidden.intersection(illust.tags):
            continue
        if requested and not requested.intersection(illust.tags):
            continue
        kept.append(illust)
    return kept
def illust_filter_by_tags(illusts: List[Illust], keywords: str) -> List[Illust]:
    """Keep the illustrations that carry at least one of the words in *keywords*.

    When *keywords* contains no words, all illustrations are returned (as a
    new list).
    """
    wanted = set(keywords.split())
    if not wanted:
        return list(illusts)
    return [illust for illust in illusts if not wanted.isdisjoint(illust.tags)]
async def get_api() -> AppPixivAPI:
    """Return the shared Pixiv API client, creating it on first use.

    The client is created with the configured proxy. When a refresh token is
    configured it is used to log in; otherwise an informational message is
    logged and the client is returned without logging in.

    Raises:
        Whatever ``AppPixivAPI.login`` raises. In that case nothing is cached,
        so the next call starts over instead of handing out a client whose
        login never succeeded.
    """
    global pixiv_api
    if pixiv_api:
        return pixiv_api

    api = AppPixivAPI(proxy=PluginConfig.proxy)
    # Falsy check (not `is None`) so an empty token is treated as "not
    # configured", the same way fetch_token() treats it.
    if not PluginConfig.refresh_token:
        logs.info(f"未设置 {PLUGIN_NAME} 插件登录所需的 refresh_token将以游客身份工作。")
    else:
        await api.login(refresh_token=PluginConfig.refresh_token)
    # Publish the client only once it is fully set up.
    pixiv_api = api
    return api
@scheduler.scheduled_job("interval", minutes=30, id="pixiv_fetch_token")
async def fetch_token() -> None:
    """Log in again with the configured refresh token (scheduled every 30 minutes).

    Does nothing when no refresh token is configured.
    """
    if PluginConfig.refresh_token:
        api = await get_api()
        if api:
            await api.login(refresh_token=PluginConfig.refresh_token)
2023-01-27 04:18:12 +00:00
async def send_illust(message: Message, illust: Illust) -> None:
    """Send *illust* as a reply built from the enabled message elements.

    ``PluginConfig.message_elements`` decides which lines the text contains
    and, through the ``"image"`` element, whether a photo with that text as
    caption or a plain text message is sent.

    Args:
        message: The command message; the reply targets the message it replies
            to, falling back to its ``reply_to_top_message_id``.
        illust: The illustration to present.
    """
    elements = PluginConfig.message_elements

    # Lines are appended in the order they appear in the final text.
    parts: List[str] = []
    if "title" in elements:
        parts.append(f"**{illust.title}**\n")
    if "caption" in elements:
        parts.append(f"__{illust.caption}__\n\n")
    if "id" in elements:
        parts.append(
            f'ID: <a href="https://www.pixiv.net/artworks/{illust.id}">{illust.id}</a>\n'
        )
    if "author" in elements:
        parts.append(
            f'作者: <a href="https://www.pixiv.net/users/{illust.author_id}">{illust.author_name}</a> '
            f"({illust.author_account})\n"
        )
    if "tags" in elements:
        parts.append(f'标签: {", ".join(illust.tags)}\n')
    if "resolution" in elements:
        parts.append(f"分辨率: {illust.resolution[0]}x{illust.resolution[1]}\n")
    if "upload_time" in elements:
        parts.append(f"上传时间: {illust.upload_time}")
    caption = "".join(parts)

    reply_to = message.reply_to_message_id or message.reply_to_top_message_id
    if "image" in elements:
        await message.reply_photo(
            illust.image_urls["large"],
            caption=caption,
            quote=False,
            reply_to_message_id=reply_to,
        )
    else:
        # quote=False mirrors the photo branch, so both kinds of reply target
        # the same message. NOTE(review): without it the reply helper may
        # quote the command message itself — confirm against the installed
        # Pyrogram's default for `quote`.
        await message.reply_text(
            caption,
            quote=False,
            reply_to_message_id=reply_to,
        )
async def report_error(message: Message, ex: Exception) -> None:
    """Show *ex* to the user by editing *message*, and log it.

    A ``NoTokenError`` is turned into a hint to read the plugin help. Any
    other exception is logged and shown as ``"<type name>: <text>"``.
    """
    if isinstance(ex, NoTokenError):
        await message.edit(f"没有配置 Token 诶,要不发送 `{PREFIX}{root_command} help` 看看帮助?")
        return

    error = f"{type(ex).__name__}: {ex}"
    # Log the error itself (not the Message object), and do it before the edit
    # so the record exists even if editing the message fails.
    logs.error(error)
    await message.edit("呜呜呜 ~ 出错了:\n" + error)
2023-04-15 09:13:01 +00:00
@cmdman.subcommand(
    "search", "通过关键词(可传入多个)搜索 Pixiv 相关插图,并随机选取一张图发送", "<关键词> ... [R-18 / R-18G]"
)
async def search(_: Client, message: Message) -> None:
    """Search illustrations by the message's keywords and send a random match.

    The results pass through the sensitive-content filter first. The command
    message is edited to show progress or failure and deleted once the
    illustration has been sent.
    """
    keywords = message.arguments
    if not keywords:
        await message.edit("没有关键词我怎么搜索?")
        return

    await message.edit("正在发送中请耐心等待www")
    api = await get_api()
    # Tags only need to match the keywords partially.
    response = await api.search_illust(
        keywords, search_target="partial_match_for_tags"
    )
    candidates = illust_sensitive_content_filter(
        [Illust.from_response(item) for item in response.illusts], keywords
    )
    if candidates:
        await send_illust(message, random.choice(candidates))
        await message.safe_delete()
    else:
        await message.edit("呜呜呜 ~ 没有找到相应结果。")
2023-04-15 09:13:01 +00:00
@cmdman.subcommand(
    "recommend",
    "获取 Pixiv 每日推荐,可传入多个 Tag 参数筛选目标结果,并随机选取一张图发送",
    "[Tag] ... [R-18 / R-18G]",
)
async def recommend(_: Client, message: Message) -> None:
    """Send a random recommended illustration, optionally narrowed by tags.

    The recommendations pass through the sensitive-content filter and then the
    tag filter, both driven by the message's arguments. The command message is
    edited to show progress or failure and deleted once the illustration has
    been sent.
    """
    await message.edit("正在发送中请耐心等待www")
    keywords = message.arguments
    api = await get_api()
    response: Any = await api.illust_recommended()

    recommended = [Illust.from_response(item) for item in response.illusts]
    allowed = illust_sensitive_content_filter(recommended, keywords)
    candidates = illust_filter_by_tags(allowed, keywords)
    if candidates:
        await send_illust(message, random.choice(candidates))
        await message.safe_delete()
    else:
        await message.edit("呜呜呜 ~ 没有找到相应结果。")
2023-04-15 09:13:01 +00:00
@cmdman.subcommand("help", "获取插件帮助")
async def help_cmd(_: Client, message: Message) -> None:
    """Edit the command message into a link to the plugin's help page."""
    help_text = f"{PLUGIN_NAME} 插件使用帮助: {HELP_URL}"
    await message.edit(help_text, disable_web_page_preview=True)
2023-04-15 09:13:01 +00:00
@cmdman.subcommand("alias", "重定向子命令", "{del <子指令>|list|set <子指令> <重定向子指令>}")
async def alias_cmd(_: Client, message: Message) -> None:
    """Manage sub-command aliases: ``del <command>``, ``list``, ``set <command> <alias>``.

    ``del`` and ``set`` change the alias table and then reload PagerMaid;
    ``list`` shows the current table. Wrong or missing arguments produce a
    hint pointing at the help command.
    """
    if not message.arguments:
        await message.edit(f"缺少参数了,要不发送 `{PREFIX}help {root_command}` 看看帮助?")
        return

    bad_arguments = f"参数错误。要不发送 `{PREFIX}help {root_command}` 看看帮助?"
    params = message.parameter
    operation = params[0]

    if operation == "list":
        await message.edit(str(cmdman.get_aliases()))
        return

    if operation == "del":
        if len(params) != 2:
            await message.edit(bad_arguments)
            return
        command = params[1]
        cmdman.delete_alias(command)
        await message.edit(f"已删除 `{command}` 重定向。正在重新加载 PagerMaid-Pyro。")
        await reload_all()
        return

    if operation == "set":
        if len(params) != 3:
            await message.edit(bad_arguments)
            return
        command, alias = params[1:]
        cmdman.set_alias(command, alias)
        await message.edit(f"已重定向 `{command}` 至 `{alias}`。正在重新加载 PagerMaid-Pyro。")
        await reload_all()
        return

    # Unknown operation.
    await message.edit(bad_arguments)
@listener(
    command=PLUGIN_NAME,
    description=generate_usage(),
    parameters=f"{{{'|'.join(cmdman.get_available_commands(True).keys())}}}",
)
async def message_handler(client: Client, message: Message) -> None:
    """Dispatch the plugin command to the sub-command named by its first parameter.

    The handler receives a shallow copy of the message from which the
    sub-command name has been removed (both from ``arguments`` and from
    ``parameter``). Exceptions raised by the handler are reported on the
    original message.
    """
    try:
        command = message.parameter[0]
    except IndexError:
        # No parameter at all: look up the empty name, which is then rejected below.
        command = ""

    info = cmdman.get_available_commands(False).get(command)
    if not info:
        await message.edit(f"我看不懂你发了什么诶。要不发送 `{PREFIX}help {root_command}` 看看?")
        return

    forwarded = copy.copy(message)
    # Drop the sub-command name and the character that follows it.
    forwarded.arguments = forwarded.arguments[len(command) + 1 :]
    forwarded.parameter = forwarded.parameter[1:]
    forwarded.bind(client)
    try:
        return await info.func(client, forwarded)
    except Exception as ex:
        await report_error(message, ex)