PagerMaid_Plugins_Pyro/sticker/main.py
2022-07-17 17:23:15 +08:00

156 lines
4.8 KiB
Python

from pyrogram.raw.functions.messages import GetStickerSet
from pyrogram.raw.types import InputStickerSetShortName
from pyrogram.raw.types.messages import StickerSet
from pyrogram.types import ReplyKeyboardMarkup
from pagermaid import bot
from pagermaid.listener import listener
from pagermaid.single_utils import Message
class CannotToStickerSetError(Exception):
    """Raised when a message cannot be converted for use in a sticker set."""

    def __init__(self):
        # The message is fixed; callers construct this error without arguments.
        reason = "无法将此消息转换为贴纸"
        super().__init__(reason)
class NoStickerSetNameError(Exception):
    """Raised when no username is available to name the sticker set."""

    def __init__(self):
        # The message is fixed; callers construct this error without arguments.
        reason = "请先设置用户名"
        super().__init__(reason)
class StickerSetFullError(Exception):
    """Raised when the target sticker set has no room for another sticker."""

    def __init__(self):
        # The message is fixed; callers construct this error without arguments.
        reason = "贴纸包已满"
        super().__init__(reason)
async def unblock_sticker_bot():
    """Unblock the ``Stickers`` account so the plugin can message it."""
    sticker_bot = "Stickers"
    await bot.unblock_user(sticker_bot)
async def get_all_packs(message: Message):
    """Return the plain-text keyboard entries offered in reply to ``/addsticker``.

    A conversation is opened through ``message.bot`` with peer ``429000``
    (presumably the Stickers bot's user id -- confirm). The conversation is
    reset with ``/start`` and ``/cancel``, ``/addsticker`` is sent, and the
    reply keyboard of the answer is flattened into a list.

    :param message: Message whose ``bot`` attribute provides ``conversation``.
    :return: Every ``str`` entry of the reply keyboard, row by row, or an empty
        list when the reply carries no ``ReplyKeyboardMarkup``.
    """
    async with message.bot.conversation(429000) as chat:
        # Reset whatever state the conversation was left in.
        for command in ("/start", "/cancel"):
            await chat.ask(command)
            await chat.mark_as_read()
        # NOTE(review): "/addsticker" is sent twice and only the second reply is
        # inspected; kept as-is -- confirm whether the repeat is required.
        await chat.ask("/addsticker")
        reply: Message = await chat.ask("/addsticker")
        await chat.mark_as_read()
        # Leave the command so later conversations start clean.
        await chat.ask("/cancel")
        await chat.mark_as_read()

    markup = reply.reply_markup
    if not isinstance(markup, ReplyKeyboardMarkup):
        return []
    # Only plain-string entries are collected; other button objects are skipped.
    return [
        entry
        for row in markup.keyboard
        for entry in row
        if isinstance(entry, str)
    ]
async def get_pack(name: str):
    """Fetch the sticker set identified by its short name.

    :param name: Short name of the sticker set to look up.
    :return: The result of invoking ``GetStickerSet`` for that short name.
    :raises NoStickerSetNameError: On any failure while building or invoking
        the request; the original exception is attached as the cause.
    """
    try:
        target = InputStickerSetShortName(short_name=name)
        request = GetStickerSet(stickerset=target, hash=0)
        result = await bot.invoke(request)
    except Exception as err:  # noqa
        raise NoStickerSetNameError() from err
    return result
class Sticker:
    """Adds the sticker from a replied-to message to one of the user's packs.

    The work is done by talking to the ``Stickers`` account: the target pack
    name is derived from the user's username, the sticker message is
    forwarded, and an emoji is sent for it.
    """

    # Count at which a pack is treated as full.
    _PACK_CAPACITY = 120

    # Message that triggered the command; its ``bot`` opens the conversation.
    message: Message
    # Short name of the target pack ("" until one is generated).
    sticker_set: str
    # Emoji sent to the bot for the sticker being added.
    emoji: str
    # Message carrying the sticker to add; may be None.
    should_forward: Message

    def __init__(self, message: Message, sticker_set: str = "", emoji: str = "😀",
                 should_forward: Message = None):
        """Store the triggering message, target pack, emoji and source message.

        :param message: Message that triggered the command.
        :param sticker_set: Short name of the target pack; generated when empty.
        :param emoji: Emoji used when the source sticker does not provide one.
        :param should_forward: Message that holds the sticker, or ``None``.
        """
        self.message = message
        self.sticker_set = sticker_set
        self.emoji = emoji
        self.should_forward = should_forward

    async def generate_sticker_set(self, time: int = 1):
        """Pick a pack name of the form ``<username>_<time>`` that is not full.

        Nothing happens when a pack name is already set (and ``time`` is 1) or
        when the account has no username; in the latter case ``sticker_set``
        stays empty and the caller is expected to report it.

        :param time: Numeric suffix to try; incremented while packs are full.
        :raises NoStickerSetNameError: Propagated from the pack lookup.
        """
        if self.sticker_set and time <= 1:
            return
        me = await bot.get_me()
        if not me.username:
            # Without a username there is no name to look up; skip the lookup
            # instead of querying an empty short name.
            return
        self.sticker_set = f"{me.username}_{time}"
        try:
            await self.check_pack_full()
        except StickerSetFullError:
            # This pack is full: move on to the next numbered pack.
            await self.generate_sticker_set(time + 1)

    async def check_pack_full(self):
        """Raise if the current pack has reached its capacity.

        :raises StickerSetFullError: When the pack's sticker count is at or
            above ``_PACK_CAPACITY``.
        :raises NoStickerSetNameError: Propagated from ``get_pack``.
        """
        pack: StickerSet = await get_pack(self.sticker_set)
        # ">=" rather than "==" so a pack reporting more than the capacity is
        # still treated as full.
        if pack.set.count >= self._PACK_CAPACITY:
            raise StickerSetFullError()

    async def process_sticker(self, test: bool = False):
        """Validate the source sticker and, unless testing, forward it.

        Only stickers that are neither video nor animated are accepted. On
        success ``self.emoji`` is updated from the sticker, keeping the current
        emoji when the sticker has none.

        :param test: When true, only validate; do not forward anything.
        :raises CannotToStickerSetError: When there is no source message, it
            has no sticker, or the sticker is video or animated.
        """
        source = self.should_forward
        sticker = source.sticker if source else None
        if not sticker or sticker.is_video or sticker.is_animated:
            raise CannotToStickerSetError()
        if not test:
            await source.forward("Stickers")
        # A sticker may carry no emoji; fall back to the one already set so a
        # None value is never sent to the bot later.
        self.emoji = sticker.emoji or self.emoji

    async def add_sticker(self):
        """Run the ``/addsticker`` dialogue for the current pack and sticker."""
        # 429000 is presumably the Stickers bot's user id -- confirm.
        async with self.message.bot.conversation(429000) as conv:
            # Reset whatever state the conversation was left in.
            await conv.ask("/start")
            await conv.mark_as_read()
            await conv.ask("/cancel")
            await conv.mark_as_read()
            # NOTE(review): "/addsticker" is sent twice before the reply is
            # marked read; kept as-is -- confirm whether the repeat is required.
            await conv.ask("/addsticker")
            await conv.ask("/addsticker")
            await conv.mark_as_read()
            await conv.ask(self.sticker_set)
            await conv.mark_as_read()
            # Forwards the sticker message and refreshes self.emoji.
            await self.process_sticker()
            await conv.ask(self.emoji)
            await conv.mark_as_read()
            await conv.ask("/done")
            await conv.mark_as_read()

    async def to_sticker_set(self):
        """Resolve the target pack and add the sticker to it.

        :raises NoStickerSetNameError: When no pack name could be generated or
            the pack is not among those returned by ``get_all_packs``.
        :raises StickerSetFullError: When the resolved pack is full.
        :raises CannotToStickerSetError: When the source message is unusable.
        """
        await self.generate_sticker_set()
        if not self.sticker_set:
            raise NoStickerSetNameError()
        packs = await get_all_packs(self.message)
        if self.sticker_set not in packs:
            # TODO: add a way to add a new pack
            raise NoStickerSetNameError()
        await self.check_pack_full()
        # TODO: add a way to change to next pack
        await self.add_sticker()
@listener(command="s", need_admin=True)
async def sticker(message: Message):
    """Handle the ``s`` command: save the replied-to sticker into a pack.

    The triggering message is edited to report success, or failure together
    with the error text.

    :param message: Command message; its ``reply_to_message`` is the source.
    """
    await unblock_sticker_bot()
    collector = Sticker(message, should_forward=message.reply_to_message)
    try:
        # Validate first without forwarding, then run the full flow.
        await collector.process_sticker(test=True)
        await collector.to_sticker_set()
    except Exception as e:
        return await message.edit(f"收藏到贴纸包失败:{e}")
    else:
        await message.edit("收藏到贴纸包成功")