From a839a15071a4d4fdfd034218b9139deb1f063f4a Mon Sep 17 00:00:00 2001 From: Ricky8955555 Date: Fri, 27 Jan 2023 10:23:39 +0800 Subject: [PATCH] =?UTF-8?q?pixiv=20=E6=8F=90=E4=BE=9B=20Pixiv=20=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E6=9C=8D=E5=8A=A1=20(#123)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: omg-xtao <100690902+omg-xtao@users.noreply.github.com> --- list.json | 10 ++ pixiv/main.py | 269 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 279 insertions(+) create mode 100644 pixiv/main.py diff --git a/list.json b/list.json index 41c15b6..b5926bf 100644 --- a/list.json +++ b/list.json @@ -949,6 +949,16 @@ "supported": true, "des-short": "no_mentions 自动消除某个对话的 @ 提醒", "des": "no_mentions 自动消除某个对话的 @ 提醒" + }, + { + "name": "pixiv", + "version": "1.00", + "section": "chat", + "maintainer": "ricky8955555", + "size": "7.851 kb", + "supported": true, + "des-short": "提供 Pixiv 相关服务", + "des": "提供 Pixiv 相关服务" } ] } \ No newline at end of file diff --git a/pixiv/main.py b/pixiv/main.py new file mode 100644 index 0000000..75983c2 --- /dev/null +++ b/pixiv/main.py @@ -0,0 +1,269 @@ +# pyright: basic + +import contextlib +import copy +import os +import random +from dataclasses import dataclass +from typing import (Any, Awaitable, Callable, Dict, List, NamedTuple, Optional, + Tuple) + +import yaml + +from pagermaid import logs +from pagermaid.enums import Client, Message +from pagermaid.listener import listener +from pagermaid.utils import pip_install + + +def install_dependencies() -> None: + pip_install("pixivpy-async", alias="pixivpy_async") + pip_install("aiohttp-socks", alias="aiohttp_socks") + + +install_dependencies() + +from pixivpy_async import AppPixivAPI + +VERSION = "1.00" +PREFIX = "," +CONFIG_PATH = r"data/pixiv.yml" +PLUGIN_NAME = "pixiv" +HELP_URL = r"https://www.huajitech.net/pagermaid-pixiv-plugin-help/" +_config: Dict[str, Any] = {} +pixiv_api: Optional[AppPixivAPI] = None + +with contextlib.suppress(Exception): + with open(CONFIG_PATH, mode="r") as config_file: + _config: Dict[str, Any] = yaml.safe_load(config_file) + + +def str_to_list(value: Optional[str]) -> List[str]: + return value.split(",") if value else [] + + +Handler = Callable[[Client, Message], Awaitable[None]] + + +class PluginConfig: + proxy: Optional[str] = os.environ.get("PLUGIN_PIXIV_PROXY") or _config.get("proxy") + + refresh_token: Optional[str] = os.environ.get( + "PLUGIN_PIXIV_REFRESH_TOKEN" + ) or _config.get("refresh_token") + + message_elements: List[str] = ( + str_to_list(os.environ.get("PLUGIN_PIXIV_MESSAGE_ELEMENTS")) + or _config.get("message_elements") + or [ + "image", + "id", + "title", + "caption", + "tags", + "resolution", + "upload_time", + "author", + ] + ) + + +@dataclass +class Illust: + id: int + title: str + caption: str + tags: List[str] + image_urls: Dict[str, str] + resolution: Tuple[int, int] + upload_time: str + author_id: str + author_account: str + author_name: str + + @staticmethod + def from_response(res: Any) -> "Illust": + return Illust( + res.id, + res.title, + res.caption, + [tag.translated_name or tag.name for tag in res.tags], + dict(res.image_urls), + (res.width, res.height), + res.create_date, + res.user.id, + res.user.account, + res.user.name, + ) + + +class HandlerInfo(NamedTuple): + func: Handler + usage: str + description: str + + +command_map: Dict[str, HandlerInfo] = {} + + +def command( + com: str, description: str, usage: str = "" +) -> Callable[[Handler], Handler]: + def decorator(func: Handler): + command_map[com] = HandlerInfo(func, usage, description) + return func + + return decorator + + +def generate_usage() -> str: + return "\n".join( + f"`{PREFIX}{PLUGIN_NAME} {com} {info.usage}`\n{info.description}" + for com, info in command_map.items() + ) + + +def illust_sensitive_content_filter( + illusts: List[Illust], keywords: str +) -> List[Illust]: + excluded = ["R-18", "R-18G"] + needed = set(keywords.split()).intersection(excluded) + excluded = set(excluded).difference(needed) + + return [ + illust + for illust in illusts + if not excluded.intersection(illust.tags) + and (needed.intersection(illust.tags) if needed else True) + ] + + +def illust_filter_by_tags(illusts: List[Illust], keywords: str) -> List[Illust]: + needed = set(keywords.split()) + return [illust for illust in illusts if needed.intersection(illust.tags) or True] + + +async def get_api() -> AppPixivAPI: + global pixiv_api + if pixiv_api: + return pixiv_api + pixiv_api = AppPixivAPI(proxy=PluginConfig.proxy) + + if PluginConfig.refresh_token is None: + logs.info(f"未设置 {PLUGIN_NAME} 插件登录所需的 refresh_token,将以游客身份工作。") + else: + await pixiv_api.login(refresh_token=PluginConfig.refresh_token) + + return pixiv_api + + +async def send_illust(client: Client, chat_id: int, illust: Illust) -> None: + elements = PluginConfig.message_elements + + caption = ( + (f"**{illust.title}**\n" if "title" in elements else "") + + (f"__{illust.caption}__\n\n" if "caption" in elements else "") + + ( + f'ID: {illust.id}\n' + if "id" in elements + else "" + ) + + ( + f'作者: {illust.author_name} ' + f'({illust.author_account})\n' + if "author" in elements + else "" + ) + + (f'标签: {", ".join(illust.tags)}\n' if "tags" in elements else "") + + ( + f"分辨率: {illust.resolution[0]}x{illust.resolution[1]}\n" + if "resolution" in elements + else "" + ) + + (f"上传时间: {illust.upload_time}" if "upload_time" in elements else "") + ) + + if "image" in elements: + await client.send_photo(chat_id, illust.image_urls["large"], caption) + else: + await client.send_message(chat_id, caption) + + +async def report_error(origin_message: Message, ex: Exception) -> None: + message = f"{type(ex).__name__}: {ex}" + await origin_message.edit("呜呜呜 ~ 出错了:\n" + message) + logs.error(message) + + +@command("search", "通过关键词(可传入多个)搜索 Pixiv 相关插图,并随机选取一张图发送", "<关键词> ... [R-18 / R-18G]") +async def search(client: Client, message: Message) -> None: + keywords = message.arguments + + if not keywords: + await message.edit("没有关键词我怎么搜索?") + return + + api = await get_api() + response = await api.search_illust( + keywords, search_target="partial_match_for_tags" + ) # partial match + illusts = [Illust.from_response(illust) for illust in response.illusts] + filtered_illusts = illust_sensitive_content_filter(illusts, keywords) + if not filtered_illusts: + await message.edit("呜呜呜 ~ 没有找到相应结果。") + return + illust = random.choice(filtered_illusts) + await send_illust(client, message.chat.id, illust) + await message.safe_delete() + + +@command( + "recommend", + "获取 Pixiv 每日推荐,可传入多个 Tag 参数筛选目标结果,并随机选取一张图发送", + "[Tag] ... [R-18 / R-18G]", +) +async def recommend(client: Client, message: Message) -> None: + keywords = message.arguments + api = await get_api() + response: Any = await api.illust_recommended() + illusts = [Illust.from_response(illust) for illust in response.illusts] + filtered_illusts = illust_filter_by_tags( + illust_sensitive_content_filter(illusts, keywords), keywords + ) + if not filtered_illusts: + await message.edit("呜呜呜 ~ 没有找到相应结果。") + return + illust = random.choice(filtered_illusts) + await send_illust(client, message.chat.id, illust) + await message.safe_delete() + + +@command("help", "获取插件帮助") +async def help_cmd(_: Client, message: Message) -> None: + await message.edit( + f"{PLUGIN_NAME} 插件使用帮助: {HELP_URL}", disable_web_page_preview=True + ) + + +@listener( + command=PLUGIN_NAME, + description=generate_usage(), + parameters=f"{{{'|'.join(command_map.keys())}}}", +) +async def message_handler(client: Client, message: Message) -> None: + try: + com: str = message.parameter[0] + except IndexError: + com = "" + info = command_map.get(com) + if not info: + await message.edit(f"我看不懂你发了什么诶。要不发送 `{PREFIX}help {PLUGIN_NAME}` 看看?") + return + new_message = copy.copy(message) + new_message.arguments = new_message.arguments[len(com) + 1:] + new_message.parameter = new_message.parameter[1:] + new_message.bind(client) + try: + return await info.func(client, new_message) + except Exception as ex: + await report_error(message, ex)