mirror of
https://github.com/TeamPGM/PagerMaid_Plugins_Pyro.git
synced 2024-11-24 01:01:25 +00:00
xtaodada
0eb98c7b51
All checks were successful
Github commit to telegram / build (push) Successful in 11s
170 lines
6.0 KiB
Python
170 lines
6.0 KiB
Python
import contextlib
|
||
import csv
|
||
from asyncio import sleep
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import List
|
||
|
||
from pyrogram.enums import ChatMemberStatus
|
||
from pyrogram.errors import FloodWait
|
||
from pyrogram.types import ChatEventFilter, ChatEvent
|
||
|
||
from pagermaid.enums import Client, Message
|
||
from pagermaid.enums.command import CommandHandler
|
||
from pagermaid.listener import listener
|
||
|
||
HELP_MSG = """通过表格清理 48 小时内加群的用户
|
||
|
||
- `get chat_id` 通过群组管理员日志生成表格(表格将会发送到当前会话,请注意隐私安全)
|
||
- `do chat_id` 通过表格清理用户,需要附带表格文件,将需要清理的用户最后一列标记为 1 即可。"""
|
||
JOIN_USER_FILTER = ChatEventFilter(new_members=True)
|
||
BAN_USER_FILTER = ChatEventFilter(new_restrictions=True)
|
||
DATA_PATH = Path("data")
|
||
|
||
|
||
class FloodClean:
|
||
@staticmethod
|
||
def get_file_name(chat_id: int) -> Path:
|
||
return DATA_PATH / f"{chat_id}.csv"
|
||
|
||
@staticmethod
|
||
def generate_csv_line(event: "ChatEvent", writer: "csv.DictWriter", ids: List[int], banned_ids: List[int]):
|
||
if not event.user:
|
||
return
|
||
user = event.user
|
||
if event.invited_member and event.invited_member.user:
|
||
user = event.invited_member.user
|
||
if user.is_deleted:
|
||
return
|
||
if user.id in ids:
|
||
return
|
||
ids.append(user.id)
|
||
writer.writerow({
|
||
"uid": user.id,
|
||
"full_name": user.full_name,
|
||
"first_name": user.first_name,
|
||
"last_name": user.last_name or "",
|
||
"is_premium": user.is_premium,
|
||
"is_bot": user.is_bot,
|
||
"has_photo": bool(user.photo),
|
||
"username": user.username or "",
|
||
"date": event.date.strftime("%Y-%m-%d %H:%M:%S"),
|
||
"is_kicked": user.id in banned_ids,
|
||
"need_kick": 0,
|
||
})
|
||
|
||
@staticmethod
|
||
async def get_banned_users(client: Client, chat_id: int) -> List[int]:
|
||
ids = []
|
||
now = datetime.now()
|
||
async for event in client.get_chat_event_log(chat_id, filters=BAN_USER_FILTER):
|
||
np = event.new_member_permissions
|
||
if not np:
|
||
continue
|
||
if np.status != ChatMemberStatus.BANNED:
|
||
continue
|
||
user = np.user
|
||
if not np.user:
|
||
continue
|
||
uid = user.id
|
||
if np.until_date and np.until_date <= now:
|
||
continue
|
||
if uid not in ids:
|
||
ids.append(uid)
|
||
return ids
|
||
|
||
@staticmethod
|
||
async def get_event_log(client: Client, chat_id: int) -> str:
|
||
field_order = [
|
||
"uid",
|
||
"full_name",
|
||
"first_name",
|
||
"last_name",
|
||
"is_premium",
|
||
"is_bot",
|
||
"has_photo",
|
||
"username",
|
||
"date",
|
||
"is_kicked",
|
||
"need_kick",
|
||
]
|
||
file_name = FloodClean.get_file_name(chat_id)
|
||
ids = []
|
||
banned_ids = await FloodClean.get_banned_users(client, chat_id)
|
||
with open(file_name, 'w', encoding="utf-8", newline='') as csvfile:
|
||
writer = csv.DictWriter(csvfile, field_order)
|
||
writer.writeheader()
|
||
async for event in client.get_chat_event_log(chat_id, filters=JOIN_USER_FILTER):
|
||
FloodClean.generate_csv_line(event, writer, ids, banned_ids)
|
||
return str(file_name)
|
||
|
||
@staticmethod
|
||
def get_need_kick_users(chat_id: int) -> List[int]:
|
||
file_name = FloodClean.get_file_name(chat_id)
|
||
ids = []
|
||
with open(file_name, 'r', encoding="utf-8") as csvfile:
|
||
reader = csv.DictReader(csvfile)
|
||
for row in reader:
|
||
if row["need_kick"] == "1":
|
||
uid = int(row["uid"])
|
||
if uid not in ids:
|
||
ids.append(uid)
|
||
return ids
|
||
|
||
@staticmethod
|
||
async def try_kick_user(client: Client, message: Message, chat_id: int, uid: int):
|
||
try:
|
||
await client.ban_chat_member(chat_id, uid)
|
||
except FloodWait as e:
|
||
with contextlib.suppress(Exception):
|
||
await message.edit(f"uid: {uid} 遇到 FloodWait,等待 {e.x} 秒")
|
||
await sleep(e.value + 1)
|
||
await FloodClean.try_kick_user(client, message, chat_id, uid)
|
||
|
||
@staticmethod
|
||
async def kick_users(client: Client, message: Message, chat_id: int):
|
||
ids = FloodClean.get_need_kick_users(chat_id)
|
||
for index, uid in enumerate(ids):
|
||
if index % 10 == 0:
|
||
with contextlib.suppress(Exception):
|
||
await message.edit(f"正在清理 {index}/{len(ids)}")
|
||
await FloodClean.try_kick_user(client, message, chat_id, uid)
|
||
|
||
|
||
@listener(command="flood_clean")
|
||
async def flood_clean(message: Message):
|
||
await message.edit(HELP_MSG)
|
||
|
||
|
||
flood_clean: "CommandHandler"
|
||
|
||
|
||
@flood_clean.sub_command(command="get")
|
||
async def flood_clean_get(client: Client, message: Message):
|
||
if len(message.parameter) != 2:
|
||
return await message.edit(HELP_MSG)
|
||
await message.edit("获取中...请等待")
|
||
chat_id = int(message.parameter[1])
|
||
try:
|
||
file_name = await FloodClean.get_event_log(client, chat_id)
|
||
except Exception as e:
|
||
return await message.edit(f"获取群组管理员日志失败: {e}")
|
||
await message.edit("获取成功")
|
||
await message.reply_document(file_name)
|
||
|
||
|
||
@flood_clean.sub_command(command="do")
|
||
async def flood_clean_do(client: Client, message: Message):
|
||
if len(message.parameter) != 2 or (not message.document):
|
||
return await message.edit(HELP_MSG)
|
||
await message.edit("下载中...请等待")
|
||
chat_id = int(message.parameter[1])
|
||
file_name = FloodClean.get_file_name(chat_id)
|
||
await message.download(str(file_name))
|
||
await message.edit("清理中...请等待")
|
||
try:
|
||
await FloodClean.kick_users(client, message, chat_id)
|
||
except Exception as e:
|
||
return await message.edit(f"清理用户失败: {e}")
|
||
await message.edit("清理成功")
|