2022-12-04 14:40:19 +00:00
|
|
|
import contextlib
|
|
|
|
|
|
|
|
from asyncio import sleep
|
2022-12-04 14:46:32 +00:00
|
|
|
from typing import Optional
|
2022-12-04 14:40:19 +00:00
|
|
|
|
2022-07-17 09:23:15 +00:00
|
|
|
from pyrogram.raw.functions.messages import GetStickerSet
|
2022-12-04 14:40:19 +00:00
|
|
|
from pyrogram.raw.functions.stickers import CreateStickerSet
|
|
|
|
from pyrogram.raw.types import InputStickerSetShortName, InputDocument, InputStickerSetItem
|
2022-07-17 09:23:15 +00:00
|
|
|
from pyrogram.raw.types.messages import StickerSet
|
2022-12-04 14:40:19 +00:00
|
|
|
from pyrogram.file_id import FileId
|
2022-06-25 06:09:35 +00:00
|
|
|
|
|
|
|
from pagermaid.listener import listener
|
2022-12-04 14:40:19 +00:00
|
|
|
from pagermaid.services import bot, sqlite
|
|
|
|
from pagermaid.enums import Message
|
|
|
|
from pagermaid.single_utils import safe_remove
|
|
|
|
from pagermaid.utils import alias_command
|
2022-06-25 06:09:35 +00:00
|
|
|
|
|
|
|
|
2022-07-17 09:23:15 +00:00
|
|
|
class CannotToStickerSetError(Exception):
    """Raised when a message cannot be turned into a sticker-set entry.

    :param string: Text carried by the exception. Defaults to the original
        fixed message, so ``CannotToStickerSetError()`` behaves as before.
    """

    def __init__(self, string: str = "无法将此消息转换为贴纸"):
        # Accept an optional message, mirroring NoStickerSetNameError.
        super().__init__(string)
|
2022-06-25 06:09:35 +00:00
|
|
|
|
|
|
|
|
2022-07-17 09:23:15 +00:00
|
|
|
class NoStickerSetNameError(Exception):
    """Raised when no usable sticker-set name is available.

    The default text asks the user to set a username first. Other call sites
    in this file pass their own text, e.g. when a set cannot be fetched or
    cannot be created.

    :param string: Text carried by the exception.
    """

    def __init__(self, string: str = "请先设置用户名"):
        super().__init__(string)
|
2022-06-25 06:09:35 +00:00
|
|
|
|
|
|
|
|
2022-07-17 09:23:15 +00:00
|
|
|
class StickerSetFullError(Exception):
    """Raised when the target sticker set cannot take any more stickers.

    :param string: Text carried by the exception. Defaults to the original
        fixed message, so ``StickerSetFullError()`` behaves as before.
    """

    def __init__(self, string: str = "贴纸包已满"):
        # Accept an optional message, mirroring NoStickerSetNameError.
        super().__init__(string)
|
2022-06-25 06:09:35 +00:00
|
|
|
|
|
|
|
|
2022-07-17 09:23:15 +00:00
|
|
|
async def get_pack(name: str):
    """Fetch a sticker set by its short name.

    :param name: Short name of the sticker set to look up.
    :return: Whatever ``bot.invoke`` returns for the ``GetStickerSet`` request.
    :raises NoStickerSetNameError: If building or invoking the request raises
        any ``Exception``; the original error is chained as the cause.
    """
    try:
        target = InputStickerSetShortName(short_name=name)
        request = GetStickerSet(stickerset=target, hash=0)
        return await bot.invoke(request)
    except Exception as err:  # noqa
        # NOTE(review): every failure is reported as "wrong name or missing
        # set", including errors unrelated to the name itself.
        raise NoStickerSetNameError("贴纸名名称错误或者不存在") from err
|
2022-07-17 09:23:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
class Sticker:
    """Save the sticker of a replied-to message into a sticker set.

    The target set is either the custom name stored under the
    ``"sticker_set"`` key of ``sqlite`` or a name generated from the
    account's username (``<username>_<n>`` with an optional ``_video`` /
    ``_animated`` suffix).
    """

    # Peer the add-sticker conversation is held with and files are sent to.
    _STICKERS_BOT_ID = 429000
    # Chat that non-animated, non-video stickers are forwarded to.
    _STICKERS_BOT_CHAT = "Stickers"
    # A set holding at least this many stickers is treated as full.
    _PACK_LIMIT = 120
    # Pause (seconds) after each step of the conversation.
    _STEP_DELAY = .3

    message: Message
    sticker_set: str
    custom_sticker_set: bool
    emoji: str
    should_forward: Optional[Message]
    should_create: bool
    is_animated: bool
    is_video: bool
    document: Optional[InputDocument]
    document_path: Optional[str]
    software: str = "PagerMaid-Pyro"

    def __init__(self, message: Message, sticker_set: str = "", emoji: str = "😀",
                 should_forward: Message = None):
        """Set up the state for one save operation.

        :param message: The command message.
        :param sticker_set: Short name of the target set; empty means "derive
            one from the username".
        :param emoji: Emoji used when the source sticker does not carry one.
        :param should_forward: The message holding the sticker to save.
        """
        self.message = message
        self.sticker_set = sticker_set
        self.custom_sticker_set = False
        # A stored custom set name replaces the ``sticker_set`` argument.
        self.load_custom_sticker_set()
        self.emoji = emoji
        self.should_forward = should_forward
        self.should_create = False
        self.is_animated = False
        self.is_video = False
        self.document = None
        self.document_path = None

    @staticmethod
    def get_custom_sticker_set():
        """Return the stored custom set name, or ``None`` when none is stored."""
        return sqlite.get("sticker_set", None)

    @staticmethod
    def set_custom_sticker_set(name: str):
        """Store ``name`` as the custom set name."""
        sqlite["sticker_set"] = name

    # Misspelled historical name, kept because existing callers use it.
    set_custom_sticker_get = set_custom_sticker_set

    @staticmethod
    def del_custom_sticker_set():
        """Delete the stored custom set name (the key must exist)."""
        del sqlite["sticker_set"]

    def load_custom_sticker_set(self):
        """Adopt the stored custom set name, if there is one."""
        if name := self.get_custom_sticker_set():
            self.sticker_set = name
            self.custom_sticker_set = True

    async def generate_sticker_set(self, time: int = 1):
        """Pick the set to save into and decide whether it must be created.

        While the candidate set is full, the next candidate
        ``<username>_<time + 1>`` is tried. ``should_create`` is set when
        the candidate cannot be fetched.

        :param time: Index used in the generated set name; a value above 1
            forces a generated name even when ``sticker_set`` is already set.
        :raises NoStickerSetNameError: If a name must be generated but the
            account has no username.
        """
        me = None
        while True:
            if not self.sticker_set or time > 1:
                if me is None:
                    # Looked up once, not once per candidate name.
                    me = await bot.get_me()
                if not me.username:
                    raise NoStickerSetNameError()
                self.sticker_set = f"{me.username}_{time}"
                if self.is_video:
                    self.sticker_set += "_video"
                elif self.is_animated:
                    self.sticker_set += "_animated"
            try:
                await self.check_pack_full()
            except NoStickerSetNameError:
                # The set could not be fetched, so it has to be created.
                self.should_create = True
            except StickerSetFullError:
                time += 1
                continue
            return

    async def check_pack_full(self):
        """Raise if the current set is full.

        :raises NoStickerSetNameError: Propagated from ``get_pack``.
        :raises StickerSetFullError: If the set holds ``_PACK_LIMIT`` or more
            stickers.
        """
        pack: StickerSet = await get_pack(self.sticker_set)
        if pack.set.count >= self._PACK_LIMIT:
            raise StickerSetFullError()

    async def process_sticker(self):
        """Read the source sticker and prepare the document reference.

        Animated and video stickers are additionally downloaded; the local
        path is kept in ``document_path``.

        :raises CannotToStickerSetError: If there is no source message or it
            does not contain a sticker.
        """
        if not (self.should_forward and self.should_forward.sticker):
            raise CannotToStickerSetError()
        sticker_ = self.should_forward.sticker
        self.is_video = sticker_.is_video
        self.is_animated = sticker_.is_animated
        # Keep the default emoji when the sticker has none attached.
        self.emoji = sticker_.emoji or self.emoji
        if self.is_video or self.is_animated:
            self.document_path = await self.download_file()
        file = FileId.decode(sticker_.file_id)
        self.document = InputDocument(
            id=file.media_id,
            access_hash=file.access_hash,
            file_reference=file.file_reference,
        )

    async def download_file(self) -> str:
        """Download the source message's media and return what ``download`` yields."""
        return await self.should_forward.download()

    def _discard_download(self):
        """Remove the downloaded file, if any, and forget its path."""
        if self.document_path:
            safe_remove(self.document_path)
            self.document_path = None

    async def upload_file(self):
        """Send the downloaded file as a document and point ``document`` at it.

        Does nothing when no file was downloaded. Upload errors are
        suppressed on purpose, leaving ``document`` as it was. The local
        file is removed in every case.
        """
        if not self.document_path:
            return
        try:
            with contextlib.suppress(Exception):
                msg = await bot.send_document(
                    self._STICKERS_BOT_ID,
                    document=self.document_path,
                    force_document=True,
                )
                file = FileId.decode(msg.document.file_id)
                self.document = InputDocument(
                    id=file.media_id,
                    access_hash=file.access_hash,
                    file_reference=file.file_reference,
                )
        finally:
            self._discard_download()

    async def create_sticker_set(self):
        """Create the set ``sticker_set`` with the prepared sticker as its only item.

        :raises NoStickerSetNameError: If the request raises any ``Exception``.
        """
        me = await bot.get_me()
        title = f"@{me.username} 的私藏" if me.username else self.sticker_set
        try:
            await bot.invoke(
                CreateStickerSet(
                    # Reuse ``me`` rather than asking for it a second time.
                    user_id=await bot.resolve_peer(me.id),
                    title=title,
                    short_name=self.sticker_set,
                    stickers=[
                        InputStickerSetItem(
                            document=self.document,
                            emoji=self.emoji
                        )
                    ],
                    animated=self.is_animated,
                    videos=self.is_video,
                )
            )
        except Exception as e:
            raise NoStickerSetNameError("贴纸包名称非法,请换一个") from e

    async def _send_step(self, conv, text: str):
        """Send one message in the conversation, pause, and mark the chat read."""
        await conv.ask(text)
        await sleep(self._STEP_DELAY)
        await conv.mark_as_read()

    async def add_to_sticker_set(self):
        """Add the sticker to the existing set through the bot conversation.

        :raises NoStickerSetNameError: If the reply to the set name is
            ``"Invalid set selected."`` or the reply to the sticker does not
            start with ``"Thanks!"``.
        """
        async with bot.conversation(self._STICKERS_BOT_ID) as conv:
            for command in ("/start", "/cancel", "/addsticker"):
                await self._send_step(conv, command)

            resp: Message = await conv.ask(self.sticker_set)
            await sleep(self._STEP_DELAY)
            if resp.text == "Invalid set selected.":
                raise NoStickerSetNameError("这个贴纸包好像不属于你~")
            await conv.mark_as_read()

            if self.is_video or self.is_animated:
                await self.upload_file()
            else:
                await self.should_forward.forward(self._STICKERS_BOT_CHAT)
            resp = await conv.get_response()
            await sleep(self._STEP_DELAY)
            # A reply without text counts as a rejection instead of crashing.
            if not (resp.text or "").startswith("Thanks!"):
                raise NoStickerSetNameError("这个贴纸包类型好像不匹配~")
            await conv.mark_as_read()

            for text in (self.emoji, "/done", "/done"):
                await self._send_step(conv, text)

    async def to_sticker_set(self):
        """Save the prepared sticker, creating the set when necessary.

        :raises NoStickerSetNameError: If no set name could be determined.
        """
        try:
            await self.generate_sticker_set()
            if not self.sticker_set:
                raise NoStickerSetNameError()
            if self.should_create:
                await self.upload_file()
                await self.create_sticker_set()
            else:
                await self.add_to_sticker_set()
        finally:
            # Do not leave the downloaded file behind when a step fails early.
            self._discard_download()

    def mention(self):
        """Return a Markdown link to the set's ``t.me/addstickers`` page."""
        return f"[{self.sticker_set}](https://t.me/addstickers/{self.sticker_set})"

    def get_config(self) -> str:
        """Return the help text, including the set stickers are saved to."""
        pack = self.mention() if self.sticker_set else "无法保存,请设置用户名"
        return f"欢迎使用 sticker 插件\n\n" \
               f"将自动保存到贴纸包:{pack}\n\n" \
               f"使用命令 <code>,{alias_command('s')} 贴纸包名</code> 自定义保存贴纸包\n" \
               f"使用命令 <code>,{alias_command('s')} cancel</code> 取消自定义保存贴纸包"
|
2022-07-17 09:23:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
@listener(command="s",
          need_admin=True)
async def sticker(message: Message):
    """Handle the ``s`` command.

    With a replied-to message, its sticker is saved to the sticker set.
    Without one, the command shows the current configuration, stores a
    custom set name, or removes it (argument ``cancel``).
    """
    one_sticker = Sticker(message, should_forward=message.reply_to_message)

    if message.reply_to_message:
        # Reply given: collect its sticker.
        try:
            await one_sticker.process_sticker()
            await one_sticker.to_sticker_set()
        except Exception as e:
            return await message.edit(f"收藏到贴纸包失败:{e}")
        await message.edit(f"收藏到贴纸包 {one_sticker.mention()} 成功")
        return

    # No reply: configuration mode. Failing to work out a set name is
    # tolerated here; get_config() reports a missing name itself.
    with contextlib.suppress(Exception):
        await one_sticker.generate_sticker_set()

    if not message.arguments:
        return await message.edit(one_sticker.get_config())

    if len(message.parameter) != 1:
        return await message.edit("参数错误")

    if message.arguments == "cancel":
        if one_sticker.get_custom_sticker_set() is None:
            return await message.edit("还没有设置自定义保存贴纸包")
        one_sticker.del_custom_sticker_set()
        return await message.edit("移除自定义保存贴纸包成功")

    one_sticker.sticker_set = message.arguments
    try:
        await one_sticker.check_pack_full()
    except NoStickerSetNameError:
        # A set that cannot be fetched is still accepted as the custom name.
        pass
    except Exception as e:
        return await message.edit(f"设置自定义贴纸包失败:{e}")
    one_sticker.set_custom_sticker_get(message.arguments)
    return await message.edit("设置自定义保存贴纸包成功")
|