GramCore/services/history_data/repositories.py

68 lines
2.8 KiB
Python

from typing import List, Optional
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from gram_core.base_service import BaseService
from gram_core.dependence.database import Database
from gram_core.services.history_data.models import HistoryData
__all__ = ("HistoryDataRepository",)
class HistoryDataRepository(BaseService.Component):
def __init__(self, database: Database):
self.engine = database.engine
async def add(self, data: HistoryData):
async with AsyncSession(self.engine) as session:
session.add(data)
await session.commit()
async def remove(self, data: HistoryData):
async with AsyncSession(self.engine) as session:
await session.delete(data)
await session.commit()
async def update(self, data: HistoryData) -> HistoryData:
async with AsyncSession(self.engine) as session:
session.add(data)
await session.commit()
await session.refresh(data)
return data
async def get_by_id(self, row_id: int) -> Optional[HistoryData]:
async with AsyncSession(self.engine) as session:
statement = select(HistoryData).where(HistoryData.id == row_id)
result = await session.exec(statement)
return result.first()
async def get_by_user_id(self, user_id: int, data_type: int) -> List[Optional[HistoryData]]:
async with AsyncSession(self.engine) as session:
statement = select(HistoryData).where(HistoryData.user_id == user_id).where(HistoryData.type == data_type)
results = await session.exec(statement)
return results.all()
async def get_by_user_id_data_id(self, user_id: int, data_type: int, data_id: int) -> List[Optional[HistoryData]]:
async with AsyncSession(self.engine) as session:
statement = (
select(HistoryData)
.where(HistoryData.user_id == user_id)
.where(HistoryData.type == data_type)
.where(HistoryData.data_id == data_id)
)
results = await session.exec(statement)
return results.all()
async def get_all(self, data_type: int) -> List[HistoryData]:
async with AsyncSession(self.engine) as session:
query = select(HistoryData).where(HistoryData.type == data_type)
results = await session.exec(query)
return results.all()
async def get_all_by_user_id(self, data_type: int, user_id: int) -> List[HistoryData]:
async with AsyncSession(self.engine) as session:
query = select(HistoryData).where(HistoryData.type == data_type).where(HistoryData.user_id == user_id)
results = await session.exec(query)
return results.all()