PaiGram/core/services/quiz/repositories.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

58 lines
2.2 KiB
Python
Raw Permalink Normal View History

2022-07-26 10:07:31 +00:00
from typing import List
from sqlmodel import select
2023-10-25 09:23:01 +00:00
from sqlmodel.ext.asyncio.session import AsyncSession
from core.base_service import BaseService
2023-03-25 03:17:38 +00:00
from core.dependence.database import Database
from core.services.quiz.models import AnswerDB, QuestionDB
2022-07-26 10:07:31 +00:00
# Names exported by `from ... import *`: only the repository class.
__all__ = ("QuizRepository",)
2022-07-26 10:07:31 +00:00
class QuizRepository(BaseService.Component):
    """Data-access component for quiz questions and their answers.

    Each method opens its own ``AsyncSession`` on the engine taken from the
    injected ``Database`` and closes it before returning.
    """

    def __init__(self, database: Database):
        """Store the engine exposed by *database* for later sessions."""
        self.engine = database.engine

    async def get_question_list(self) -> List[QuestionDB]:
        """Return every ``QuestionDB`` row."""
        async with AsyncSession(self.engine) as session:
            query = select(QuestionDB)
            results = await session.exec(query)
            return results.all()

    async def get_answers_from_question_id(self, question_id: int) -> List[AnswerDB]:
        """Return all ``AnswerDB`` rows whose ``question_id`` equals *question_id*."""
        async with AsyncSession(self.engine) as session:
            query = select(AnswerDB).where(AnswerDB.question_id == question_id)
            results = await session.exec(query)
            return results.all()

    async def add_question(self, question: QuestionDB):
        """Add *question* to a new session and commit it."""
        async with AsyncSession(self.engine) as session:
            session.add(question)
            await session.commit()

    async def get_question_by_text(self, text: str) -> QuestionDB:
        """Return the first ``QuestionDB`` whose ``text`` equals *text*.

        The result comes from ``.first()``, so callers should be prepared
        for ``None`` when no row matches.
        """
        async with AsyncSession(self.engine) as session:
            query = select(QuestionDB).where(QuestionDB.text == text)
            results = await session.exec(query)
            return results.first()

    async def add_answer(self, answer: AnswerDB):
        """Add *answer* to a new session and commit it."""
        async with AsyncSession(self.engine) as session:
            session.add(answer)
            await session.commit()

    async def delete_question_by_id(self, question_id: int):
        """Delete the ``QuestionDB`` row whose ``id`` equals *question_id*.

        The row is fetched with ``.one()``, which raises when the query does
        not match exactly one row.
        """
        async with AsyncSession(self.engine) as session:
            statement = select(QuestionDB).where(QuestionDB.id == question_id)
            results = await session.exec(statement)
            question = results.one()
            await session.delete(question)
            # Without an explicit commit the pending delete is discarded
            # when the session is closed on leaving the ``async with``.
            await session.commit()

    async def delete_answer_by_id(self, answer_id: int):
        """Delete the ``AnswerDB`` row whose ``id`` equals *answer_id*.

        The row is fetched with ``.one()``, which raises when the query does
        not match exactly one row.
        """
        async with AsyncSession(self.engine) as session:
            statement = select(AnswerDB).where(AnswerDB.id == answer_id)
            results = await session.exec(statement)
            answer = results.one()
            await session.delete(answer)
            # Without an explicit commit the pending delete is discarded
            # when the session is closed on leaving the ``async with``.
            await session.commit()