From 45fcb1f1549a6618f989f310da5f5c3e22b31394 Mon Sep 17 00:00:00 2001 From: xtaodada Date: Sun, 7 Aug 2022 13:09:25 +0800 Subject: [PATCH] =?UTF-8?q?getstickers=20=E8=8E=B7=E5=8F=96=E6=95=B4?= =?UTF-8?q?=E4=B8=AA=E8=B4=B4=E7=BA=B8=E5=8C=85=E7=9A=84=E8=B4=B4=E7=BA=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- getstickers/main.py | 90 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 getstickers/main.py diff --git a/getstickers/main.py b/getstickers/main.py new file mode 100644 index 0000000..6a98deb --- /dev/null +++ b/getstickers/main.py @@ -0,0 +1,90 @@ +import asyncio +import shutil +from collections import defaultdict +import os +import zipfile + +from pyrogram.raw.functions.messages import GetStickerSet +from pyrogram.raw.types import InputStickerSetShortName +from pyrogram.raw.types.messages import StickerSet +from pyrogram.types import Document + +from pagermaid import working_dir +from pagermaid.enums import Message, Client +from pagermaid.listener import listener +from pagermaid.single_utils import safe_remove + + +@listener(command="getstickers", + description="获取整个贴纸包的贴纸") +async def get_stickers(bot: Client, message: Message): + if not os.path.isdir('data/sticker/'): + os.makedirs('data/sticker/') + if message.reply_to_message: + if not message.reply_to_message.sticker: + return await message.edit("请回复一张贴纸。") + else: + return await message.edit("请回复一张贴纸。") + sticker = message.reply_to_message.sticker + if not sticker.set_name: + return await message.edit("回复的贴纸不属于任何贴纸包。") + try: + sticker_set: StickerSet = await bot.invoke( + GetStickerSet(stickerset=InputStickerSetShortName(short_name=sticker.set_name), hash=0)) + except Exception: + return await message.edit('回复的贴纸不存在于任何贴纸包中。') + pack_file = os.path.join('data/sticker/', sticker_set.set.short_name, "pack.txt") + + if os.path.isfile(pack_file): + os.remove(pack_file) + emojis = defaultdict(str) + for pack in sticker_set.packs: + for document_id in pack.documents: + emojis[document_id] += pack.emoticon + file_ext_ns_ion = "webp" + if sticker.is_video: + file_ext_ns_ion = "mp4" + elif sticker.is_animated: + file_ext_ns_ion = "tgs" + + async def download(sticker_, emojis_, path, file): + sticker_file = Document._parse(bot, sticker_, "sticker.webp") # noqa + await bot.download_media(sticker_file.file_id, file_name=os.path.join(path, file)) + + with open(pack_file, "a") as f: + f.write(f"{{'image_file': '{file}','emojis':{emojis_[sticker_.id]}}},") + + pending_tasks = [asyncio.ensure_future( + download(document, emojis, f"data/sticker/{sticker_set.set.short_name}", f"{i:03d}.{file_ext_ns_ion}") + ) for i, document in enumerate(sticker_set.documents)] + + message: Message = await message.edit(f"正在下载 {sticker_set.set.short_name} 中的 {sticker_set.set.count} 张贴纸。。。") + + while 1: + done, pending_tasks = await asyncio.wait(pending_tasks, timeout=2.5, return_when=asyncio.FIRST_COMPLETED) + if not pending_tasks: + break + + await message.edit("下载完毕,打包上传中。") + directory_name = sticker_set.set.short_name + os.chdir("data/sticker/") + zipf = zipfile.ZipFile(f"{directory_name}.zip", "w", zipfile.ZIP_DEFLATED) + zipdir(directory_name, zipf) + zipf.close() + await bot.send_document( + message.chat.id, + f"{directory_name}.zip", + caption=sticker_set.set.short_name, + reply_to_message_id=message.reply_to_message_id + ) + safe_remove(f"{directory_name}.zip") + shutil.rmtree(directory_name) + os.chdir(working_dir) + await message.safe_delete() + + +def zipdir(path, zip_): + for root, dirs, files in os.walk(path): + for file in files: + zip_.write(os.path.join(root, file)) + os.remove(os.path.join(root, file))