2021-06-30 07:25:39 +00:00
|
|
|
|
import asyncio
|
|
|
|
|
from collections import defaultdict
|
|
|
|
|
import os
|
|
|
|
|
import zipfile
|
2021-07-01 02:09:16 +00:00
|
|
|
|
from PIL import Image
|
2021-07-19 04:05:55 +00:00
|
|
|
|
from telethon.tl.functions.messages import GetAllStickersRequest
|
2021-06-30 07:25:39 +00:00
|
|
|
|
from telethon.tl.functions.messages import GetStickerSetRequest
|
|
|
|
|
from telethon.errors import MessageNotModifiedError
|
2021-07-08 13:56:26 +00:00
|
|
|
|
from telethon.errors.rpcerrorlist import StickersetInvalidError
|
2022-01-18 08:47:20 +00:00
|
|
|
|
from pagermaid import working_dir, version
|
2021-06-30 07:25:39 +00:00
|
|
|
|
from telethon.tl.types import (
|
|
|
|
|
DocumentAttributeSticker,
|
|
|
|
|
InputStickerSetID,
|
|
|
|
|
)
|
|
|
|
|
from pagermaid.listener import listener
|
|
|
|
|
from pagermaid.utils import alias_command
|
|
|
|
|
|
|
|
|
|
lottie_import = True
|
|
|
|
|
try:
|
|
|
|
|
from lottie.exporters.gif import export_gif
|
|
|
|
|
from lottie.importers.core import import_tgs
|
|
|
|
|
except ImportError:
|
|
|
|
|
lottie_import = False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@listener(is_plugin=True, outgoing=True, command=alias_command("getstickers"),
|
2021-12-18 16:55:04 +00:00
|
|
|
|
description="获取整个贴纸包的贴纸,false 关闭 tgs 转 gif;任意值开启下载所有贴纸包;\n"
|
|
|
|
|
"转 gif 需要手动安装 pypi 依赖 lottie[gif] \n"
|
|
|
|
|
"Ubuntu 转换出错请先运行 `-sh apt install libpangocairo-1.0-0 -y`。",
|
2021-06-30 07:25:39 +00:00
|
|
|
|
parameters="<任意值>")
|
|
|
|
|
async def getstickers(context):
|
|
|
|
|
tgs_gif = True
|
|
|
|
|
if not os.path.isdir('data/sticker/'):
|
|
|
|
|
os.makedirs('data/sticker/')
|
2021-07-19 04:05:55 +00:00
|
|
|
|
if len(context.parameter) == 1 or len(context.parameter) == 2:
|
2021-08-16 14:32:42 +00:00
|
|
|
|
if "false" in context.arguments:
|
2021-07-19 04:05:55 +00:00
|
|
|
|
tgs_gif = False
|
2021-08-16 14:32:42 +00:00
|
|
|
|
if "all" in context.arguments:
|
2021-07-19 04:05:55 +00:00
|
|
|
|
sticker_sets = await context.client(GetAllStickersRequest(0))
|
|
|
|
|
for stickerset in sticker_sets.sets:
|
|
|
|
|
file_ext_ns_ion = "webp"
|
|
|
|
|
wdnmd = InputStickerSetID(id=stickerset.id, access_hash=stickerset.access_hash)
|
|
|
|
|
sticker_set = await context.client(GetStickerSetRequest(wdnmd))
|
|
|
|
|
pack_file = os.path.join('data/sticker/', sticker_set.set.short_name, "pack.txt")
|
|
|
|
|
if os.path.isfile(pack_file):
|
|
|
|
|
os.remove(pack_file)
|
|
|
|
|
# Sticker emojis
|
|
|
|
|
emojis = defaultdict(str)
|
|
|
|
|
for pack in sticker_set.packs:
|
|
|
|
|
for document_id in pack.documents:
|
|
|
|
|
emojis[document_id] += pack.emoticon
|
2021-08-16 14:32:42 +00:00
|
|
|
|
|
2021-07-19 04:05:55 +00:00
|
|
|
|
async def download(sticker, emojis, path, file):
|
|
|
|
|
await context.client.download_media(sticker, file=os.path.join(path, file))
|
|
|
|
|
with open(pack_file, "a") as f:
|
|
|
|
|
f.write(f"{{'image_file': '{file}','emojis':{emojis[sticker.id]}}},")
|
|
|
|
|
if file_ext_ns_ion == 'tgs' and lottie_import and tgs_gif:
|
|
|
|
|
animated = import_tgs(os.path.join(path, file))
|
|
|
|
|
export_gif(animated, os.path.join(path, file)[:-3] + 'gif')
|
|
|
|
|
elif file_ext_ns_ion == 'webp':
|
|
|
|
|
convert_png(os.path.join(path, file))
|
|
|
|
|
|
|
|
|
|
pending_tasks = [
|
|
|
|
|
asyncio.ensure_future(
|
|
|
|
|
download(document, emojis, 'data/sticker/' + sticker_set.set.short_name,
|
|
|
|
|
f"{i:03d}.{file_ext_ns_ion}")
|
|
|
|
|
) for i, document in enumerate(sticker_set.documents)
|
|
|
|
|
]
|
2021-08-16 14:32:42 +00:00
|
|
|
|
xx = await context.client.send_message(context.chat_id,
|
|
|
|
|
f"正在下载 {sticker_set.set.short_name} "
|
|
|
|
|
f"中的 {sticker_set.set.count} 张贴纸。。。")
|
2021-07-19 04:05:55 +00:00
|
|
|
|
num_tasks = len(pending_tasks)
|
|
|
|
|
while 1:
|
|
|
|
|
done, pending_tasks = await asyncio.wait(pending_tasks, timeout=2.5,
|
|
|
|
|
return_when=asyncio.FIRST_COMPLETED)
|
|
|
|
|
if file_ext_ns_ion == 'tgs' and lottie_import and tgs_gif:
|
|
|
|
|
try:
|
|
|
|
|
await xx.edit(
|
|
|
|
|
f"正在下载/转换中,进度: {num_tasks - len(pending_tasks)}/{sticker_set.set.count}")
|
|
|
|
|
except MessageNotModifiedError:
|
|
|
|
|
pass
|
|
|
|
|
if not pending_tasks:
|
|
|
|
|
break
|
|
|
|
|
await xx.edit("下载完毕,打包上传中。")
|
|
|
|
|
directory_name = sticker_set.set.short_name
|
|
|
|
|
os.chdir("data/sticker/") # 修改当前工作目录
|
|
|
|
|
zipf = zipfile.ZipFile(directory_name + ".zip", "w", zipfile.ZIP_DEFLATED)
|
|
|
|
|
zipdir(directory_name, zipf)
|
|
|
|
|
zipf.close()
|
|
|
|
|
await context.client.send_file(
|
|
|
|
|
context.chat_id,
|
|
|
|
|
directory_name + ".zip",
|
|
|
|
|
caption=sticker_set.set.short_name,
|
|
|
|
|
force_document=True,
|
|
|
|
|
allow_cache=False
|
|
|
|
|
)
|
|
|
|
|
try:
|
|
|
|
|
os.remove(directory_name + ".zip")
|
|
|
|
|
os.remove(directory_name)
|
|
|
|
|
except:
|
|
|
|
|
pass
|
|
|
|
|
os.chdir(working_dir)
|
|
|
|
|
await xx.delete()
|
2021-08-16 14:32:42 +00:00
|
|
|
|
return
|
|
|
|
|
reply_message = await context.get_reply_message()
|
|
|
|
|
if reply_message:
|
2021-06-30 07:25:39 +00:00
|
|
|
|
if not reply_message.sticker:
|
|
|
|
|
await context.edit("请回复一张贴纸。")
|
|
|
|
|
return
|
2021-08-16 14:32:42 +00:00
|
|
|
|
else:
|
|
|
|
|
await context.edit("请回复一张贴纸。")
|
|
|
|
|
return
|
|
|
|
|
sticker = reply_message.sticker
|
|
|
|
|
sticker_attrib = find_instance(sticker.attributes, DocumentAttributeSticker)
|
|
|
|
|
if not sticker_attrib.stickerset:
|
|
|
|
|
await context.edit("回复的贴纸不属于任何贴纸包。")
|
|
|
|
|
return
|
|
|
|
|
is_a_s = is_it_animated_sticker(reply_message)
|
|
|
|
|
file_ext_ns_ion = "webp"
|
|
|
|
|
if is_a_s:
|
|
|
|
|
file_ext_ns_ion = "tgs"
|
|
|
|
|
if tgs_gif and not lottie_import:
|
|
|
|
|
await context.reply('`lottie[gif]` 依赖未安装,tgs 无法转换为 gif ,进行标准格式导出。')
|
|
|
|
|
try:
|
|
|
|
|
sticker_set = await context.client(GetStickerSetRequest(sticker_attrib.stickerset))
|
|
|
|
|
except StickersetInvalidError:
|
|
|
|
|
await context.edit('回复的贴纸不存在于任何贴纸包中。')
|
|
|
|
|
return
|
|
|
|
|
pack_file = os.path.join('data/sticker/', sticker_set.set.short_name, "pack.txt")
|
|
|
|
|
if os.path.isfile(pack_file):
|
|
|
|
|
os.remove(pack_file)
|
|
|
|
|
# Sticker emojis
|
|
|
|
|
emojis = defaultdict(str)
|
|
|
|
|
for pack in sticker_set.packs:
|
|
|
|
|
for document_id in pack.documents:
|
|
|
|
|
emojis[document_id] += pack.emoticon
|
2021-06-30 07:25:39 +00:00
|
|
|
|
|
2021-08-16 14:32:42 +00:00
|
|
|
|
async def download(sticker, emojis, path, file):
|
|
|
|
|
await context.client.download_media(sticker, file=os.path.join(path, file))
|
|
|
|
|
with open(pack_file, "a") as f:
|
|
|
|
|
f.write(f"{{'image_file': '{file}','emojis':{emojis[sticker.id]}}},")
|
|
|
|
|
if file_ext_ns_ion == 'tgs' and lottie_import and tgs_gif:
|
|
|
|
|
animated = import_tgs(os.path.join(path, file))
|
|
|
|
|
export_gif(animated, os.path.join(path, file)[:-3] + 'gif')
|
|
|
|
|
elif file_ext_ns_ion == 'webp':
|
|
|
|
|
convert_png(os.path.join(path, file))
|
2021-06-30 07:25:39 +00:00
|
|
|
|
|
2021-08-16 14:32:42 +00:00
|
|
|
|
pending_tasks = [
|
|
|
|
|
asyncio.ensure_future(
|
|
|
|
|
download(document, emojis, 'data/sticker/' + sticker_set.set.short_name,
|
|
|
|
|
f"{i:03d}.{file_ext_ns_ion}")
|
|
|
|
|
) for i, document in enumerate(sticker_set.documents)
|
|
|
|
|
]
|
|
|
|
|
await context.edit(
|
|
|
|
|
f"正在下载 {sticker_set.set.short_name} 中的 {sticker_set.set.count} 张贴纸。。。")
|
|
|
|
|
num_tasks = len(pending_tasks)
|
|
|
|
|
while 1:
|
|
|
|
|
done, pending_tasks = await asyncio.wait(pending_tasks, timeout=2.5,
|
|
|
|
|
return_when=asyncio.FIRST_COMPLETED)
|
|
|
|
|
if file_ext_ns_ion == 'tgs' and lottie_import and tgs_gif:
|
|
|
|
|
try:
|
|
|
|
|
await context.edit(
|
|
|
|
|
f"正在下载/转换中,进度: {num_tasks - len(pending_tasks)}/{sticker_set.set.count}")
|
|
|
|
|
except MessageNotModifiedError:
|
|
|
|
|
pass
|
|
|
|
|
if not pending_tasks:
|
|
|
|
|
break
|
|
|
|
|
await context.edit("下载完毕,打包上传中。")
|
|
|
|
|
directory_name = sticker_set.set.short_name
|
|
|
|
|
os.chdir("data/sticker/") # 修改当前工作目录
|
|
|
|
|
zipf = zipfile.ZipFile(directory_name + ".zip", "w", zipfile.ZIP_DEFLATED)
|
|
|
|
|
zipdir(directory_name, zipf)
|
|
|
|
|
zipf.close()
|
|
|
|
|
await context.client.send_file(
|
|
|
|
|
context.chat_id,
|
|
|
|
|
directory_name + ".zip",
|
|
|
|
|
caption=sticker_set.set.short_name,
|
|
|
|
|
force_document=True,
|
|
|
|
|
allow_cache=False,
|
|
|
|
|
reply_to=reply_message.id
|
|
|
|
|
)
|
|
|
|
|
try:
|
|
|
|
|
os.remove(directory_name + ".zip")
|
|
|
|
|
os.remove(directory_name)
|
|
|
|
|
except:
|
|
|
|
|
pass
|
|
|
|
|
os.chdir(working_dir)
|
|
|
|
|
await context.delete()
|
2021-06-30 07:25:39 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_instance(items, class_or_tuple):
|
|
|
|
|
for item in items:
|
|
|
|
|
if isinstance(item, class_or_tuple):
|
|
|
|
|
return item
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_it_animated_sticker(message):
|
|
|
|
|
try:
|
|
|
|
|
if message.media and message.media.document:
|
|
|
|
|
mime_type = message.media.document.mime_type
|
|
|
|
|
if "tgsticker" in mime_type:
|
|
|
|
|
return True
|
|
|
|
|
else:
|
|
|
|
|
return False
|
|
|
|
|
else:
|
|
|
|
|
return False
|
|
|
|
|
except:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def zipdir(path, ziph):
|
|
|
|
|
for root, dirs, files in os.walk(path):
|
|
|
|
|
for file in files:
|
|
|
|
|
ziph.write(os.path.join(root, file))
|
|
|
|
|
os.remove(os.path.join(root, file))
|
2021-07-01 02:31:29 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_png(path):
|
|
|
|
|
im = Image.open(path)
|
2021-07-01 08:09:25 +00:00
|
|
|
|
im = im.convert('RGBA')
|
2021-07-01 02:31:29 +00:00
|
|
|
|
new_path = path.replace(".webp", ".png")
|
2021-07-01 08:09:25 +00:00
|
|
|
|
im.save(new_path, 'PNG')
|
2021-07-01 02:31:29 +00:00
|
|
|
|
return new_path
|