from asyncio import sleep from typing import Dict, List from pyrogram import filters from pyrogram.errors import ChatAdminRequired, UserAdminInvalid, FloodWait from pyrogram.raw.functions.channels import GetAdminLog, GetMessages from pyrogram.raw.types import ( ChannelAdminLogEventsFilter, ChannelAdminLogEventActionUpdatePinned, InputMessageID, ) from pyrogram.raw.types.channels import AdminLogResults from pyrogram.raw.types.messages import Messages from pagermaid.enums import Message from pagermaid.listener import listener from pagermaid.services import bot async def get_admin_log(cid: int) -> AdminLogResults: d: AdminLogResults = await bot.invoke( GetAdminLog( channel=await bot.resolve_peer(cid), q="", max_id=0, min_id=0, limit=100, events_filter=ChannelAdminLogEventsFilter(pinned=True), ) ) return d def get_num_map(events: AdminLogResults) -> Dict[int, List[int]]: num_map: Dict[int, List[int]] = {} for event in events.events: if isinstance(event.action, ChannelAdminLogEventActionUpdatePinned): if event.action.message.pinned: continue num = num_map.get(event.user_id, []) num.append(event.action.message.id) num_map[event.user_id] = num return num_map async def try_ask_admin(message: Message, num_map: Dict[int, List[int]]) -> int: nums = sorted(num_map.keys(), key=lambda x: len(num_map[x]), reverse=True) text = "请发送执行误取消置顶操作的管理员 id:\n\n" for idx in nums: text += f"`{idx}` - 取消 {len(num_map[idx])} 条\n" await message.edit(text) try: async with bot.conversation( message.chat.id, filters=filters.user(message.from_user.id) ) as conv: await sleep(0.1) res: Message = await conv.get_response() await res.safe_delete() uid = int(res.text) if uid not in num_map: raise ValueError except ValueError: await message.edit("错误:管理员 id 不正确") return 0 return uid async def pin_one(message: Message, mid: int): try: await bot.pin_chat_message(message.chat.id, mid, disable_notification=True) except ChatAdminRequired: return except UserAdminInvalid: return except FloodWait as e: await message.edit(f"触发限制,睡眠 {e.value} 秒") await sleep(e.value) await pin_one(message, mid) async def get_unpin_messages(cid: int, ids: List[int]) -> List[int]: ids = [InputMessageID(id=i) for i in ids] r: Messages = await bot.invoke( GetMessages(channel=await bot.resolve_peer(cid), id=ids) ) new_ids = [] for i in r.messages: if not i.pinned: new_ids.append(i.id) return new_ids async def try_restore_pin(message: Message, ids: List[int]): new_ids = await get_unpin_messages(message.chat.id, ids) if not new_ids: return await message.edit("没有需要恢复的置顶") error = "" for idx, i in enumerate(new_ids): if (idx + 1) % 5 == 0: await message.edit(f"正在恢复第 {idx + 1} 条置顶...") try: await pin_one(message, i) except Exception as e: error += f"恢复第 {idx + 1} 条置顶失败 :{e}\n" await sleep(3) if error: await message.edit(error) else: await message.edit("已恢复所有消息的置顶") @listener( command="restore_pin", description="恢复管理员误取消的置顶", groups_only=True, need_admin=True, ) async def restore_pin(message: Message): if not message.from_user: return message = await message.edit("正在获取管理员日志...") try: events = await get_admin_log(message.chat.id) except Exception as e: return await message.edit(f"请求管理员日志失败:{e}") if not events.events: return await message.edit("管理员日志为空,无法恢复") num_map = get_num_map(events) if not num_map: return await message.edit("管理员日志为空,无法恢复") admin_id = await try_ask_admin(message, num_map) if admin_id not in num_map: return await message.edit("尝试恢复置顶...") await try_restore_pin(message, num_map[admin_id])