PagerMaid_Plugins_Pyro/sticker/main.py

286 lines
9.7 KiB
Python
Raw Permalink Normal View History

2022-12-04 14:40:19 +00:00
import contextlib
from asyncio import sleep
2022-12-04 14:46:32 +00:00
from typing import Optional
2022-12-04 14:40:19 +00:00
2022-12-05 13:49:48 +00:00
from pyrogram.errors import PeerIdInvalid
2022-07-17 09:23:15 +00:00
from pyrogram.raw.functions.messages import GetStickerSet
2022-12-04 14:40:19 +00:00
from pyrogram.raw.functions.stickers import CreateStickerSet
2023-07-01 12:18:58 +00:00
from pyrogram.raw.types import (
InputStickerSetShortName,
InputDocument,
InputStickerSetItem,
)
2022-07-17 09:23:15 +00:00
from pyrogram.raw.types.messages import StickerSet
2022-12-04 14:40:19 +00:00
from pyrogram.file_id import FileId
from pagermaid.listener import listener
2022-12-04 14:40:19 +00:00
from pagermaid.services import bot, sqlite
from pagermaid.enums import Message
2024-09-28 14:35:08 +00:00
from pagermaid.utils import alias_command, safe_remove
2022-07-17 09:23:15 +00:00
class CannotToStickerSetError(Exception):
"""
2023-07-01 12:18:58 +00:00
Occurs when program cannot change a message to a sticker set
2022-07-17 09:23:15 +00:00
"""
2022-07-17 09:23:15 +00:00
def __init__(self):
2023-07-01 12:18:58 +00:00
super().__init__("无法将此消息转换为贴纸")
2022-07-17 09:23:15 +00:00
class NoStickerSetNameError(Exception):
"""
2023-07-01 12:18:58 +00:00
Occurs when no username is provided
2022-07-17 09:23:15 +00:00
"""
2022-12-04 14:40:19 +00:00
def __init__(self, string: str = "请先设置用户名"):
2023-07-01 12:18:58 +00:00
super().__init__(string)
2022-07-17 09:23:15 +00:00
class StickerSetFullError(Exception):
"""
2023-07-01 12:18:58 +00:00
Occurs when the sticker set is full
2022-07-17 09:23:15 +00:00
"""
2022-07-17 09:23:15 +00:00
def __init__(self):
2023-07-01 12:18:58 +00:00
super().__init__("贴纸包已满")
2022-07-17 09:23:15 +00:00
async def get_pack(name: str):
try:
2023-07-01 12:18:58 +00:00
return await bot.invoke(
GetStickerSet(stickerset=InputStickerSetShortName(short_name=name), hash=0)
)
2022-07-17 09:23:15 +00:00
except Exception as e: # noqa
2022-12-04 14:40:19 +00:00
raise NoStickerSetNameError("贴纸名名称错误或者不存在") from e
2022-07-17 09:23:15 +00:00
class Sticker:
message: Message
sticker_set: str
2022-12-04 14:40:19 +00:00
custom_sticker_set: bool
2022-07-17 09:23:15 +00:00
emoji: str
should_forward: Message
2022-12-04 14:40:19 +00:00
is_animated: bool
is_video: bool
2022-12-04 14:55:57 +00:00
nums: int
2022-12-04 14:46:32 +00:00
document: Optional[InputDocument]
document_path: Optional[str]
2022-12-04 14:40:19 +00:00
software: str = "PagerMaid-Pyro"
2022-07-17 09:23:15 +00:00
2023-07-01 12:18:58 +00:00
def __init__(
self,
message: Message,
sticker_set: str = "",
emoji: str = "😀",
should_forward: Message = None,
):
2022-07-17 09:23:15 +00:00
self.message = message
self.sticker_set = sticker_set
2022-12-04 14:40:19 +00:00
self.custom_sticker_set = False
self.load_custom_sticker_set()
2022-07-17 09:23:15 +00:00
self.emoji = emoji
self.should_forward = should_forward
2022-12-04 14:40:19 +00:00
self.should_create = False
self.is_animated = False
self.is_video = False
2022-12-04 14:55:57 +00:00
self.nums = 1
2022-12-04 14:46:32 +00:00
self.document = None
self.document_path = None
2022-12-04 14:40:19 +00:00
@staticmethod
def get_custom_sticker_set():
return sqlite.get("sticker_set", None)
@staticmethod
def set_custom_sticker_get(name: str):
sqlite["sticker_set"] = name
@staticmethod
def del_custom_sticker_set():
del sqlite["sticker_set"]
def load_custom_sticker_set(self):
if name := self.get_custom_sticker_set():
self.sticker_set = name
self.custom_sticker_set = True
2022-07-17 09:23:15 +00:00
async def generate_sticker_set(self, time: int = 1):
2022-12-04 14:55:57 +00:00
self.nums = time
2022-07-17 09:23:15 +00:00
if not self.sticker_set or time > 1:
me = await bot.get_me()
2022-12-04 14:40:19 +00:00
if not me.username:
raise NoStickerSetNameError()
self.sticker_set = f"{me.username}_{time}"
if self.is_video:
self.sticker_set += "_video"
elif self.is_animated:
self.sticker_set += "_animated"
try:
await self.check_pack_full()
2022-12-05 13:49:48 +00:00
except NoStickerSetNameError:
2022-12-04 14:40:19 +00:00
self.should_create = True
except StickerSetFullError:
await self.generate_sticker_set(time + 1)
2022-07-17 09:23:15 +00:00
async def check_pack_full(self):
pack: StickerSet = await get_pack(self.sticker_set)
if pack.set.count == 120:
raise StickerSetFullError()
2022-12-04 14:40:19 +00:00
async def process_sticker(self):
if not (self.should_forward and self.should_forward.sticker):
raise CannotToStickerSetError()
sticker_ = self.should_forward.sticker
self.is_video = sticker_.is_video
self.is_animated = sticker_.is_animated
2022-12-05 15:11:59 +00:00
self.emoji = sticker_.emoji or self.emoji
2022-12-04 14:40:19 +00:00
if self.is_video or self.is_animated:
self.document_path = await self.download_file()
file = FileId.decode(sticker_.file_id)
self.document = InputDocument(
id=file.media_id,
access_hash=file.access_hash,
file_reference=file.file_reference,
)
async def download_file(self) -> str:
return await self.should_forward.download()
async def upload_file(self):
if not self.document_path:
return
2022-12-04 14:40:19 +00:00
with contextlib.suppress(Exception):
2023-07-01 12:18:58 +00:00
msg = await bot.send_document(
429000, document=self.document_path, force_document=True
)
2022-12-04 14:40:19 +00:00
file = FileId.decode(msg.document.file_id)
self.document = InputDocument(
id=file.media_id,
access_hash=file.access_hash,
file_reference=file.file_reference,
)
safe_remove(self.document_path)
async def create_sticker_set(self):
me = await bot.get_me()
2022-12-04 14:55:57 +00:00
title = f"@{me.username} 的私藏({self.nums}" if me.username else self.sticker_set
if self.is_video:
title += "Video"
elif self.is_animated:
title += "Animated"
2022-12-04 14:40:19 +00:00
try:
await bot.invoke(
CreateStickerSet(
user_id=await bot.resolve_peer((await bot.get_me()).id),
title=title,
short_name=self.sticker_set,
stickers=[
2023-07-01 12:18:58 +00:00
InputStickerSetItem(document=self.document, emoji=self.emoji)
2022-12-04 14:40:19 +00:00
],
animated=self.is_animated,
videos=self.is_video,
)
)
except Exception as e:
raise NoStickerSetNameError("贴纸包名称非法,请换一个") from e
2022-07-17 09:23:15 +00:00
2022-12-04 14:40:19 +00:00
async def add_to_sticker_set(self):
2022-09-01 08:28:48 +00:00
async with bot.conversation(429000) as conv:
2022-07-17 09:23:15 +00:00
await conv.ask("/start")
2023-07-01 12:18:58 +00:00
await sleep(0.3)
2022-07-17 09:23:15 +00:00
await conv.mark_as_read()
await conv.ask("/cancel")
2023-07-01 12:18:58 +00:00
await sleep(0.3)
2022-07-17 09:23:15 +00:00
await conv.mark_as_read()
await conv.ask("/addsticker")
2023-07-01 12:18:58 +00:00
await sleep(0.3)
2022-12-04 14:40:19 +00:00
await conv.mark_as_read()
resp: Message = await conv.ask(self.sticker_set)
2023-07-01 12:18:58 +00:00
await sleep(0.3)
2022-12-04 14:40:19 +00:00
if resp.text == "Invalid set selected.":
raise NoStickerSetNameError("这个贴纸包好像不属于你~")
2022-07-17 09:23:15 +00:00
await conv.mark_as_read()
2022-12-04 14:40:19 +00:00
if self.is_video or self.is_animated:
await self.upload_file()
else:
await self.should_forward.forward("Stickers")
resp: Message = await conv.get_response()
2023-07-01 12:18:58 +00:00
await sleep(0.3)
2022-12-04 14:40:19 +00:00
if not resp.text.startswith("Thanks!"):
raise NoStickerSetNameError("这个贴纸包类型好像不匹配~")
2022-07-17 09:23:15 +00:00
await conv.mark_as_read()
await conv.ask(self.emoji)
2023-07-01 12:18:58 +00:00
await sleep(0.3)
2022-07-17 09:23:15 +00:00
await conv.mark_as_read()
await conv.ask("/done")
2023-07-01 12:18:58 +00:00
await sleep(0.3)
2022-12-04 14:40:19 +00:00
await conv.mark_as_read()
await conv.ask("/done")
2023-07-01 12:18:58 +00:00
await sleep(0.3)
2022-07-17 09:23:15 +00:00
await conv.mark_as_read()
async def to_sticker_set(self):
await self.generate_sticker_set()
if not self.sticker_set:
raise NoStickerSetNameError()
2022-12-04 14:40:19 +00:00
if self.should_create:
await self.upload_file()
await self.create_sticker_set()
else:
await self.add_to_sticker_set()
def mention(self):
return f"[{self.sticker_set}](https://t.me/addstickers/{self.sticker_set})"
def get_config(self) -> str:
pack = self.mention() if self.sticker_set else "无法保存,请设置用户名"
2023-07-01 12:18:58 +00:00
return (
f"欢迎使用 sticker 插件\n\n"
f"将自动保存到贴纸包:{pack}\n\n"
f"使用命令 <code>,{alias_command('s')} 贴纸包名</code> 自定义保存贴纸包\n"
f"使用命令 <code>,{alias_command('s')} cancel</code> 取消自定义保存贴纸包"
)
2022-07-17 09:23:15 +00:00
2022-12-04 14:55:57 +00:00
@listener(
command="s",
2023-01-23 14:51:49 +00:00
parameters="[贴纸包名/cancel]",
2022-12-04 14:55:57 +00:00
description="保存贴纸到自己的贴纸包",
need_admin=True,
)
2022-07-17 09:23:15 +00:00
async def sticker(message: Message):
one_sticker = Sticker(message, should_forward=message.reply_to_message)
2022-12-04 14:40:19 +00:00
if not message.reply_to_message:
with contextlib.suppress(Exception):
await one_sticker.generate_sticker_set()
if not message.arguments:
return await message.edit(one_sticker.get_config())
elif len(message.parameter) == 1:
if message.arguments == "cancel":
if one_sticker.get_custom_sticker_set() is None:
return await message.edit("还没有设置自定义保存贴纸包")
one_sticker.del_custom_sticker_set()
return await message.edit("移除自定义保存贴纸包成功")
else:
one_sticker.sticker_set = message.arguments
try:
await one_sticker.check_pack_full()
except NoStickerSetNameError:
pass
except Exception as e:
return await message.edit(f"设置自定义贴纸包失败:{e}")
one_sticker.set_custom_sticker_get(message.arguments)
return await message.edit("设置自定义保存贴纸包成功")
else:
return await message.edit("参数错误")
2022-07-17 09:23:15 +00:00
try:
2022-12-04 14:40:19 +00:00
await one_sticker.process_sticker()
2022-07-17 09:23:15 +00:00
await one_sticker.to_sticker_set()
2022-12-05 13:49:48 +00:00
except PeerIdInvalid:
return await message.edit("请先私聊一次 @Stickers 机器人")
2022-07-17 09:23:15 +00:00
except Exception as e:
return await message.edit(f"收藏到贴纸包失败:{e}")
2022-12-04 14:40:19 +00:00
await message.edit(f"收藏到贴纸包 {one_sticker.mention()} 成功")