PaiGram/plugins/system/admin.py

83 lines
3.3 KiB
Python
Raw Permalink Normal View History

2022-07-26 10:07:31 +00:00
from telegram import Update
from telegram.error import BadRequest, Forbidden
from telegram.ext import CallbackContext, CommandHandler
from core.admin import BotAdminService
2022-07-26 10:07:31 +00:00
from logger import Log
from utils.decorators.admins import bot_admins_rights_check
from utils.plugins.manager import listener_plugins_class
from utils.service.inject import inject
2022-07-26 10:07:31 +00:00
@listener_plugins_class()
class Admin:
    """Command handlers for managing the bot's administrator list.

    Registers three commands: ``/add_admin``, ``/del_admin`` and
    ``/leave_chat``. Each handler is wrapped in ``bot_admins_rights_check``.
    """

    @inject
    def __init__(self, bot_admin_service: BotAdminService = None):
        # NOTE(review): presumably ``@inject`` supplies the service when the
        # class is created without arguments (as ``create_handlers`` does) —
        # confirm against utils.service.inject.
        self.bot_admin_service = bot_admin_service

    @classmethod
    def create_handlers(cls) -> list:
        """Create a plugin instance and return its command handlers.

        Returns:
            A list of non-blocking ``CommandHandler`` objects bound to the
            new instance.
        """
        admin = cls()
        return [
            CommandHandler("add_admin", admin.add_admin, block=False),
            CommandHandler("del_admin", admin.del_admin, block=False),
            CommandHandler("leave_chat", admin.leave_chat, block=False),
        ]

    @bot_admins_rights_check
    async def add_admin(self, update: Update, _: CallbackContext):
        """Add the author of the replied-to message to the admin list.

        The command must be sent as a reply; otherwise the user is asked to
        reply to a message. Nothing is added if the user is already listed.
        """
        # effective_message also covers edited messages, for which
        # update.message is None.
        message = update.effective_message
        reply_to_message = message.reply_to_message
        if reply_to_message is None:
            await message.reply_text("请回复对应消息")
            return

        admin_list = await self.bot_admin_service.get_admin_list()
        user_id = reply_to_message.from_user.id
        if user_id in admin_list:
            await message.reply_text("该用户已经存在管理员列表")
            return

        await self.bot_admin_service.add_admin(user_id)
        await message.reply_text("添加成功")

    @bot_admins_rights_check
    async def del_admin(self, update: Update, _: CallbackContext):
        """Remove the author of the replied-to message from the admin list.

        The command must be sent as a reply; otherwise the user is asked to
        reply to a message. Reports when the user is not in the list.
        """
        # effective_message also covers edited messages, for which
        # update.message is None.
        message = update.effective_message
        reply_to_message = message.reply_to_message
        # Validate the input before querying the service, so a malformed
        # command does not trigger a needless admin-list lookup.
        if reply_to_message is None:
            await message.reply_text("请回复对应消息")
            return

        admin_list = await self.bot_admin_service.get_admin_list()
        user_id = reply_to_message.from_user.id
        if user_id not in admin_list:
            await message.reply_text("该用户不存在管理员列表")
            return

        await self.bot_admin_service.delete_admin(user_id)
        await message.reply_text("删除成功")

    @bot_admins_rights_check
    async def leave_chat(self, update: Update, context: CallbackContext):
        """Make the bot leave the chat whose id is the first command argument.

        Replies with an input-error message when the argument is missing or
        is not an integer, and with the error text when leaving the chat
        fails with ``BadRequest`` or ``Forbidden``.
        """
        # effective_message also covers edited messages, for which
        # update.message is None.
        message = update.effective_message
        args = message.text.split()
        if len(args) < 2:
            await message.reply_text("输入错误")
            return
        try:
            chat_id = int(args[1])
        except ValueError as error:
            # NOTE(review): Log's signature is not visible here; the exception
            # is passed positionally exactly as before — confirm it is treated
            # as exception info rather than a %-format argument.
            Log.error("获取 chat_id 发生错误! 错误信息为 \n", error)
            await message.reply_text("输入错误")
            return

        # Best effort: announce which chat is being left. Failing to look the
        # chat up (or to send the notice) must not prevent the leave attempt.
        try:
            chat = await context.bot.get_chat(chat_id)
            await message.reply_text(f"正在尝试退出群 {chat.title}[{chat.id}]")
        except (BadRequest, Forbidden):
            pass

        try:
            await context.bot.leave_chat(chat_id)
        except (BadRequest, Forbidden) as error:
            Log.error(f"退出 chat_id[{chat_id}] 发生错误! 错误信息为 \n", error)
            await message.reply_text(f"退出 chat_id[{chat_id}] 发生错误! 错误信息为 {str(error)}")
            return
        await message.reply_text(f"退出 chat_id[{chat_id}] 成功!")