PagerMaid_Plugins/getstickers.py
2022-01-18 16:47:20 +08:00

225 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from collections import defaultdict
import os
import zipfile
from PIL import Image
from telethon.tl.functions.messages import GetAllStickersRequest
from telethon.tl.functions.messages import GetStickerSetRequest
from telethon.errors import MessageNotModifiedError
from telethon.errors.rpcerrorlist import StickersetInvalidError
from pagermaid import working_dir, version
from telethon.tl.types import (
DocumentAttributeSticker,
InputStickerSetID,
)
from pagermaid.listener import listener
from pagermaid.utils import alias_command
lottie_import = True
try:
from lottie.exporters.gif import export_gif
from lottie.importers.core import import_tgs
except ImportError:
lottie_import = False
@listener(is_plugin=True, outgoing=True, command=alias_command("getstickers"),
description="获取整个贴纸包的贴纸false 关闭 tgs 转 gif任意值开启下载所有贴纸包\n"
"转 gif 需要手动安装 pypi 依赖 lottie[gif] \n"
"Ubuntu 转换出错请先运行 `-sh apt install libpangocairo-1.0-0 -y`。",
parameters="<任意值>")
async def getstickers(context):
tgs_gif = True
if not os.path.isdir('data/sticker/'):
os.makedirs('data/sticker/')
if len(context.parameter) == 1 or len(context.parameter) == 2:
if "false" in context.arguments:
tgs_gif = False
if "all" in context.arguments:
sticker_sets = await context.client(GetAllStickersRequest(0))
for stickerset in sticker_sets.sets:
file_ext_ns_ion = "webp"
wdnmd = InputStickerSetID(id=stickerset.id, access_hash=stickerset.access_hash)
sticker_set = await context.client(GetStickerSetRequest(wdnmd))
pack_file = os.path.join('data/sticker/', sticker_set.set.short_name, "pack.txt")
if os.path.isfile(pack_file):
os.remove(pack_file)
# Sticker emojis
emojis = defaultdict(str)
for pack in sticker_set.packs:
for document_id in pack.documents:
emojis[document_id] += pack.emoticon
async def download(sticker, emojis, path, file):
await context.client.download_media(sticker, file=os.path.join(path, file))
with open(pack_file, "a") as f:
f.write(f"{{'image_file': '{file}','emojis':{emojis[sticker.id]}}},")
if file_ext_ns_ion == 'tgs' and lottie_import and tgs_gif:
animated = import_tgs(os.path.join(path, file))
export_gif(animated, os.path.join(path, file)[:-3] + 'gif')
elif file_ext_ns_ion == 'webp':
convert_png(os.path.join(path, file))
pending_tasks = [
asyncio.ensure_future(
download(document, emojis, 'data/sticker/' + sticker_set.set.short_name,
f"{i:03d}.{file_ext_ns_ion}")
) for i, document in enumerate(sticker_set.documents)
]
xx = await context.client.send_message(context.chat_id,
f"正在下载 {sticker_set.set.short_name} "
f"中的 {sticker_set.set.count} 张贴纸。。。")
num_tasks = len(pending_tasks)
while 1:
done, pending_tasks = await asyncio.wait(pending_tasks, timeout=2.5,
return_when=asyncio.FIRST_COMPLETED)
if file_ext_ns_ion == 'tgs' and lottie_import and tgs_gif:
try:
await xx.edit(
f"正在下载/转换中,进度: {num_tasks - len(pending_tasks)}/{sticker_set.set.count}")
except MessageNotModifiedError:
pass
if not pending_tasks:
break
await xx.edit("下载完毕,打包上传中。")
directory_name = sticker_set.set.short_name
os.chdir("data/sticker/") # 修改当前工作目录
zipf = zipfile.ZipFile(directory_name + ".zip", "w", zipfile.ZIP_DEFLATED)
zipdir(directory_name, zipf)
zipf.close()
await context.client.send_file(
context.chat_id,
directory_name + ".zip",
caption=sticker_set.set.short_name,
force_document=True,
allow_cache=False
)
try:
os.remove(directory_name + ".zip")
os.remove(directory_name)
except:
pass
os.chdir(working_dir)
await xx.delete()
return
reply_message = await context.get_reply_message()
if reply_message:
if not reply_message.sticker:
await context.edit("请回复一张贴纸。")
return
else:
await context.edit("请回复一张贴纸。")
return
sticker = reply_message.sticker
sticker_attrib = find_instance(sticker.attributes, DocumentAttributeSticker)
if not sticker_attrib.stickerset:
await context.edit("回复的贴纸不属于任何贴纸包。")
return
is_a_s = is_it_animated_sticker(reply_message)
file_ext_ns_ion = "webp"
if is_a_s:
file_ext_ns_ion = "tgs"
if tgs_gif and not lottie_import:
await context.reply('`lottie[gif]` 依赖未安装tgs 无法转换为 gif ,进行标准格式导出。')
try:
sticker_set = await context.client(GetStickerSetRequest(sticker_attrib.stickerset))
except StickersetInvalidError:
await context.edit('回复的贴纸不存在于任何贴纸包中。')
return
pack_file = os.path.join('data/sticker/', sticker_set.set.short_name, "pack.txt")
if os.path.isfile(pack_file):
os.remove(pack_file)
# Sticker emojis
emojis = defaultdict(str)
for pack in sticker_set.packs:
for document_id in pack.documents:
emojis[document_id] += pack.emoticon
async def download(sticker, emojis, path, file):
await context.client.download_media(sticker, file=os.path.join(path, file))
with open(pack_file, "a") as f:
f.write(f"{{'image_file': '{file}','emojis':{emojis[sticker.id]}}},")
if file_ext_ns_ion == 'tgs' and lottie_import and tgs_gif:
animated = import_tgs(os.path.join(path, file))
export_gif(animated, os.path.join(path, file)[:-3] + 'gif')
elif file_ext_ns_ion == 'webp':
convert_png(os.path.join(path, file))
pending_tasks = [
asyncio.ensure_future(
download(document, emojis, 'data/sticker/' + sticker_set.set.short_name,
f"{i:03d}.{file_ext_ns_ion}")
) for i, document in enumerate(sticker_set.documents)
]
await context.edit(
f"正在下载 {sticker_set.set.short_name} 中的 {sticker_set.set.count} 张贴纸。。。")
num_tasks = len(pending_tasks)
while 1:
done, pending_tasks = await asyncio.wait(pending_tasks, timeout=2.5,
return_when=asyncio.FIRST_COMPLETED)
if file_ext_ns_ion == 'tgs' and lottie_import and tgs_gif:
try:
await context.edit(
f"正在下载/转换中,进度: {num_tasks - len(pending_tasks)}/{sticker_set.set.count}")
except MessageNotModifiedError:
pass
if not pending_tasks:
break
await context.edit("下载完毕,打包上传中。")
directory_name = sticker_set.set.short_name
os.chdir("data/sticker/") # 修改当前工作目录
zipf = zipfile.ZipFile(directory_name + ".zip", "w", zipfile.ZIP_DEFLATED)
zipdir(directory_name, zipf)
zipf.close()
await context.client.send_file(
context.chat_id,
directory_name + ".zip",
caption=sticker_set.set.short_name,
force_document=True,
allow_cache=False,
reply_to=reply_message.id
)
try:
os.remove(directory_name + ".zip")
os.remove(directory_name)
except:
pass
os.chdir(working_dir)
await context.delete()
def find_instance(items, class_or_tuple):
for item in items:
if isinstance(item, class_or_tuple):
return item
return None
def is_it_animated_sticker(message):
try:
if message.media and message.media.document:
mime_type = message.media.document.mime_type
if "tgsticker" in mime_type:
return True
else:
return False
else:
return False
except:
return False
def zipdir(path, ziph):
for root, dirs, files in os.walk(path):
for file in files:
ziph.write(os.path.join(root, file))
os.remove(os.path.join(root, file))
def convert_png(path):
im = Image.open(path)
im = im.convert('RGBA')
new_path = path.replace(".webp", ".png")
im.save(new_path, 'PNG')
return new_path