PaiGram/plugins/quiz.py

247 lines
14 KiB
Python
Raw Normal View History

2022-04-14 07:18:45 +00:00
import random
import re
import time
from typing import List
from numpy.random import MT19937, Generator
from redis import DataError, ResponseError
2022-04-14 07:18:45 +00:00
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardMarkup, Poll, ReplyKeyboardRemove
from telegram.ext import CallbackContext, filters, ConversationHandler
from telegram.helpers import escape_markdown
from logger import Log
2022-05-17 12:54:45 +00:00
from plugins.base import BasePlugins
2022-04-14 07:18:45 +00:00
from service import BaseService
from service.base import QuestionData, AnswerData
class QuizCommandData:
question_id: int = -1
new_question: str = ""
new_correct_answer: str = ""
new_wrong_answer: List[str] = []
status: int = 0
2022-05-17 12:54:45 +00:00
class Quiz(BasePlugins):
2022-04-14 07:18:45 +00:00
CHECK_COMMAND, VIEW_COMMAND, CHECK_QUESTION, \
GET_NEW_QUESTION, GET_NEW_CORRECT_ANSWER, GET_NEW_WRONG_ANSWER, \
QUESTION_EDIT, SAVE_QUESTION = range(10300, 10308)
def __init__(self, service: BaseService):
2022-05-17 12:54:45 +00:00
super().__init__(service)
2022-04-14 07:18:45 +00:00
self.send_time = time.time()
self.generator = Generator(MT19937(int(self.send_time)))
self.service = service
self.time_out = 120
def random(self, low: int, high: int) -> int:
if self.send_time + 24 * 60 * 60 >= time.time():
self.send_time = time.time()
self.generator = Generator(MT19937(int(self.send_time)))
return int(self.generator.uniform(low, high))
async def command_start(self, update: Update, context: CallbackContext) -> int:
user = update.effective_user
if filters.ChatType.PRIVATE.filter(update.message):
admin_list = await self.service.admin.get_admin_list()
if user.id in admin_list:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
if quiz_command_data is None:
quiz_command_data = QuizCommandData()
context.chat_data["quiz_command_data"] = quiz_command_data
message = f'你好 {user.mention_markdown_v2()} {escape_markdown("!请选择你的操作!")}'
reply_keyboard = [
["查看问题", "添加问题"],
["重载问题"],
["退出"]
]
await update.message.reply_markdown_v2(message,
reply_markup=ReplyKeyboardMarkup(reply_keyboard,
one_time_keyboard=True))
return self.CHECK_COMMAND
if filters.ChatType.GROUPS.filter(update.message):
question_id_list = await self.service.quiz_service.get_question_id_list()
if len(question_id_list) == 0:
await update.message.reply_text(f"旅行者!!!派蒙的问题清单你还没给我!!快去私聊我给我问题!")
return ConversationHandler.END
index = self.random(0, len(question_id_list))
question = await self.service.quiz_service.get_question(question_id_list[index])
options = []
correct_option = ""
for answer_id in question["answer_id"]:
answer = await self.service.quiz_service.get_answer(answer_id)
options.append(answer["answer"])
if answer["is_correct"] == 1:
correct_option = answer["answer"]
2022-05-17 12:46:53 +00:00
random.shuffle(options)
2022-04-14 07:18:45 +00:00
index = options.index(correct_option)
2022-05-17 12:54:45 +00:00
poll_message = await update.effective_message.reply_poll(question["question"], options,
correct_option_id=index, is_anonymous=False,
open_period=self.time_out, type=Poll.QUIZ)
2022-05-17 12:55:40 +00:00
self._add_delete_message_job(context, update.message.chat_id, update.message.message_id, 300)
self._add_delete_message_job(context, poll_message.chat_id, poll_message.message_id, 300)
2022-04-14 07:18:45 +00:00
return ConversationHandler.END
return ConversationHandler.END
async def view_command(self, update: Update, context: CallbackContext) -> int:
keyboard = [
[
InlineKeyboardButton(text="选择问题", switch_inline_query_current_chat="查看问题")
]
]
await update.message.reply_text("请回复你要查看的问题",
reply_markup=InlineKeyboardMarkup(keyboard))
return self.CHECK_COMMAND
async def check_question(self, update: Update, context: CallbackContext) -> int:
reply_keyboard = [
["删除问题"],
["退出"]
]
await update.message.reply_text("请选择你的操作", reply_markup=ReplyKeyboardMarkup(reply_keyboard))
return self.CHECK_COMMAND
async def check_command(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
if update.message.text == "退出":
await update.message.reply_text("退出任务", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
elif update.message.text == "查看问题":
return await self.view_command(update, context)
elif update.message.text == "添加问题":
return await self.add_question(update, context)
elif update.message.text == "删除问题":
return await self.delete_question(update, context)
# elif update.message.text == "修改问题":
# return await self.edit_question(update, context)
elif update.message.text == "重载问题":
return await self.refresh_question(update, context)
else:
result = re.findall(r"问题ID (\d+)", update.message.text)
if len(result) == 1:
try:
question_id = int(result[0])
except ValueError:
await update.message.reply_text("获取问题ID失败")
return ConversationHandler.END
quiz_command_data.question_id = question_id
await update.message.reply_text("获取问题ID成功")
return await self.check_question(update, context)
await update.message.reply_text("命令错误", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
async def refresh_question(self, update: Update, context: CallbackContext) -> int:
try:
await self.service.quiz_service.refresh_quiz()
except DataError:
await update.message.reply_text("Redis数据错误重载失败", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
except ResponseError as error:
Log.error("重载问题失败 /n", error)
await update.message.reply_text("重载问题失败异常抛出Redis请求错误异常详情错误请看日记",
reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
2022-04-14 07:18:45 +00:00
await update.message.reply_text("重载成功", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
async def add_question(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
quiz_command_data.new_wrong_answer = []
quiz_command_data.new_question = ""
quiz_command_data.new_correct_answer = ""
quiz_command_data.status = 1
await update.message.reply_text("请回复你要添加的问题,或发送 /cancel 取消操作", reply_markup=ReplyKeyboardRemove())
2022-04-14 07:18:45 +00:00
return self.GET_NEW_QUESTION
async def get_new_question(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
reply_text = f"问题:`{escape_markdown(update.message.text, version=2)}`\n" \
f"请填写正确答案:"
quiz_command_data.new_question = update.message.text
await update.message.reply_markdown_v2(reply_text)
return self.GET_NEW_CORRECT_ANSWER
async def get_new_correct_answer(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
reply_text = f"正确答案:`{escape_markdown(update.message.text, version=2)}`\n" \
f"请填写错误答案:"
await update.message.reply_markdown_v2(reply_text)
quiz_command_data.new_correct_answer = update.message.text
return self.GET_NEW_WRONG_ANSWER
async def get_new_wrong_answer(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
reply_text = f"错误答案:`{escape_markdown(update.message.text, version=2)}`\n" \
f"可继续填写,并使用 {escape_markdown('/finish', version=2)} 结束。"
await update.message.reply_markdown_v2(reply_text)
quiz_command_data.new_wrong_answer.append(update.message.text)
return self.GET_NEW_WRONG_ANSWER
async def finish_edit(self, update: Update, context: CallbackContext):
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
reply_text = f"问题:`{escape_markdown(quiz_command_data.new_question, version=2)}`\n" \
f"正确答案:`{escape_markdown(quiz_command_data.new_correct_answer, version=2)}`\n" \
f"错误答案:`{escape_markdown(' '.join(quiz_command_data.new_wrong_answer), version=2)}`"
await update.message.reply_markdown_v2(reply_text)
2022-04-20 12:38:34 +00:00
reply_keyboard = [["保存并重载配置", "抛弃修改并退出"]]
2022-04-14 07:18:45 +00:00
await update.message.reply_text("请核对问题,并选择下一步操作。", reply_markup=ReplyKeyboardMarkup(reply_keyboard))
return self.SAVE_QUESTION
async def save_question(self, update: Update, context: CallbackContext):
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
2022-04-20 07:52:15 +00:00
if update.message.text == "抛弃修改并退出":
2022-04-14 07:18:45 +00:00
await update.message.reply_text("退出任务", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
elif update.message.text == "保存并重载配置":
if quiz_command_data.status == 1:
answer = [
AnswerData(answer=wrong_answer, is_correct=False) for wrong_answer in
quiz_command_data.new_wrong_answer
]
answer.append(AnswerData(answer=quiz_command_data.new_correct_answer, is_correct=True))
await self.service.quiz_service.save_quiz(
QuestionData(question=quiz_command_data.new_question, answer=answer))
await update.message.reply_text("保存成功", reply_markup=ReplyKeyboardRemove())
try:
await self.service.quiz_service.refresh_quiz()
except ResponseError as error:
Log.error("重载问题失败 /n", error)
await update.message.reply_text("重载问题失败异常抛出Redis请求错误异常详情错误请看日记",
reply_markup=ReplyKeyboardRemove())
2022-04-20 12:38:34 +00:00
return ConversationHandler.END
2022-04-14 07:18:45 +00:00
await update.message.reply_text("重载配置成功", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
else:
await update.message.reply_text("回复错误,请重新选择")
return self.SAVE_QUESTION
async def edit_question(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
quiz_command_data.new_wrong_answer = []
quiz_command_data.new_question = ""
quiz_command_data.new_correct_answer = ""
quiz_command_data.status = 2
await update.message.reply_text("请回复你要修改的问题", reply_markup=ReplyKeyboardRemove())
return self.GET_NEW_QUESTION
async def delete_question(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
# 再问题重载Redis 以免redis数据为空时出现奔溃
2022-04-20 12:38:34 +00:00
try:
await self.service.quiz_service.refresh_quiz()
question = await self.service.quiz_service.get_question(quiz_command_data.question_id)
# 因为外键的存在,先删除答案
for answer_id in question["answer_id"]:
await self.service.repository.delete_answer(answer_id)
await self.service.repository.delete_question(question["question_id"])
await update.message.reply_text("删除问题成功", reply_markup=ReplyKeyboardRemove())
await self.service.quiz_service.refresh_quiz()
except ResponseError as error:
Log.error("重载问题失败 /n", error)
await update.message.reply_text("重载问题失败异常抛出Redis请求错误异常详情错误请看日记",
reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
2022-04-14 07:18:45 +00:00
await update.message.reply_text("重载配置成功", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END