PagerMaid_Plugins_Pyro/restore_pin/main.py
2024-02-04 15:56:06 +08:00

136 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from asyncio import sleep
from typing import Dict, List
from pyrogram import filters
from pyrogram.errors import ChatAdminRequired, UserAdminInvalid, FloodWait
from pyrogram.raw.functions.channels import GetAdminLog, GetMessages
from pyrogram.raw.types import (
ChannelAdminLogEventsFilter,
ChannelAdminLogEventActionUpdatePinned,
InputMessageID,
)
from pyrogram.raw.types.channels import AdminLogResults
from pyrogram.raw.types.messages import Messages
from pagermaid.enums import Message
from pagermaid.listener import listener
from pagermaid.services import bot
async def get_admin_log(cid: int) -> AdminLogResults:
    """Fetch the pin-related entries of a chat's admin log.

    Args:
        cid: Id of the chat; it is resolved to an input peer through
            ``bot.resolve_peer`` before the request is sent.

    Returns:
        The raw ``GetAdminLog`` result, requested with ``limit=100`` and an
        events filter of ``ChannelAdminLogEventsFilter(pinned=True)``.
    """
    channel = await bot.resolve_peer(cid)
    request = GetAdminLog(
        channel=channel,
        q="",
        max_id=0,
        min_id=0,
        limit=100,
        events_filter=ChannelAdminLogEventsFilter(pinned=True),
    )
    result: AdminLogResults = await bot.invoke(request)
    return result
def get_num_map(events: AdminLogResults) -> Dict[int, List[int]]:
    """Group the ids of unpinned messages by the admin who unpinned them.

    Only ``ChannelAdminLogEventActionUpdatePinned`` events are considered.
    Events whose message is currently flagged as pinned are skipped, so the
    result only contains "unpin" actions.

    Args:
        events: Admin log result whose ``events`` list is scanned.

    Returns:
        A mapping of ``user_id`` to the message ids taken from that user's
        matching events, in the order the events appear in ``events.events``.
    """
    num_map: Dict[int, List[int]] = {}
    for event in events.events:
        action = event.action
        if not isinstance(action, ChannelAdminLogEventActionUpdatePinned):
            continue
        # Some raw message variants (e.g. MessageEmpty) carry no ``pinned``
        # field; treat them as "not pinned" instead of raising AttributeError.
        if getattr(action.message, "pinned", False):
            continue
        num_map.setdefault(event.user_id, []).append(action.message.id)
    return num_map
async def try_ask_admin(message: Message, num_map: Dict[int, List[int]]) -> int:
    """Ask the command sender which admin's unpin actions should be reverted.

    The status message is edited to list every admin id in ``num_map`` together
    with the number of recorded unpins (most active first). The next message
    from the command sender in the same chat is then read, deleted, and parsed
    as an admin id.

    Args:
        message: Status message to edit; its chat and sender scope the
            conversation.
        num_map: Mapping of admin id to the message ids that admin unpinned.

    Returns:
        The chosen admin id, or ``0`` after reporting an error when the reply
        is not an integer key of ``num_map``.
    """
    ranked = sorted(num_map, key=lambda uid: len(num_map[uid]), reverse=True)
    rows = "".join(f"`{uid}` - 取消 {len(num_map[uid])}\n" for uid in ranked)
    await message.edit("请发送执行误取消置顶操作的管理员 id\n\n" + rows)
    try:
        async with bot.conversation(
            message.chat.id, filters=filters.user(message.from_user.id)
        ) as conv:
            await sleep(0.1)
            res: Message = await conv.get_response()
            await res.safe_delete()
            # A reply without text (media, sticker, ...) would make int() raise
            # TypeError, which the handler below does not catch; route it
            # through the same "invalid id" path instead.
            if res.text is None:
                raise ValueError
            uid = int(res.text)
            if uid not in num_map:
                raise ValueError
    except ValueError:
        await message.edit("错误:管理员 id 不正确")
        return 0
    return uid
async def pin_one(message: Message, mid: int):
    """Pin a single message silently, retrying after flood-wait limits.

    Args:
        message: Status message; its chat is the one in which ``mid`` is
            pinned, and it is edited to show the wait time on ``FloodWait``.
        mid: Id of the message to pin.

    ``ChatAdminRequired`` and ``UserAdminInvalid`` end the attempt without
    raising. Any other exception propagates to the caller.
    """
    while True:
        try:
            await bot.pin_chat_message(
                message.chat.id, mid, disable_notification=True
            )
        except (ChatAdminRequired, UserAdminInvalid):
            # Missing permissions: give up on this message quietly.
            return
        except FloodWait as flood:
            await message.edit(f"触发限制,睡眠 {flood.value}")
            await sleep(flood.value)
            continue
        return
async def get_unpin_messages(cid: int, ids: List[int]) -> List[int]:
    """Return which of the given messages are currently not pinned.

    Args:
        cid: Id of the chat that contains the messages.
        ids: Candidate message ids; duplicates are collapsed, keeping the
            first occurrence.

    Returns:
        Ids of the fetched messages whose ``pinned`` flag is falsy. Returned
        entries that have no ``pinned`` field at all are left out.
    """
    # The same message can be unpinned more than once in the admin log;
    # query (and later re-pin) each message a single time.
    unique_ids = list(dict.fromkeys(ids))
    if not unique_ids:
        return []
    r: Messages = await bot.invoke(
        GetMessages(
            channel=await bot.resolve_peer(cid),
            id=[InputMessageID(id=i) for i in unique_ids],
        )
    )
    # Entries without a ``pinned`` field (e.g. MessageEmpty for messages that
    # no longer exist) cannot be re-pinned and previously raised
    # AttributeError here, so they are skipped.
    return [m.id for m in r.messages if hasattr(m, "pinned") and not m.pinned]
async def try_restore_pin(message: Message, ids: List[int]):
    """Re-pin every message from ``ids`` that is currently unpinned.

    Args:
        message: Status message, edited with progress every fifth message
            and with the final outcome.
        ids: Message ids that were unpinned according to the admin log.

    Each pin attempt is followed by a three second pause. Failures raised by
    ``pin_one`` are collected and reported together at the end.
    """
    pending = await get_unpin_messages(message.chat.id, ids)
    if not pending:
        return await message.edit("没有需要恢复的置顶")

    failures = []
    for position, mid in enumerate(pending, start=1):
        if position % 5 == 0:
            await message.edit(f"正在恢复第 {position} 条置顶...")
        try:
            await pin_one(message, mid)
        except Exception as exc:
            failures.append(f"恢复第 {position} 条置顶失败 {exc}\n")
        await sleep(3)

    # An empty join means nothing failed, so fall back to the success text.
    report = "".join(failures) or "已恢复所有消息的置顶"
    await message.edit(report)
@listener(
    command="restore_pin",
    description="恢复管理员误取消的置顶",
    groups_only=True,
    need_admin=True,
)
async def restore_pin(message: Message):
    """Handle the ``restore_pin`` command.

    Loads the chat's admin log, groups unpin actions per admin, asks the
    sender which admin to revert, and re-pins that admin's unpinned messages.
    Progress and errors are reported by editing the command message.
    """
    if not message.from_user:
        return

    status = await message.edit("正在获取管理员日志...")
    try:
        events = await get_admin_log(status.chat.id)
    except Exception as e:
        return await status.edit(f"请求管理员日志失败:{e}")

    # An empty log and a log without unpin actions produce the same reply.
    num_map = get_num_map(events) if events.events else {}
    if not num_map:
        return await status.edit("管理员日志为空,无法恢复")

    admin_id = await try_ask_admin(status, num_map)
    if admin_id in num_map:
        await status.edit("尝试恢复置顶...")
        await try_restore_pin(status, num_map[admin_id])