mirror of
https://github.com/PaiGramTeam/GramCore.git
synced 2024-11-21 13:48:20 +00:00
89 lines
3.1 KiB
Python
89 lines
3.1 KiB
Python
import contextlib
|
|
import datetime
|
|
from typing import List
|
|
|
|
from sqlalchemy.orm.exc import StaleDataError
|
|
|
|
from gram_core.services.history_data.models import HistoryData
|
|
from gram_core.services.history_data.repositories import HistoryDataRepository
|
|
|
|
__all__ = [
|
|
"HistoryDataBaseServices",
|
|
]
|
|
|
|
|
|
class HistoryDataBaseServices:
|
|
DATA_TYPE: int = 0
|
|
|
|
def __init__(self, task_repository: HistoryDataRepository) -> None:
|
|
self._repository: HistoryDataRepository = task_repository
|
|
|
|
async def add(self, data: HistoryData):
|
|
return await self._repository.add(data)
|
|
|
|
async def remove(self, data: HistoryData):
|
|
return await self._repository.remove(data)
|
|
|
|
async def update(self, data: HistoryData):
|
|
data.time_updated = datetime.datetime.now()
|
|
return await self._repository.update(data)
|
|
|
|
async def get_by_id(self, row_id: int):
|
|
return await self._repository.get_by_id(row_id)
|
|
|
|
async def get_by_user_id(self, user_id: int):
|
|
return await self._repository.get_by_user_id(user_id, self.DATA_TYPE)
|
|
|
|
async def get_by_user_id_data_id(self, user_id: int, data_id: int):
|
|
return await self._repository.get_by_user_id_data_id(user_id, self.DATA_TYPE, data_id)
|
|
|
|
async def get_all(self):
|
|
return await self._repository.get_all(self.DATA_TYPE)
|
|
|
|
async def get_all_by_user_id(self, user_id: int):
|
|
return await self._repository.get_all_by_user_id(self.DATA_TYPE, user_id)
|
|
|
|
@staticmethod
|
|
def exists_data(data: HistoryData, old_data: List[HistoryData]) -> bool:
|
|
return False
|
|
|
|
async def remove_same_data_by_data(self, data_list: List[HistoryData]) -> int:
|
|
data_list_len = len(data_list)
|
|
need_delete = []
|
|
for i in range(data_list_len):
|
|
new_data = data_list[i]
|
|
for j in range(i + 1, data_list_len):
|
|
old_data = data_list[j]
|
|
if self.exists_data(new_data, [old_data]) and old_data not in need_delete:
|
|
need_delete.append(old_data)
|
|
if need_delete:
|
|
await self.try_delete(need_delete)
|
|
return len(need_delete)
|
|
|
|
async def try_delete(self, data_list: List[HistoryData]):
|
|
for data in data_list:
|
|
with contextlib.suppress(StaleDataError):
|
|
await self.remove(data)
|
|
|
|
async def remove_same_data_by_user(self, user_id: int) -> int:
|
|
all_data = await self.get_all_by_user_id(user_id)
|
|
# group by data_id
|
|
all_data_map = {}
|
|
for data in all_data:
|
|
if data.data_id not in all_data_map:
|
|
all_data_map[data.data_id] = []
|
|
all_data_map[data.data_id].append(data)
|
|
num = 0
|
|
for _, data_list in all_data_map.items():
|
|
if len(data_list) > 1:
|
|
num += await self.remove_same_data_by_data(data_list)
|
|
return num
|
|
|
|
async def remove_same_data(self) -> int:
|
|
all_data = await self.get_all()
|
|
user_ids = list(set([i.user_id for i in all_data]))
|
|
num = 0
|
|
for user_id in user_ids:
|
|
num += await self.remove_same_data_by_user(user_id)
|
|
return num
|