From a7df3757fc90df72b20458e44108fa2cffeb75ea Mon Sep 17 00:00:00 2001 From: xtaodada Date: Sun, 4 Dec 2022 22:40:19 +0800 Subject: [PATCH] =?UTF-8?q?sticker=20=E9=87=8D=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sticker/main.py | 233 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 172 insertions(+), 61 deletions(-) diff --git a/sticker/main.py b/sticker/main.py index 5a77c3e..93298cb 100644 --- a/sticker/main.py +++ b/sticker/main.py @@ -1,11 +1,18 @@ -from pyrogram.raw.functions.messages import GetStickerSet -from pyrogram.raw.types import InputStickerSetShortName -from pyrogram.raw.types.messages import StickerSet -from pyrogram.types import ReplyKeyboardMarkup +import contextlib + +from asyncio import sleep + +from pyrogram.raw.functions.messages import GetStickerSet +from pyrogram.raw.functions.stickers import CreateStickerSet +from pyrogram.raw.types import InputStickerSetShortName, InputDocument, InputStickerSetItem +from pyrogram.raw.types.messages import StickerSet +from pyrogram.file_id import FileId -from pagermaid import bot from pagermaid.listener import listener -from pagermaid.single_utils import Message +from pagermaid.services import bot, sqlite +from pagermaid.enums import Message +from pagermaid.single_utils import safe_remove +from pagermaid.utils import alias_command class CannotToStickerSetError(Exception): @@ -24,9 +31,9 @@ class NoStickerSetNameError(Exception): Occurs when no username is provided """ - def __init__(self): + def __init__(self, string: str = "请先设置用户名"): super().__init__( - "请先设置用户名" + string ) @@ -41,30 +48,6 @@ class StickerSetFullError(Exception): ) -async def unblock_sticker_bot(): - await bot.unblock_user("Stickers") - - -async def get_all_packs(): - async with bot.conversation(429000) as conv: - await conv.ask("/start") - await conv.mark_as_read() - await conv.ask("/cancel") - await conv.mark_as_read() - await conv.ask("/addsticker") - msg: Message = await conv.ask("/addsticker") - await conv.mark_as_read() - await conv.ask("/cancel") - await conv.mark_as_read() - if isinstance(msg.reply_markup, ReplyKeyboardMarkup): - packs = [] - keyboard = msg.reply_markup.keyboard - for i in keyboard: - packs.extend(j for j in i if isinstance(j, str)) - return packs - return [] - - async def get_pack(name: str): try: return await bot.invoke(GetStickerSet( @@ -72,84 +55,212 @@ async def get_pack(name: str): hash=0 )) except Exception as e: # noqa - raise NoStickerSetNameError() from e + raise NoStickerSetNameError("贴纸名名称错误或者不存在") from e class Sticker: message: Message sticker_set: str + custom_sticker_set: bool emoji: str should_forward: Message + is_animated: bool + is_video: bool + document: InputDocument + document_path: str + software: str = "PagerMaid-Pyro" def __init__(self, message: Message, sticker_set: str = "", emoji: str = "😀", should_forward: Message = None): self.message = message self.sticker_set = sticker_set + self.custom_sticker_set = False + self.load_custom_sticker_set() self.emoji = emoji self.should_forward = should_forward + self.should_create = False + self.is_animated = False + self.is_video = False + + @staticmethod + def get_custom_sticker_set(): + return sqlite.get("sticker_set", None) + + @staticmethod + def set_custom_sticker_get(name: str): + sqlite["sticker_set"] = name + + @staticmethod + def del_custom_sticker_set(): + del sqlite["sticker_set"] + + def load_custom_sticker_set(self): + if name := self.get_custom_sticker_set(): + self.sticker_set = name + self.custom_sticker_set = True async def generate_sticker_set(self, time: int = 1): if not self.sticker_set or time > 1: me = await bot.get_me() - if me.username: - self.sticker_set = f"{me.username}_{time}" - try: - await self.check_pack_full() - except StickerSetFullError: - await self.generate_sticker_set(time + 1) + if not me.username: + raise NoStickerSetNameError() + self.sticker_set = f"{me.username}_{time}" + if self.is_video: + self.sticker_set += "_video" + elif self.is_animated: + self.sticker_set += "_animated" + try: + await self.check_pack_full() + except NoStickerSetNameError as e: + self.should_create = True + except StickerSetFullError: + await self.generate_sticker_set(time + 1) async def check_pack_full(self): pack: StickerSet = await get_pack(self.sticker_set) if pack.set.count == 120: raise StickerSetFullError() - async def process_sticker(self, test: bool = False): - if self.should_forward and self.should_forward.sticker and \ - not self.should_forward.sticker.is_video and not self.should_forward.sticker.is_animated: - if not test: - await self.should_forward.forward("Stickers") - self.emoji = self.should_forward.sticker.emoji - return - raise CannotToStickerSetError() + async def process_sticker(self): + if not (self.should_forward and self.should_forward.sticker): + raise CannotToStickerSetError() + sticker_ = self.should_forward.sticker + self.is_video = sticker_.is_video + self.is_animated = sticker_.is_animated + self.emoji = sticker_.emoji + if self.is_video or self.is_animated: + self.document_path = await self.download_file() + file = FileId.decode(sticker_.file_id) + self.document = InputDocument( + id=file.media_id, + access_hash=file.access_hash, + file_reference=file.file_reference, + ) - async def add_sticker(self): + async def download_file(self) -> str: + return await self.should_forward.download() + + async def upload_file(self): + if not self.document_path: + return + with contextlib.suppress(Exception): + msg = await bot.send_document(429000, document=self.document_path, force_document=True) + file = FileId.decode(msg.document.file_id) + self.document = InputDocument( + id=file.media_id, + access_hash=file.access_hash, + file_reference=file.file_reference, + ) + safe_remove(self.document_path) + + async def create_sticker_set(self): + me = await bot.get_me() + title = f"@{me.username} 的私藏" if me.username else self.sticker_set + try: + await bot.invoke( + CreateStickerSet( + user_id=await bot.resolve_peer((await bot.get_me()).id), + title=title, + short_name=self.sticker_set, + stickers=[ + InputStickerSetItem( + document=self.document, + emoji=self.emoji + ) + ], + animated=self.is_animated, + videos=self.is_video, + ) + ) + except Exception as e: + raise NoStickerSetNameError("贴纸包名称非法,请换一个") from e + + async def add_to_sticker_set(self): async with bot.conversation(429000) as conv: await conv.ask("/start") + await sleep(.3) await conv.mark_as_read() await conv.ask("/cancel") + await sleep(.3) await conv.mark_as_read() await conv.ask("/addsticker") - await conv.ask("/addsticker") + await sleep(.3) await conv.mark_as_read() - await conv.ask(self.sticker_set) + resp: Message = await conv.ask(self.sticker_set) + await sleep(.3) + if resp.text == "Invalid set selected.": + raise NoStickerSetNameError("这个贴纸包好像不属于你~") + await conv.mark_as_read() + if self.is_video or self.is_animated: + await self.upload_file() + else: + await self.should_forward.forward("Stickers") + resp: Message = await conv.get_response() + await sleep(.3) + if not resp.text.startswith("Thanks!"): + raise NoStickerSetNameError("这个贴纸包类型好像不匹配~") await conv.mark_as_read() - await self.process_sticker() await conv.ask(self.emoji) + await sleep(.3) await conv.mark_as_read() await conv.ask("/done") + await sleep(.3) + await conv.mark_as_read() + await conv.ask("/done") + await sleep(.3) await conv.mark_as_read() async def to_sticker_set(self): await self.generate_sticker_set() if not self.sticker_set: raise NoStickerSetNameError() - packs = await get_all_packs() - if self.sticker_set not in packs: - # TODO: add a way to add a new pack - raise NoStickerSetNameError() - await self.check_pack_full() - # TODO: add a way to change to next pack - await self.add_sticker() + if self.should_create: + await self.upload_file() + await self.create_sticker_set() + else: + await self.add_to_sticker_set() + + def mention(self): + return f"[{self.sticker_set}](https://t.me/addstickers/{self.sticker_set})" + + def get_config(self) -> str: + pack = self.mention() if self.sticker_set else "无法保存,请设置用户名" + return f"欢迎使用 sticker 插件\n\n" \ + f"将自动保存到贴纸包:{pack}\n\n" \ + f"使用命令 ,{alias_command('s')} 贴纸包名 自定义保存贴纸包\n" \ + f"使用命令 ,{alias_command('s')} cancel 取消自定义保存贴纸包" @listener(command="s", need_admin=True) async def sticker(message: Message): - await unblock_sticker_bot() one_sticker = Sticker(message, should_forward=message.reply_to_message) + if not message.reply_to_message: + with contextlib.suppress(Exception): + await one_sticker.generate_sticker_set() + if not message.arguments: + return await message.edit(one_sticker.get_config()) + elif len(message.parameter) == 1: + if message.arguments == "cancel": + if one_sticker.get_custom_sticker_set() is None: + return await message.edit("还没有设置自定义保存贴纸包") + one_sticker.del_custom_sticker_set() + return await message.edit("移除自定义保存贴纸包成功") + else: + one_sticker.sticker_set = message.arguments + try: + await one_sticker.check_pack_full() + except NoStickerSetNameError: + pass + except Exception as e: + return await message.edit(f"设置自定义贴纸包失败:{e}") + one_sticker.set_custom_sticker_get(message.arguments) + return await message.edit("设置自定义保存贴纸包成功") + else: + return await message.edit("参数错误") try: - await one_sticker.process_sticker(test=True) + await one_sticker.process_sticker() await one_sticker.to_sticker_set() except Exception as e: return await message.edit(f"收藏到贴纸包失败:{e}") - await message.edit("收藏到贴纸包成功") + await message.edit(f"收藏到贴纸包 {one_sticker.mention()} 成功")