PagerMaid_Plugins_Pyro/sticker_transfer/main.py

149 lines
4.9 KiB
Python
Raw Normal View History

2022-07-03 05:50:11 +00:00
""" PagerMaid module for sticker pack transfer (贴纸迁移): export, import and clear installed sticker sets. """
import csv
from asyncio import sleep
2022-07-03 12:50:16 +00:00
from os import sep
2022-07-03 05:50:11 +00:00
from pyrogram.errors import StickersetInvalid, FloodWait
from pyrogram.raw.functions.messages import GetAllStickers, UninstallStickerSet
2022-07-03 05:50:11 +00:00
from pyrogram.raw.functions.messages import InstallStickerSet
from pyrogram.raw.types import InputStickerSetShortName
from pagermaid import bot
from pagermaid.listener import listener
from pagermaid.single_utils import Message, safe_remove
class NoStickerSetError(Exception):
    """Raised when the account has no installed sticker sets to work on."""
2022-07-03 05:50:11 +00:00
async def export_sticker_to_csv():
    """Write every installed sticker set to ``stickers.csv``.

    The list is fetched with ``GetAllStickers(hash=0)``. The CSV is written
    to the current working directory with a header row followed by one row
    per sticker set (title, short name and three boolean flags).

    Returns:
        The number of sticker sets written.

    Raises:
        NoStickerSetError: If the server reports no installed sticker sets.
    """
    stickers = await bot.invoke(GetAllStickers(hash=0))
    if not stickers.sets:
        raise NoStickerSetError

    with open("stickers.csv", "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(["name", "short_name", "is_masks", "is_animated", "is_video"])
        writer.writerows(
            [
                sticker_set.title,
                sticker_set.short_name,
                # The "is_masks" column must reflect the ``masks`` flag; the
                # previous code exported ``archived`` here by mistake.
                getattr(sticker_set, "masks", False),
                getattr(sticker_set, "animated", False),
                getattr(sticker_set, "videos", False),
            ]
            for sticker_set in stickers.sets
        )
    return len(stickers.sets)
2022-07-03 05:50:11 +00:00
async def import_sticker(short_name):
    """Send an ``InstallStickerSet`` request for the set named *short_name*.

    The set is installed with ``archived=False``. Any error raised by
    ``bot.invoke`` propagates to the caller.
    """
    target = InputStickerSetShortName(short_name=short_name)
    request = InstallStickerSet(stickerset=target, archived=False)
    await bot.invoke(request)
2022-07-03 05:50:11 +00:00
async def remove_sticker(short_name):
    """Send an ``UninstallStickerSet`` request for the set named *short_name*.

    Any error raised by ``bot.invoke`` propagates to the caller.
    """
    target = InputStickerSetShortName(short_name=short_name)
    request = UninstallStickerSet(stickerset=target)
    await bot.invoke(request)
2022-07-03 05:50:11 +00:00
async def import_sticker_from_csv(file_name):
    """Install every sticker set listed in a CSV produced by the exporter.

    The second column of each data row is used as the sticker set short
    name. Rows whose first column is ``"name"`` (the header) and rows with
    fewer than two columns are skipped. Sets are installed in reverse file
    order.

    Args:
        file_name: Path of the CSV file to read.

    Returns:
        A ``(success, failed)`` tuple counting installed and failed sets.
    """
    success = 0
    failed = 0

    with open(file_name, "r", newline="", encoding="utf-8-sig") as f:
        short_names = [
            row[1]
            for row in csv.reader(f)
            # len() guard: blank or truncated rows have no short name to read.
            if len(row) > 1 and row[0] != "name"
        ]

    # Reverse file order, matching the previous insert-at-front behaviour
    # without the quadratic list shifting.
    for short_name in reversed(short_names):
        try:
            try:
                await import_sticker(short_name)
            except FloodWait as e:
                # Wait as long as the server asks, then retry this same set
                # once (the old code retried the last CSV row instead).
                await sleep(e.value)
                await import_sticker(short_name)
        except Exception:
            # Covers StickersetInvalid and any failure of the retry itself, so
            # one bad set never aborts the rest of the import.
            failed += 1
        else:
            success += 1
    return success, failed
async def clear_sets():
    """Uninstall every sticker set returned by ``GetAllStickers(hash=0)``.

    Returns:
        A ``(success, failed)`` tuple counting removed and failed sets.

    Raises:
        NoStickerSetError: If the server reports no installed sticker sets.
    """
    stickers = await bot.invoke(GetAllStickers(hash=0))
    if not stickers.sets:
        raise NoStickerSetError

    success = 0
    failed = 0
    for sticker_set in stickers.sets:
        try:
            try:
                await remove_sticker(sticker_set.short_name)
            except FloodWait as e:
                # Wait as long as the server asks, then retry once.
                await sleep(e.value)
                await remove_sticker(sticker_set.short_name)
        except Exception:
            # Includes a failed retry, so one bad set never aborts the loop.
            failed += 1
        else:
            # A set removed on the retry counts as a success (the old code
            # counted it as a failure).
            success += 1
    return success, failed
2023-07-01 12:18:58 +00:00
@listener(
    command="sticker_transfer",
    need_admin=True,
    parameters="导出/导入/清空",
    description="导出、导入、清空已安装的贴纸包",
)
async def sticker_transfer(message: Message):
    """Handle the ``sticker_transfer`` command.

    Dispatches on ``message.arguments``:

    * ``导出`` (export): write the installed sets to ``stickers.csv``, send it
      to the current chat and delete the command message.
    * ``导入`` (import): download the replied-to ``.csv`` document and install
      every set listed in it.
    * ``清空`` (clear): uninstall every installed set.

    Any other argument edits the message with a usage error.
    """
    if message.arguments == "导出":
        try:
            num = await export_sticker_to_csv()
        except NoStickerSetError:
            return await message.edit("没有贴纸包可以导出")
        if num:
            try:
                await bot.send_document(
                    message.chat.id,
                    "stickers.csv",
                    caption=f"贴纸包导出文件,成功导出了 {num} 个贴纸包",
                    thumb=f"pagermaid{sep}assets{sep}logo.jpg",
                    reply_to_message_id=message.reply_to_top_message_id,
                )
            finally:
                # Remove the temporary export even when sending fails.
                safe_remove("stickers.csv")
            await message.safe_delete()
        else:
            await message.edit("没有贴纸包可以导出")
    elif message.arguments == "导入":
        reply = message.reply_to_message
        document = reply.document if reply else None
        # A document may carry no file name; treat that like a non-CSV file
        # instead of failing on ``None.endswith``.
        if not document or not (document.file_name or "").endswith(".csv"):
            return await message.edit("❌ 请回复贴纸包导出文件")
        message = await message.edit("导入中")
        file_name = await reply.download()
        try:
            success, failed = await import_sticker_from_csv(file_name)
        finally:
            # Remove the downloaded file even when the import raises.
            safe_remove(file_name)
        await message.edit(f"导入成功 {success} 个贴纸包,失败 {failed} 个贴纸包")
    elif message.arguments == "清空":
        message = await message.edit("清空中")
        try:
            success, failed = await clear_sets()
        except NoStickerSetError:
            return await message.edit("没有贴纸包可以清空")
        await message.edit(f"清空成功 {success} 个贴纸包,失败 {failed} 个贴纸包")
    else:
        await message.edit("❌ 参数错误,请选择 `导出` 或 `导入` 或 `清空`")