PagerMaid_Plugins_Pyro/flood_clean/main.py

170 lines
6.0 KiB
Python
Raw Permalink Normal View History

import contextlib
import csv
from asyncio import sleep
from datetime import datetime
from pathlib import Path
from typing import List
from pyrogram.enums import ChatMemberStatus
from pyrogram.errors import FloodWait
from pyrogram.types import ChatEventFilter, ChatEvent
from pagermaid.enums import Client, Message
from pagermaid.enums.command import CommandHandler
from pagermaid.listener import listener
HELP_MSG = """通过表格清理 48 小时内加群的用户
- `get chat_id` 通过群组管理员日志生成表格表格将会发送到当前会话请注意隐私安全
- `do chat_id` 通过表格清理用户需要附带表格文件将需要清理的用户最后一列标记为 1 即可"""
JOIN_USER_FILTER = ChatEventFilter(new_members=True)
BAN_USER_FILTER = ChatEventFilter(new_restrictions=True)
DATA_PATH = Path("data")
class FloodClean:
@staticmethod
def get_file_name(chat_id: int) -> Path:
return DATA_PATH / f"{chat_id}.csv"
@staticmethod
def generate_csv_line(event: "ChatEvent", writer: "csv.DictWriter", ids: List[int], banned_ids: List[int]):
if not event.user:
return
user = event.user
if event.invited_member and event.invited_member.user:
user = event.invited_member.user
if user.is_deleted:
return
if user.id in ids:
return
ids.append(user.id)
writer.writerow({
"uid": user.id,
"full_name": user.full_name,
"first_name": user.first_name,
"last_name": user.last_name or "",
"is_premium": user.is_premium,
"is_bot": user.is_bot,
"has_photo": bool(user.photo),
"username": user.username or "",
"date": event.date.strftime("%Y-%m-%d %H:%M:%S"),
"is_kicked": user.id in banned_ids,
"need_kick": 0,
})
@staticmethod
async def get_banned_users(client: Client, chat_id: int) -> List[int]:
ids = []
now = datetime.now()
async for event in client.get_chat_event_log(chat_id, filters=BAN_USER_FILTER):
np = event.new_member_permissions
if not np:
continue
if np.status != ChatMemberStatus.BANNED:
continue
user = np.user
if not np.user:
continue
uid = user.id
if np.until_date and np.until_date <= now:
continue
if uid not in ids:
ids.append(uid)
return ids
@staticmethod
async def get_event_log(client: Client, chat_id: int) -> str:
field_order = [
"uid",
"full_name",
"first_name",
"last_name",
"is_premium",
"is_bot",
"has_photo",
"username",
"date",
"is_kicked",
"need_kick",
]
file_name = FloodClean.get_file_name(chat_id)
ids = []
banned_ids = await FloodClean.get_banned_users(client, chat_id)
with open(file_name, 'w', encoding="utf-8", newline='') as csvfile:
writer = csv.DictWriter(csvfile, field_order)
writer.writeheader()
async for event in client.get_chat_event_log(chat_id, filters=JOIN_USER_FILTER):
FloodClean.generate_csv_line(event, writer, ids, banned_ids)
return str(file_name)
@staticmethod
def get_need_kick_users(chat_id: int) -> List[int]:
file_name = FloodClean.get_file_name(chat_id)
ids = []
with open(file_name, 'r', encoding="utf-8") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
if row["need_kick"] == "1":
uid = int(row["uid"])
if uid not in ids:
ids.append(uid)
return ids
@staticmethod
async def try_kick_user(client: Client, message: Message, chat_id: int, uid: int):
try:
await client.ban_chat_member(chat_id, uid)
except FloodWait as e:
with contextlib.suppress(Exception):
await message.edit(f"uid: {uid} 遇到 FloodWait等待 {e.x}")
await sleep(e.value + 1)
await FloodClean.try_kick_user(client, message, chat_id, uid)
@staticmethod
async def kick_users(client: Client, message: Message, chat_id: int):
ids = FloodClean.get_need_kick_users(chat_id)
for index, uid in enumerate(ids):
if index % 10 == 0:
with contextlib.suppress(Exception):
await message.edit(f"正在清理 {index}/{len(ids)}")
await FloodClean.try_kick_user(client, message, chat_id, uid)
@listener(command="flood_clean")
async def flood_clean(message: Message):
await message.edit(HELP_MSG)
flood_clean: "CommandHandler"
@flood_clean.sub_command(command="get")
async def flood_clean_get(client: Client, message: Message):
if len(message.parameter) != 2:
return await message.edit(HELP_MSG)
await message.edit("获取中...请等待")
chat_id = int(message.parameter[1])
try:
file_name = await FloodClean.get_event_log(client, chat_id)
except Exception as e:
return await message.edit(f"获取群组管理员日志失败: {e}")
await message.edit("获取成功")
await message.reply_document(file_name)
@flood_clean.sub_command(command="do")
async def flood_clean_do(client: Client, message: Message):
if len(message.parameter) != 2 or (not message.document):
return await message.edit(HELP_MSG)
await message.edit("下载中...请等待")
chat_id = int(message.parameter[1])
file_name = FloodClean.get_file_name(chat_id)
await message.download(str(file_name))
await message.edit("清理中...请等待")
try:
await FloodClean.kick_users(client, message, chat_id)
except Exception as e:
return await message.edit(f"清理用户失败: {e}")
await message.edit("清理成功")