PaiGram/plugins/quiz.py
洛水.山岭居室 693cd774d3
重构插件传入 service 方式
* 重构插件传入 `service` 方式

重构插件传入 `service` 方式
将 `create_handlers`函数的修饰符改为 `classmethod`
2022-07-07 09:36:34 +08:00

291 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import re
from typing import List, Optional
from redis import DataError, ResponseError
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardMarkup, Poll, \
ReplyKeyboardRemove, Message
from telegram.constants import ChatAction
from telegram.ext import CallbackContext, ConversationHandler, CommandHandler, MessageHandler, filters
from telegram.helpers import escape_markdown
from logger import Log
from manager import listener_plugins_class
from plugins.base import BasePlugins, restricts
from service import BaseService
from service.base import QuestionData, AnswerData
from utils.random import MT19937_Random
class QuizCommandData:
    """Per-chat scratch state for the admin quiz-management conversation.

    Instances are stored in ``context.chat_data["quiz_command_data"]`` and
    mutated step by step while an admin selects, adds or deletes a question.
    """

    # ID of the question currently selected; -1 means "nothing selected".
    question_id: int = -1
    # Text of the question being composed.
    new_question: str = ""
    # Correct answer of the question being composed.
    new_correct_answer: str = ""
    # Wrong answers of the question being composed (set per instance below).
    new_wrong_answer: List[str]
    # Operation marker: 0 by default; the plugin sets 1 for "add" and
    # 2 for "edit".
    status: int = 0

    def __init__(self) -> None:
        # A mutable default must live on the instance: a class-level ``[]``
        # would be shared by every chat, so an in-place ``append`` in one
        # conversation would leak answers into all the others.
        self.new_wrong_answer = []
@listener_plugins_class(need_service=True)
class Quiz(BasePlugins):
    """Paimon's "One Hundred Thousand Whys" quiz plugin.

    ``/quiz`` sends a random quiz poll. When the command comes from an admin
    in a private chat it instead opens a conversation that can view, add,
    delete and reload questions.
    """

    # Conversation state identifiers. Note that no method in this class
    # returns VIEW_COMMAND, CHECK_QUESTION or QUESTION_EDIT.
    (
        CHECK_COMMAND, VIEW_COMMAND, CHECK_QUESTION,
        GET_NEW_QUESTION, GET_NEW_CORRECT_ANSWER, GET_NEW_WRONG_ANSWER,
        QUESTION_EDIT, SAVE_QUESTION,
    ) = range(10300, 10308)

    def __init__(self, service: BaseService):
        """Store the shared service object and set up per-plugin defaults.

        :param service: service container giving access to ``quiz_service``,
            ``admin`` and ``repository``.
        """
        self.user_time = {}
        self.service = service
        # Passed to ``reply_poll`` as ``open_period`` (seconds the poll stays open).
        self.time_out = 120
        self.random = MT19937_Random()

    @classmethod
    def create_handlers(cls, service: BaseService):
        """Instantiate the plugin and return its handlers.

        :param service: service container forwarded to the constructor.
        :return: a list holding the single ``ConversationHandler`` of this plugin.
        """
        quiz = cls(service)
        text_only = filters.TEXT & ~filters.COMMAND

        def on_text(callback):
            # Every text state uses the same filter and blocking behaviour.
            return MessageHandler(text_only, callback, block=True)

        quiz_handler = ConversationHandler(
            entry_points=[CommandHandler('quiz', quiz.command_start, block=True)],
            states={
                quiz.CHECK_COMMAND: [on_text(quiz.check_command)],
                quiz.CHECK_QUESTION: [on_text(quiz.check_question)],
                quiz.GET_NEW_QUESTION: [on_text(quiz.get_new_question)],
                quiz.GET_NEW_CORRECT_ANSWER: [on_text(quiz.get_new_correct_answer)],
                quiz.GET_NEW_WRONG_ANSWER: [
                    on_text(quiz.get_new_wrong_answer),
                    # block=True added to match every other handler here.
                    CommandHandler("finish", quiz.finish_edit, block=True),
                ],
                quiz.SAVE_QUESTION: [on_text(quiz.save_question)],
            },
            fallbacks=[CommandHandler('cancel', quiz.cancel, block=True)]
        )
        return [quiz_handler]

    async def send_poll(self, update: Update) -> Optional[Message]:
        """Pick a random question and send it as a quiz poll.

        :param update: incoming update whose message triggered the poll.
        :return: the sent poll message, or ``None`` when no questions exist or
            the chosen question has no answer flagged as correct.
        """
        chat = update.message.chat
        user = update.effective_user
        question_id_list = await self.service.quiz_service.get_question_id_list()
        if filters.ChatType.GROUPS.filter(update.message):
            Log.info(f"用户 {user.full_name}[{user.id}] 在群 {chat.title}[{chat.id}] 发送挑战问题命令请求")
            if len(question_id_list) == 0:
                await update.message.reply_text("旅行者!!!派蒙的问题清单你还没给我!!快去私聊我给我问题!")
        if len(question_id_list) == 0:
            return None
        # NOTE(review): assumes MT19937_Random.random(low, high) never returns
        # `high`; if the upper bound is inclusive this can index past the end
        # of the list - confirm against utils.random.
        index = self.random.random(0, len(question_id_list))
        question = await self.service.quiz_service.get_question(question_id_list[index])
        _options = []
        correct_option = None
        for answer_id in question["answer_id"]:
            answer = await self.service.quiz_service.get_answer(answer_id)
            _options.append(answer["answer"])
            if answer["is_correct"] == 1:
                correct_option = answer["answer"]
        if correct_option is None:
            question_id = question["question_id"]
            Log.warning(f"Quiz模块 correct_option 异常 question_id[{question_id}] ")
            return None
        # Shuffle so the correct answer is not always at the same position,
        # then locate it again for ``correct_option_id``.
        random.shuffle(_options)
        index = _options.index(correct_option)
        return await update.effective_message.reply_poll(question["question"], _options,
                                                         correct_option_id=index, is_anonymous=False,
                                                         open_period=self.time_out, type=Poll.QUIZ)

    # NOTE(review): `restricts` presumably throttles the command per chat type
    # and returns the given state when throttled - confirm in plugins.base.
    @restricts(filters.ChatType.GROUPS, ConversationHandler.END, restricts_time=20, try_delete_message=True)
    @restricts(filters.ChatType.PRIVATE, ConversationHandler.END)
    async def command_start(self, update: Update, context: CallbackContext) -> int:
        """Entry point for ``/quiz``.

        Admins in a private chat get the management menu and the conversation
        moves to ``CHECK_COMMAND``. Everyone else gets a quiz poll and the
        conversation ends.

        :return: ``CHECK_COMMAND`` for the admin menu, otherwise
            ``ConversationHandler.END``.
        """
        user = update.effective_user
        message = update.message
        if filters.ChatType.PRIVATE.filter(message):
            Log.info(f"用户 {user.full_name}[{user.id}] quiz命令请求")
            admin_list = await self.service.admin.get_admin_list()
            if user.id not in admin_list:
                await self.send_poll(update)
                return ConversationHandler.END
            # Create the per-chat scratch state on first use.
            if context.chat_data.get("quiz_command_data") is None:
                context.chat_data["quiz_command_data"] = QuizCommandData()
            # The text is sent as MarkdownV2, so it must be escaped with
            # version=2 (the default, version 1, leaves V2-reserved
            # characters such as "!" unescaped).
            text = f'你好 {user.mention_markdown_v2()} {escape_markdown("!请选择你的操作!", version=2)}'
            reply_keyboard = [
                ["查看问题", "添加问题"],
                ["重载问题"],
                ["退出"]
            ]
            await message.reply_markdown_v2(text, reply_markup=ReplyKeyboardMarkup(reply_keyboard,
                                                                                   one_time_keyboard=True))
            return self.CHECK_COMMAND
        if filters.ChatType.GROUPS.filter(message):
            await message.reply_chat_action(ChatAction.TYPING)
            poll_message = await self.send_poll(update)
            if poll_message is None:
                return ConversationHandler.END
            # NOTE(review): presumably schedules both the command and the poll
            # for deletion after 300 seconds - confirm in BasePlugins.
            self._add_delete_message_job(context, message.chat_id, message.message_id, 300)
            self._add_delete_message_job(context, poll_message.chat_id, poll_message.message_id, 300)
        return ConversationHandler.END

    async def view_command(self, update: Update, _: CallbackContext) -> int:
        """Offer an inline-query button for choosing a question to inspect.

        :return: ``CHECK_COMMAND``; the chosen question comes back as text.
        """
        keyboard = [
            [
                InlineKeyboardButton(text="选择问题", switch_inline_query_current_chat="查看问题 ")
            ]
        ]
        await update.message.reply_text("请回复你要查看的问题",
                                        reply_markup=InlineKeyboardMarkup(keyboard))
        return self.CHECK_COMMAND

    async def check_question(self, update: Update, _: CallbackContext) -> int:
        """Show the actions available for the currently selected question.

        :return: ``CHECK_COMMAND``.
        """
        reply_keyboard = [
            ["删除问题"],
            ["退出"]
        ]
        await update.message.reply_text("请选择你的操作", reply_markup=ReplyKeyboardMarkup(reply_keyboard))
        return self.CHECK_COMMAND

    async def check_command(self, update: Update, context: CallbackContext) -> int:
        """Dispatch a menu choice, or parse a ``问题ID <n>`` selection.

        :return: the state returned by the chosen action, or
            ``ConversationHandler.END`` on exit or unrecognised input.
        """
        quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
        text = update.message.text
        if text == "退出":
            await update.message.reply_text("退出任务", reply_markup=ReplyKeyboardRemove())
            return ConversationHandler.END
        if text == "查看问题":
            return await self.view_command(update, context)
        if text == "添加问题":
            return await self.add_question(update, context)
        if text == "删除问题":
            return await self.delete_question(update, context)
        # NOTE: there is no "修改问题" (edit question) branch here, so
        # edit_question is not reachable from this menu.
        if text == "重载问题":
            return await self.refresh_question(update, context)
        result = re.findall(r"问题ID (\d+)", text)
        if len(result) == 1:
            try:
                question_id = int(result[0])
            except ValueError:
                await update.message.reply_text("获取问题ID失败")
                return ConversationHandler.END
            quiz_command_data.question_id = question_id
            await update.message.reply_text("获取问题ID成功")
            return await self.check_question(update, context)
        await update.message.reply_text("命令错误", reply_markup=ReplyKeyboardRemove())
        return ConversationHandler.END

    async def refresh_question(self, update: Update, _: CallbackContext) -> int:
        """Reload the quiz data through ``quiz_service.refresh_quiz`` and report the result.

        :return: ``ConversationHandler.END`` in every case.
        """
        try:
            await self.service.quiz_service.refresh_quiz()
        except DataError:
            await update.message.reply_text("Redis数据错误重载失败", reply_markup=ReplyKeyboardRemove())
            return ConversationHandler.END
        except ResponseError as error:
            Log.error("重载问题失败", error)
            await update.message.reply_text("重载问题失败异常抛出Redis请求错误异常详情错误请看日记",
                                            reply_markup=ReplyKeyboardRemove())
            return ConversationHandler.END
        await update.message.reply_text("重载成功", reply_markup=ReplyKeyboardRemove())
        return ConversationHandler.END

    async def add_question(self, update: Update, context: CallbackContext) -> int:
        """Reset the draft, mark it as an "add" (status 1) and ask for the question text.

        :return: ``GET_NEW_QUESTION``.
        """
        quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
        quiz_command_data.new_wrong_answer = []
        quiz_command_data.new_question = ""
        quiz_command_data.new_correct_answer = ""
        quiz_command_data.status = 1
        await update.message.reply_text("请回复你要添加的问题,或发送 /cancel 取消操作", reply_markup=ReplyKeyboardRemove())
        return self.GET_NEW_QUESTION

    async def get_new_question(self, update: Update, context: CallbackContext) -> int:
        """Record the question text and ask for the correct answer.

        :return: ``GET_NEW_CORRECT_ANSWER``.
        """
        quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
        reply_text = f"问题:`{escape_markdown(update.message.text, version=2)}`\n" \
                     f"请填写正确答案:"
        quiz_command_data.new_question = update.message.text
        await update.message.reply_markdown_v2(reply_text)
        return self.GET_NEW_CORRECT_ANSWER

    async def get_new_correct_answer(self, update: Update, context: CallbackContext) -> int:
        """Record the correct answer and ask for wrong answers.

        :return: ``GET_NEW_WRONG_ANSWER``.
        """
        quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
        reply_text = f"正确答案:`{escape_markdown(update.message.text, version=2)}`\n" \
                     f"请填写错误答案:"
        await update.message.reply_markdown_v2(reply_text)
        quiz_command_data.new_correct_answer = update.message.text
        return self.GET_NEW_WRONG_ANSWER

    async def get_new_wrong_answer(self, update: Update, context: CallbackContext) -> int:
        """Append one wrong answer; repeats until ``/finish`` is sent.

        :return: ``GET_NEW_WRONG_ANSWER``.
        """
        quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
        reply_text = f"错误答案:`{escape_markdown(update.message.text, version=2)}`\n" \
                     f"可继续填写,并使用 {escape_markdown('/finish', version=2)} 结束。"
        await update.message.reply_markdown_v2(reply_text)
        quiz_command_data.new_wrong_answer.append(update.message.text)
        return self.GET_NEW_WRONG_ANSWER

    async def finish_edit(self, update: Update, context: CallbackContext) -> int:
        """Show a summary of the draft and ask whether to save it.

        :return: ``SAVE_QUESTION``, or ``GET_NEW_WRONG_ANSWER`` when no wrong
            answer has been entered yet.
        """
        quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
        if not quiz_command_data.new_wrong_answer:
            # send_poll turns every stored answer into a poll option and a
            # Telegram poll needs at least two options, so a question with
            # only its correct answer could never be sent.
            await update.message.reply_text("请至少填写一个错误答案后再使用 /finish 结束")
            return self.GET_NEW_WRONG_ANSWER
        reply_text = f"问题:`{escape_markdown(quiz_command_data.new_question, version=2)}`\n" \
                     f"正确答案:`{escape_markdown(quiz_command_data.new_correct_answer, version=2)}`\n" \
                     f"错误答案:`{escape_markdown(' '.join(quiz_command_data.new_wrong_answer), version=2)}`"
        await update.message.reply_markdown_v2(reply_text)
        reply_keyboard = [["保存并重载配置", "抛弃修改并退出"]]
        await update.message.reply_text("请核对问题,并选择下一步操作。", reply_markup=ReplyKeyboardMarkup(reply_keyboard))
        return self.SAVE_QUESTION

    async def save_question(self, update: Update, context: CallbackContext) -> int:
        """Persist the draft question and reload the quiz data, or discard it.

        :return: ``ConversationHandler.END`` after saving or discarding,
            ``SAVE_QUESTION`` when the reply is not one of the two choices.
        """
        quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
        choice = update.message.text
        if choice == "抛弃修改并退出":
            await update.message.reply_text("退出任务", reply_markup=ReplyKeyboardRemove())
            return ConversationHandler.END
        if choice != "保存并重载配置":
            await update.message.reply_text("回复错误,请重新选择")
            return self.SAVE_QUESTION
        if quiz_command_data.status != 1:
            # Only "add" (status 1) is implemented; stay in the current state.
            return self.SAVE_QUESTION
        answer = [
            AnswerData(answer=wrong_answer, is_correct=False)
            for wrong_answer in quiz_command_data.new_wrong_answer
        ]
        answer.append(AnswerData(answer=quiz_command_data.new_correct_answer, is_correct=True))
        await self.service.quiz_service.save_quiz(
            QuestionData(question=quiz_command_data.new_question, answer=answer))
        await update.message.reply_text("保存成功", reply_markup=ReplyKeyboardRemove())
        try:
            await self.service.quiz_service.refresh_quiz()
        except DataError:
            # Same handling as refresh_question, so the conversation ends
            # cleanly instead of the exception escaping the handler.
            await update.message.reply_text("Redis数据错误重载失败", reply_markup=ReplyKeyboardRemove())
            return ConversationHandler.END
        except ResponseError as error:
            Log.error("重载问题失败", error)
            await update.message.reply_text("重载问题失败异常抛出Redis请求错误异常详情错误请看日记",
                                            reply_markup=ReplyKeyboardRemove())
            return ConversationHandler.END
        await update.message.reply_text("重载配置成功", reply_markup=ReplyKeyboardRemove())
        return ConversationHandler.END

    async def edit_question(self, update: Update, context: CallbackContext) -> int:
        """Reset the draft, mark it as an "edit" (status 2) and ask for the new text.

        :return: ``GET_NEW_QUESTION``.
        """
        quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
        quiz_command_data.new_wrong_answer = []
        quiz_command_data.new_question = ""
        quiz_command_data.new_correct_answer = ""
        quiz_command_data.status = 2
        await update.message.reply_text("请回复你要修改的问题", reply_markup=ReplyKeyboardRemove())
        return self.GET_NEW_QUESTION

    async def delete_question(self, update: Update, context: CallbackContext) -> int:
        """Delete the selected question with its answers, then reload the quiz data.

        :return: ``ConversationHandler.END`` in every case.
        """
        quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
        if quiz_command_data.question_id < 0:
            # -1 is the "nothing selected" default; "删除问题" can be typed at
            # the first menu before any question has been chosen.
            await update.message.reply_text("请先选择要删除的问题", reply_markup=ReplyKeyboardRemove())
            return ConversationHandler.END
        try:
            # Reload the questions into Redis first so that an empty Redis
            # cache does not cause a crash.
            await self.service.quiz_service.refresh_quiz()
            question = await self.service.quiz_service.get_question(quiz_command_data.question_id)
            # Answers reference the question through a foreign key, so they
            # have to be deleted before the question itself.
            for answer_id in question["answer_id"]:
                await self.service.repository.delete_answer(answer_id)
            await self.service.repository.delete_question(question["question_id"])
            # Forget the ID so a later "删除问题" cannot target the deleted row.
            quiz_command_data.question_id = -1
            await update.message.reply_text("删除问题成功", reply_markup=ReplyKeyboardRemove())
            await self.service.quiz_service.refresh_quiz()
        except DataError:
            # Same handling as refresh_question.
            await update.message.reply_text("Redis数据错误重载失败", reply_markup=ReplyKeyboardRemove())
            return ConversationHandler.END
        except ResponseError as error:
            Log.error("重载问题失败", error)
            await update.message.reply_text("重载问题失败异常抛出Redis请求错误异常详情错误请看日记",
                                            reply_markup=ReplyKeyboardRemove())
            return ConversationHandler.END
        await update.message.reply_text("重载配置成功", reply_markup=ReplyKeyboardRemove())
        return ConversationHandler.END