sticker 重写

This commit is contained in:
xtaodada 2022-12-04 22:40:19 +08:00
parent 6fa1881420
commit a7df3757fc
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659

View File

@ -1,11 +1,18 @@
from pyrogram.raw.functions.messages import GetStickerSet import contextlib
from pyrogram.raw.types import InputStickerSetShortName
from pyrogram.raw.types.messages import StickerSet from asyncio import sleep
from pyrogram.types import ReplyKeyboardMarkup
from pyrogram.raw.functions.messages import GetStickerSet
from pyrogram.raw.functions.stickers import CreateStickerSet
from pyrogram.raw.types import InputStickerSetShortName, InputDocument, InputStickerSetItem
from pyrogram.raw.types.messages import StickerSet
from pyrogram.file_id import FileId
from pagermaid import bot
from pagermaid.listener import listener from pagermaid.listener import listener
from pagermaid.single_utils import Message from pagermaid.services import bot, sqlite
from pagermaid.enums import Message
from pagermaid.single_utils import safe_remove
from pagermaid.utils import alias_command
class CannotToStickerSetError(Exception): class CannotToStickerSetError(Exception):
@ -24,9 +31,9 @@ class NoStickerSetNameError(Exception):
Occurs when no username is provided Occurs when no username is provided
""" """
def __init__(self): def __init__(self, string: str = "请先设置用户名"):
super().__init__( super().__init__(
"请先设置用户名" string
) )
@ -41,30 +48,6 @@ class StickerSetFullError(Exception):
) )
async def unblock_sticker_bot():
await bot.unblock_user("Stickers")
async def get_all_packs():
async with bot.conversation(429000) as conv:
await conv.ask("/start")
await conv.mark_as_read()
await conv.ask("/cancel")
await conv.mark_as_read()
await conv.ask("/addsticker")
msg: Message = await conv.ask("/addsticker")
await conv.mark_as_read()
await conv.ask("/cancel")
await conv.mark_as_read()
if isinstance(msg.reply_markup, ReplyKeyboardMarkup):
packs = []
keyboard = msg.reply_markup.keyboard
for i in keyboard:
packs.extend(j for j in i if isinstance(j, str))
return packs
return []
async def get_pack(name: str): async def get_pack(name: str):
try: try:
return await bot.invoke(GetStickerSet( return await bot.invoke(GetStickerSet(
@ -72,84 +55,212 @@ async def get_pack(name: str):
hash=0 hash=0
)) ))
except Exception as e: # noqa except Exception as e: # noqa
raise NoStickerSetNameError() from e raise NoStickerSetNameError("贴纸名名称错误或者不存在") from e
class Sticker: class Sticker:
message: Message message: Message
sticker_set: str sticker_set: str
custom_sticker_set: bool
emoji: str emoji: str
should_forward: Message should_forward: Message
is_animated: bool
is_video: bool
document: InputDocument
document_path: str
software: str = "PagerMaid-Pyro"
def __init__(self, message: Message, sticker_set: str = "", emoji: str = "😀", def __init__(self, message: Message, sticker_set: str = "", emoji: str = "😀",
should_forward: Message = None): should_forward: Message = None):
self.message = message self.message = message
self.sticker_set = sticker_set self.sticker_set = sticker_set
self.custom_sticker_set = False
self.load_custom_sticker_set()
self.emoji = emoji self.emoji = emoji
self.should_forward = should_forward self.should_forward = should_forward
self.should_create = False
self.is_animated = False
self.is_video = False
@staticmethod
def get_custom_sticker_set():
return sqlite.get("sticker_set", None)
@staticmethod
def set_custom_sticker_get(name: str):
sqlite["sticker_set"] = name
@staticmethod
def del_custom_sticker_set():
del sqlite["sticker_set"]
def load_custom_sticker_set(self):
if name := self.get_custom_sticker_set():
self.sticker_set = name
self.custom_sticker_set = True
async def generate_sticker_set(self, time: int = 1): async def generate_sticker_set(self, time: int = 1):
if not self.sticker_set or time > 1: if not self.sticker_set or time > 1:
me = await bot.get_me() me = await bot.get_me()
if me.username: if not me.username:
self.sticker_set = f"{me.username}_{time}" raise NoStickerSetNameError()
try: self.sticker_set = f"{me.username}_{time}"
await self.check_pack_full() if self.is_video:
except StickerSetFullError: self.sticker_set += "_video"
await self.generate_sticker_set(time + 1) elif self.is_animated:
self.sticker_set += "_animated"
try:
await self.check_pack_full()
except NoStickerSetNameError as e:
self.should_create = True
except StickerSetFullError:
await self.generate_sticker_set(time + 1)
async def check_pack_full(self): async def check_pack_full(self):
pack: StickerSet = await get_pack(self.sticker_set) pack: StickerSet = await get_pack(self.sticker_set)
if pack.set.count == 120: if pack.set.count == 120:
raise StickerSetFullError() raise StickerSetFullError()
async def process_sticker(self, test: bool = False): async def process_sticker(self):
if self.should_forward and self.should_forward.sticker and \ if not (self.should_forward and self.should_forward.sticker):
not self.should_forward.sticker.is_video and not self.should_forward.sticker.is_animated: raise CannotToStickerSetError()
if not test: sticker_ = self.should_forward.sticker
await self.should_forward.forward("Stickers") self.is_video = sticker_.is_video
self.emoji = self.should_forward.sticker.emoji self.is_animated = sticker_.is_animated
return self.emoji = sticker_.emoji
raise CannotToStickerSetError() if self.is_video or self.is_animated:
self.document_path = await self.download_file()
file = FileId.decode(sticker_.file_id)
self.document = InputDocument(
id=file.media_id,
access_hash=file.access_hash,
file_reference=file.file_reference,
)
async def add_sticker(self): async def download_file(self) -> str:
return await self.should_forward.download()
async def upload_file(self):
if not self.document_path:
return
with contextlib.suppress(Exception):
msg = await bot.send_document(429000, document=self.document_path, force_document=True)
file = FileId.decode(msg.document.file_id)
self.document = InputDocument(
id=file.media_id,
access_hash=file.access_hash,
file_reference=file.file_reference,
)
safe_remove(self.document_path)
async def create_sticker_set(self):
me = await bot.get_me()
title = f"@{me.username} 的私藏" if me.username else self.sticker_set
try:
await bot.invoke(
CreateStickerSet(
user_id=await bot.resolve_peer((await bot.get_me()).id),
title=title,
short_name=self.sticker_set,
stickers=[
InputStickerSetItem(
document=self.document,
emoji=self.emoji
)
],
animated=self.is_animated,
videos=self.is_video,
)
)
except Exception as e:
raise NoStickerSetNameError("贴纸包名称非法,请换一个") from e
async def add_to_sticker_set(self):
async with bot.conversation(429000) as conv: async with bot.conversation(429000) as conv:
await conv.ask("/start") await conv.ask("/start")
await sleep(.3)
await conv.mark_as_read() await conv.mark_as_read()
await conv.ask("/cancel") await conv.ask("/cancel")
await sleep(.3)
await conv.mark_as_read() await conv.mark_as_read()
await conv.ask("/addsticker") await conv.ask("/addsticker")
await conv.ask("/addsticker") await sleep(.3)
await conv.mark_as_read() await conv.mark_as_read()
await conv.ask(self.sticker_set) resp: Message = await conv.ask(self.sticker_set)
await sleep(.3)
if resp.text == "Invalid set selected.":
raise NoStickerSetNameError("这个贴纸包好像不属于你~")
await conv.mark_as_read()
if self.is_video or self.is_animated:
await self.upload_file()
else:
await self.should_forward.forward("Stickers")
resp: Message = await conv.get_response()
await sleep(.3)
if not resp.text.startswith("Thanks!"):
raise NoStickerSetNameError("这个贴纸包类型好像不匹配~")
await conv.mark_as_read() await conv.mark_as_read()
await self.process_sticker()
await conv.ask(self.emoji) await conv.ask(self.emoji)
await sleep(.3)
await conv.mark_as_read() await conv.mark_as_read()
await conv.ask("/done") await conv.ask("/done")
await sleep(.3)
await conv.mark_as_read()
await conv.ask("/done")
await sleep(.3)
await conv.mark_as_read() await conv.mark_as_read()
async def to_sticker_set(self): async def to_sticker_set(self):
await self.generate_sticker_set() await self.generate_sticker_set()
if not self.sticker_set: if not self.sticker_set:
raise NoStickerSetNameError() raise NoStickerSetNameError()
packs = await get_all_packs() if self.should_create:
if self.sticker_set not in packs: await self.upload_file()
# TODO: add a way to add a new pack await self.create_sticker_set()
raise NoStickerSetNameError() else:
await self.check_pack_full() await self.add_to_sticker_set()
# TODO: add a way to change to next pack
await self.add_sticker() def mention(self):
return f"[{self.sticker_set}](https://t.me/addstickers/{self.sticker_set})"
def get_config(self) -> str:
pack = self.mention() if self.sticker_set else "无法保存,请设置用户名"
return f"欢迎使用 sticker 插件\n\n" \
f"将自动保存到贴纸包:{pack}\n\n" \
f"使用命令 <code>,{alias_command('s')} 贴纸包名</code> 自定义保存贴纸包\n" \
f"使用命令 <code>,{alias_command('s')} cancel</code> 取消自定义保存贴纸包"
@listener(command="s", @listener(command="s",
need_admin=True) need_admin=True)
async def sticker(message: Message): async def sticker(message: Message):
await unblock_sticker_bot()
one_sticker = Sticker(message, should_forward=message.reply_to_message) one_sticker = Sticker(message, should_forward=message.reply_to_message)
if not message.reply_to_message:
with contextlib.suppress(Exception):
await one_sticker.generate_sticker_set()
if not message.arguments:
return await message.edit(one_sticker.get_config())
elif len(message.parameter) == 1:
if message.arguments == "cancel":
if one_sticker.get_custom_sticker_set() is None:
return await message.edit("还没有设置自定义保存贴纸包")
one_sticker.del_custom_sticker_set()
return await message.edit("移除自定义保存贴纸包成功")
else:
one_sticker.sticker_set = message.arguments
try:
await one_sticker.check_pack_full()
except NoStickerSetNameError:
pass
except Exception as e:
return await message.edit(f"设置自定义贴纸包失败:{e}")
one_sticker.set_custom_sticker_get(message.arguments)
return await message.edit("设置自定义保存贴纸包成功")
else:
return await message.edit("参数错误")
try: try:
await one_sticker.process_sticker(test=True) await one_sticker.process_sticker()
await one_sticker.to_sticker_set() await one_sticker.to_sticker_set()
except Exception as e: except Exception as e:
return await message.edit(f"收藏到贴纸包失败:{e}") return await message.edit(f"收藏到贴纸包失败:{e}")
await message.edit("收藏到贴纸包成功") await message.edit(f"收藏到贴纸包 {one_sticker.mention()} 成功")