mirror of
https://github.com/PaiGramTeam/PamGram.git
synced 2024-11-16 20:10:36 +00:00
49 lines
1.6 KiB
Python
49 lines
1.6 KiB
Python
from typing import cast
|
|
|
|
from sqlalchemy import select
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
|
|
|
from core.base.mysql import MySQL
|
|
from .error import UserNotFoundError
|
|
from .models import User
|
|
|
|
|
|
class UserRepository:
|
|
def __init__(self, mysql: MySQL):
|
|
self.mysql = mysql
|
|
|
|
async def get_by_user_id(self, user_id: int) -> User:
|
|
async with self.mysql.Session() as session:
|
|
session = cast(AsyncSession, session)
|
|
statement = select(User).where(User.user_id == user_id)
|
|
results = await session.exec(statement)
|
|
if user := results.first():
|
|
return user[0]
|
|
else:
|
|
raise UserNotFoundError(user_id)
|
|
|
|
async def update_user(self, user: User):
|
|
async with self.mysql.Session() as session:
|
|
session = cast(AsyncSession, session)
|
|
session.add(user)
|
|
await session.commit()
|
|
await session.refresh(user)
|
|
|
|
async def add_user(self, user: User):
|
|
async with self.mysql.Session() as session:
|
|
session = cast(AsyncSession, session)
|
|
session.add(user)
|
|
await session.commit()
|
|
|
|
async def del_user_by_id(self, user_id):
|
|
async with self.mysql.Session() as session:
|
|
session = cast(AsyncSession, session)
|
|
statement = select(User).where(User.user_id == user_id)
|
|
results = await session.execute(statement)
|
|
user = results.unique().scalar_one()
|
|
if user:
|
|
await session.delete(user)
|
|
await session.commit()
|
|
else:
|
|
raise UserNotFoundError(user_id)
|