PagerMaid_Plugins/getstickers.py

157 lines
5.8 KiB
Python
Raw Normal View History

2021-06-30 07:25:39 +00:00
import asyncio
from collections import defaultdict
import os
import zipfile
2021-07-01 02:09:16 +00:00
from PIL import Image
2021-06-30 07:25:39 +00:00
from telethon.tl.functions.messages import GetStickerSetRequest
from telethon.errors import MessageNotModifiedError
from pagermaid import working_dir
from telethon.tl.types import (
DocumentAttributeFilename,
DocumentAttributeSticker,
InputMediaUploadedDocument,
InputPeerNotifySettings,
InputStickerSetID,
InputStickerSetShortName,
MessageMediaPhoto
)
from pagermaid.listener import listener
from pagermaid.utils import alias_command
lottie_import = True
try:
from lottie.exporters.gif import export_gif
from lottie.importers.core import import_tgs
except ImportError:
lottie_import = False
@listener(is_plugin=True, outgoing=True, command=alias_command("getstickers"),
2021-07-01 02:09:16 +00:00
description="获取整个贴纸包的贴纸,任意值开启 tgs 转 gif转 gif 需要手动安装 pypi 依赖 lottie[gif] 。",
2021-06-30 07:25:39 +00:00
parameters="<任意值>")
async def getstickers(context):
tgs_gif = True
if len(context.parameter) == 0:
tgs_gif = False
if not os.path.isdir('data/sticker/'):
os.makedirs('data/sticker/')
if context.reply_to_msg_id:
reply_message = await context.get_reply_message()
if not reply_message.sticker:
await context.edit("请回复一张贴纸。")
return
sticker = reply_message.sticker
sticker_attrib = find_instance(sticker.attributes, DocumentAttributeSticker)
if not sticker_attrib.stickerset:
await context.edit("回复的贴纸不属于任何贴纸包。")
return
is_a_s = is_it_animated_sticker(reply_message)
file_ext_ns_ion = "webp"
if is_a_s:
file_ext_ns_ion = "tgs"
if tgs_gif and not lottie_import:
await context.reply('`lottie[gif]` 依赖未安装tgs 无法转换为 gif ,进行标准格式导出。')
sticker_set = await context.client(GetStickerSetRequest(sticker_attrib.stickerset))
pack_file = os.path.join('data/sticker/', sticker_set.set.short_name, "pack.txt")
if os.path.isfile(pack_file):
os.remove(pack_file)
# Sticker emojis
emojis = defaultdict(str)
for pack in sticker_set.packs:
for document_id in pack.documents:
emojis[document_id] += pack.emoticon
async def download(sticker, emojis, path, file):
await context.client.download_media(sticker, file=os.path.join(path, file))
with open(pack_file, "a") as f:
f.write(f"{{'image_file': '{file}','emojis':{emojis[sticker.id]}}},")
if file_ext_ns_ion == 'tgs' and lottie_import and tgs_gif:
animated = import_tgs(os.path.join(path, file))
export_gif(animated, os.path.join(path, file)[:-3] + 'gif')
2021-07-01 02:09:16 +00:00
elif file_ext_ns_ion == 'webp':
convert_png(os.path.join(path, file))
2021-06-30 07:25:39 +00:00
pending_tasks = [
asyncio.ensure_future(
download(document, emojis, 'data/sticker/' + sticker_set.set.short_name,
f"{i:03d}.{file_ext_ns_ion}")
) for i, document in enumerate(sticker_set.documents)
]
await context.edit(
f"正在下载 {sticker_set.set.short_name} 中的 {sticker_set.set.count} 张贴纸。。。")
num_tasks = len(pending_tasks)
while 1:
done, pending_tasks = await asyncio.wait(pending_tasks, timeout=2.5,
return_when=asyncio.FIRST_COMPLETED)
if file_ext_ns_ion == 'tgs' and lottie_import and tgs_gif:
try:
await context.edit(
f"正在下载/转换中,进度: {num_tasks - len(pending_tasks)}/{sticker_set.set.count}")
except MessageNotModifiedError:
pass
if not pending_tasks:
break
await context.edit("下载完毕,打包上传中。")
directory_name = sticker_set.set.short_name
os.chdir("data/sticker/") # 修改当前工作目录
zipf = zipfile.ZipFile(directory_name + ".zip", "w", zipfile.ZIP_DEFLATED)
zipdir(directory_name, zipf)
zipf.close()
await context.client.send_file(
context.chat_id,
directory_name + ".zip",
caption=sticker_set.set.short_name,
force_document=True,
allow_cache=False,
reply_to=reply_message.id
)
try:
os.remove(directory_name + ".zip")
os.remove(directory_name)
except:
pass
os.chdir(working_dir)
await context.delete()
else:
await context.edit("请回复一张贴纸。")
def find_instance(items, class_or_tuple):
for item in items:
if isinstance(item, class_or_tuple):
return item
return None
def is_it_animated_sticker(message):
try:
if message.media and message.media.document:
mime_type = message.media.document.mime_type
if "tgsticker" in mime_type:
return True
else:
return False
else:
return False
except:
return False
def zipdir(path, ziph):
for root, dirs, files in os.walk(path):
for file in files:
ziph.write(os.path.join(root, file))
os.remove(os.path.join(root, file))
def convert_png(path):
im = Image.open(path)
im.load()
alpha = im.split()[-1]
im = im.convert('RGB').convert('P', palette=Image.ADAPTIVE, colors=255)
mask = Image.eval(alpha, lambda a: 255 if a <=128 else 0)
im.paste(255, mask)
new_path = path.replace(".webp", ".png")
im.save(new_path, transparency=255)
return new_path