From 01b576f9f5d4d4561e3b86f4696c3dda6bf76910 Mon Sep 17 00:00:00 2001 From: omg-xtao <100690902+omg-xtao@users.noreply.github.com> Date: Sat, 16 Dec 2023 18:01:27 +0800 Subject: [PATCH] :sparkles: Support migrate user data --- .devcontainer/devcontainer.json | 26 ++++++ core/services/players/services.py | 6 ++ gram_core | 2 +- modules/gacha_log/log.py | 17 ++++ modules/gacha_log/migrate.py | 50 +++++++++++ modules/pay_log/log.py | 17 ++++ modules/pay_log/migrate.py | 50 +++++++++++ plugins/account/account.py | 6 ++ plugins/account/migrate.py | 137 ++++++++++++++++++++++++++++++ plugins/admin/migrate.py | 122 ++++++++++++++++++++++++++ plugins/genshin/pay_log.py | 14 ++- plugins/genshin/wish_log.py | 10 ++- plugins/tools/daily_note.py | 64 ++++++++++++++ 13 files changed, 516 insertions(+), 5 deletions(-) create mode 100644 .devcontainer/devcontainer.json create mode 100644 modules/gacha_log/migrate.py create mode 100644 modules/pay_log/migrate.py create mode 100644 plugins/account/migrate.py create mode 100644 plugins/admin/migrate.py diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..9361826 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,26 @@ +// For format details, see https://aka.ms/devcontainer.json. For config options, see the +// README at: https://github.com/devcontainers/templates/tree/main/src/python +{ + "name": "Python 3", + // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile + "image": "mcr.microsoft.com/devcontainers/python:1-3.12-bookworm", + "features": { + "ghcr.io/devcontainers-contrib/features/pdm:2": {}, + "ghcr.io/devcontainers-contrib/features/poetry:2": {} + }, + + // Features to add to the dev container. More info: https://containers.dev/features. + // "features": {}, + + // Use 'forwardPorts' to make a list of ports inside the container available locally. + // "forwardPorts": [], + + // Use 'postCreateCommand' to run commands after the container is created. + "postCreateCommand": "poetry install --extras all" + + // Configure tool-specific properties. + // "customizations": {}, + + // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root. + // "remoteUser": "root" +} diff --git a/core/services/players/services.py b/core/services/players/services.py index 4d4b7f3..e0c3986 100644 --- a/core/services/players/services.py +++ b/core/services/players/services.py @@ -151,3 +151,9 @@ class PlayerInfoService(BaseService): async def delete(self, player_info: PlayerInfoSQLModel): await self._players_info_repository.delete(player_info) + + async def update(self, player_info: PlayerInfoSQLModel): + await self._players_info_repository.update(player_info) + + async def get_all_by_user_id(self, user_id: int): + return await self._players_info_repository.get_all_by_user_id(user_id) diff --git a/gram_core b/gram_core index 9dd1aca..4718860 160000 --- a/gram_core +++ b/gram_core @@ -1 +1 @@ -Subproject commit 9dd1acab7842fb2c7cfd5ca507de93778318dc82 +Subproject commit 4718860a87e5b64eb59966f7122716fd70ece8f0 diff --git a/modules/gacha_log/log.py b/modules/gacha_log/log.py index 432afbb..a894ba6 100644 --- a/modules/gacha_log/log.py +++ b/modules/gacha_log/log.py @@ -107,6 +107,23 @@ class GachaLog: return True return False + async def move_history_info(self, user_id: str, uid: str, new_user_id: str) -> bool: + """移动历史抽卡记录数据 + :param user_id: 用户id + :param uid: 原神uid + :param new_user_id: 新用户id + :return: 是否移动成功 + """ + old_file_path = self.gacha_log_path / f"{user_id}-{uid}.json" + new_file_path = self.gacha_log_path / f"{new_user_id}-{uid}.json" + if (not old_file_path.exists()) or new_file_path.exists(): + return False + try: + old_file_path.rename(new_file_path) + return True + except PermissionError: + return False + async def save_gacha_log_info(self, user_id: str, uid: str, info: GachaLogInfo): """保存抽卡记录数据 :param user_id: 用户id diff --git a/modules/gacha_log/migrate.py b/modules/gacha_log/migrate.py new file mode 100644 index 0000000..7cfea8e --- /dev/null +++ b/modules/gacha_log/migrate.py @@ -0,0 +1,50 @@ +from typing import Optional, List, TYPE_CHECKING + +from gram_core.plugin.methods.migrate_data import IMigrateData, MigrateDataException +from modules.gacha_log.log import GachaLog + +if TYPE_CHECKING: + from gram_core.services.players.models import Player + + +class GachaLogMigrate(IMigrateData, GachaLog): + old_user_id: int + new_user_id: int + old_uid_list: List[int] + + async def migrate_data_msg(self) -> str: + return f"{len(self.old_uid_list)} 个账号的抽卡记录数据" + + async def migrate_data(self) -> bool: + uid_list = [] + for uid in self.old_uid_list: + if not await self.move_history_info(str(self.old_user_id), str(uid), str(self.new_user_id)): + uid_list.append(str(uid)) + if uid_list: + raise MigrateDataException(f"抽卡记录数据迁移失败:uid {','.join(uid_list)}") + return True + + @classmethod + async def create( + cls, + old_user_id: int, + new_user_id: int, + players: List["Player"], + ) -> Optional["GachaLogMigrate"]: + if not players: + return None + _uid_list = [player.player_id for player in players if player and player.player_id] + if not _uid_list: + return None + self = cls() + old_uid_list = [] + for uid in _uid_list: + _, status = await self.load_history_info(str(old_user_id), str(uid), True) + if status: + old_uid_list.append(uid) + if not old_uid_list: + return None + self.old_user_id = old_user_id + self.new_user_id = new_user_id + self.old_uid_list = old_uid_list + return self diff --git a/modules/pay_log/log.py b/modules/pay_log/log.py index 6e3b466..b684185 100644 --- a/modules/pay_log/log.py +++ b/modules/pay_log/log.py @@ -96,6 +96,23 @@ class PayLog: return True return False + async def move_history_info(self, user_id: str, uid: str, new_user_id: str) -> bool: + """移动历史充值记录数据 + :param user_id: 用户id + :param uid: 原神uid + :param new_user_id: 新用户id + :return: 是否移动成功 + """ + old_file_path = self.get_file_path(user_id, uid) + new_file_path = self.get_file_path(new_user_id, uid) + if (not old_file_path.exists()) or new_file_path.exists(): + return False + try: + old_file_path.rename(new_file_path) + return True + except PermissionError: + return False + async def save_pay_log_info(self, user_id: str, uid: str, info: PayLogModel) -> None: """保存日志记录数据 :param user_id: 用户id diff --git a/modules/pay_log/migrate.py b/modules/pay_log/migrate.py new file mode 100644 index 0000000..32874c2 --- /dev/null +++ b/modules/pay_log/migrate.py @@ -0,0 +1,50 @@ +from typing import Optional, List, TYPE_CHECKING + +from gram_core.plugin.methods.migrate_data import IMigrateData, MigrateDataException +from modules.pay_log.log import PayLog + +if TYPE_CHECKING: + from gram_core.services.players.models import Player + + +class PayLogMigrate(IMigrateData, PayLog): + old_user_id: int + new_user_id: int + old_uid_list: List[int] + + async def migrate_data_msg(self) -> str: + return f"{len(self.old_uid_list)} 个账号的充值记录数据" + + async def migrate_data(self) -> bool: + uid_list = [] + for uid in self.old_uid_list: + if not await self.move_history_info(str(self.old_user_id), str(uid), str(self.new_user_id)): + uid_list.append(str(uid)) + if uid_list: + raise MigrateDataException(f"充值记录数据迁移失败:uid {','.join(uid_list)}") + return True + + @classmethod + async def create( + cls, + old_user_id: int, + new_user_id: int, + players: List["Player"], + ) -> Optional["PayLogMigrate"]: + if not players: + return None + _uid_list = [player.player_id for player in players if player and player.player_id] + if not _uid_list: + return None + self = cls() + old_uid_list = [] + for uid in _uid_list: + _, status = await self.load_history_info(str(old_user_id), str(uid), True) + if status: + old_uid_list.append(uid) + if not old_uid_list: + return None + self.old_user_id = old_user_id + self.new_user_id = new_user_id + self.old_uid_list = old_uid_list + return self diff --git a/plugins/account/account.py b/plugins/account/account.py index a9b8232..eeb8b13 100644 --- a/plugins/account/account.py +++ b/plugins/account/account.py @@ -19,6 +19,7 @@ from core.services.cookies.error import TooManyRequestPublicCookies, CookiesCach from core.services.cookies.services import CookiesService, PublicCookiesService from core.services.players.models import PlayersDataBase as Player, PlayerInfoSQLModel from core.services.players.services import PlayersService, PlayerInfoService +from plugins.account.migrate import AccountMigrate from plugins.tools.genshin import GenshinHelper from utils.log import logger @@ -292,3 +293,8 @@ class BindAccountPlugin(Plugin.Conversation): return ConversationHandler.END await message.reply_text("回复错误,请重新输入") return COMMAND_RESULT + + async def get_migrate_data(self, old_user_id: int, new_user_id: int, _) -> Optional[AccountMigrate]: + return await AccountMigrate.create( + old_user_id, new_user_id, self.players_service, self.player_info_service, self.cookies_service + ) diff --git a/plugins/account/migrate.py b/plugins/account/migrate.py new file mode 100644 index 0000000..9b5d014 --- /dev/null +++ b/plugins/account/migrate.py @@ -0,0 +1,137 @@ +from typing import Optional, List + +from sqlalchemy.orm.exc import StaleDataError + +from core.services.players.services import PlayerInfoService +from gram_core.plugin.methods.migrate_data import IMigrateData, MigrateDataException +from gram_core.services.cookies import CookiesService +from gram_core.services.cookies.models import CookiesDataBase as Cookies +from gram_core.services.players import PlayersService +from gram_core.services.players.models import PlayersDataBase as Player, PlayerInfoSQLModel as PlayerInfo + + +class AccountMigrate(IMigrateData): + old_user_id: int + new_user_id: int + players_service: PlayersService + player_info_service: PlayerInfoService + cookies_service: CookiesService + need_migrate_player: List[Player] + need_migrate_player_info: List[PlayerInfo] + need_migrate_cookies: List[Cookies] + + async def migrate_data_msg(self) -> str: + text = [] + if self.need_migrate_player: + text.append(f"player 数据 {len(self.need_migrate_player)} 条") + if self.need_migrate_player_info: + text.append(f"player_info 数据 {len(self.need_migrate_player_info)} 条") + if self.need_migrate_cookies: + text.append(f"cookies 数据 {len(self.need_migrate_cookies)} 条") + return "、".join(text) + + async def migrate_data(self) -> bool: + players, players_info, cookies = [], [], [] + for player in self.need_migrate_player: + try: + await self.players_service.update(player) + except StaleDataError: + players.append(str(player.player_id)) + for player_info in self.need_migrate_player_info: + try: + await self.player_info_service.update(player_info) + except StaleDataError: + players_info.append(str(player_info.player_id)) + for cookie in self.need_migrate_cookies: + try: + await self.cookies_service.update(cookie) + except StaleDataError: + cookies.append(str(cookie.account_id)) + if any([players, players_info, cookies]): + text = [] + if players: + text.append(f"player 数据迁移失败 player_id {','.join(players)}") + if players_info: + text.append(f"player_info 数据迁移失败 player_id {','.join(players_info)}") + if cookies: + text.append(f"cookies 数据迁移失败 account_id {','.join(cookies)}") + raise MigrateDataException("、".join(text)) + return True + + @staticmethod + async def create_players( + old_user_id: int, + new_user_id: int, + players_service: PlayersService, + ) -> List[Player]: + need_migrate, new_data = await AccountMigrate.filter_sql_data( + Player, + players_service.get_all_by_user_id, + old_user_id, + new_user_id, + (Player.account_id, Player.player_id, Player.region), + ) + for i in need_migrate: + i.user_id = new_user_id + if new_data: + i.is_chosen = False + return need_migrate + + @staticmethod + async def create_players_info( + old_user_id: int, + new_user_id: int, + player_info_service: PlayerInfoService, + ) -> List[PlayerInfo]: + need_migrate, _ = await AccountMigrate.filter_sql_data( + PlayerInfo, + player_info_service.get_all_by_user_id, + old_user_id, + new_user_id, + (PlayerInfo.user_id, PlayerInfo.player_id), + ) + for i in need_migrate: + i.user_id = new_user_id + return need_migrate + + @staticmethod + async def create_cookies( + old_user_id: int, + new_user_id: int, + cookies_service: CookiesService, + ) -> List[Cookies]: + need_migrate, _ = await AccountMigrate.filter_sql_data( + Cookies, + cookies_service.get_all, + old_user_id, + new_user_id, + (Cookies.user_id, Cookies.account_id, Cookies.region), + ) + for i in need_migrate: + i.user_id = new_user_id + return need_migrate + + @classmethod + async def create( + cls, + old_user_id: int, + new_user_id: int, + players_service: PlayersService, + player_info_service: PlayerInfoService, + cookies_service: CookiesService, + ) -> Optional["AccountMigrate"]: + need_migrate_player = await cls.create_players(old_user_id, new_user_id, players_service) + need_migrate_player_info = await cls.create_players_info(old_user_id, new_user_id, player_info_service) + need_migrate_cookies = await cls.create_cookies(old_user_id, new_user_id, cookies_service) + if not any([need_migrate_player, need_migrate_player_info, need_migrate_cookies]): + return None + self = cls() + self.old_user_id = old_user_id + self.new_user_id = new_user_id + self.players_service = players_service + self.player_info_service = player_info_service + self.cookies_service = cookies_service + self.need_migrate_player = need_migrate_player + self.need_migrate_player_info = need_migrate_player_info + self.need_migrate_cookies = need_migrate_cookies + return self diff --git a/plugins/admin/migrate.py b/plugins/admin/migrate.py new file mode 100644 index 0000000..9cc6468 --- /dev/null +++ b/plugins/admin/migrate.py @@ -0,0 +1,122 @@ +from functools import partial +from typing import Dict, List, TYPE_CHECKING + +from telegram import InlineKeyboardButton, InlineKeyboardMarkup + +from core.plugin import Plugin, handler +from gram_core.plugin.methods.migrate_data import IMigrateData, MigrateDataException +from gram_core.services.players import PlayersService +from utils.log import logger + +if TYPE_CHECKING: + from telegram import Update + from telegram.ext import ContextTypes + + +class MigrateAdmin(Plugin): + def __init__(self, players_service: PlayersService): + self.players_service = players_service + self.cache_data: Dict[int, List[IMigrateData]] = {} + self.wait_time = 60 + + async def _add_pop_cache_job(self, user_id: int) -> None: + if user_id in self.cache_data: + del self.cache_data[user_id] + + def add_pop_cache_job(self, user_id: int) -> None: + job_queue = self.application.job_queue + if job_queue is None: + raise RuntimeError + job_queue.run_once( + callback=partial(self._add_pop_cache_job, user_id=user_id), when=60, name=f"{user_id}|migrate_pop_cache" + ) + + def cancel_pop_cache_job(self, user_id: int) -> None: + job_queue = self.application.job_queue + if job_queue is None: + raise RuntimeError + if job := job_queue.get_jobs_by_name(f"{user_id}|migrate_pop_cache"): + job[0].schedule_removal() + + @handler.command(command="migrate_admin", block=False, admin=True) + async def migrate_admin_command(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"): + message = update.effective_message + args = self.get_args(context) + logger.info("管理员 %s[%s] migrate_admin 命令请求", message.from_user.full_name, message.from_user.id) + if (not args) or len(args) < 2: + await message.reply_text("参数错误,请指定新旧用户 id !") + return + try: + old_user_id, new_user_id = int(args[0]), int(args[1]) + except ValueError: + await message.reply_text("参数错误,请指定新旧用户 id !") + return + if old_user_id in self.cache_data: + await message.reply_text("该用户正在迁移数据中,请稍后再试!") + return + data = [] + players = await self.players_service.get_all_by_user_id(old_user_id) + for _, instance in self.application.managers.plugins_map.items(): + if _data := await instance.get_migrate_data(old_user_id, new_user_id, players): + data.append(_data) + if not data: + await message.reply_text("没有需要迁移的数据!") + return + text = "确定迁移以下数据?\n\n" + for d in data: + text += f"- {await d.migrate_data_msg()}\n" + self.cache_data[old_user_id] = data + buttons = [ + [ + InlineKeyboardButton( + "确定迁移", + callback_data=f"migrate_admin|{old_user_id}", + ) + ], + ] + await message.reply_text(text, reply_markup=InlineKeyboardMarkup(buttons)) + self.add_pop_cache_job(old_user_id) + + async def try_migrate_data(self, user_id: int) -> str: + text = [] + for d in self.cache_data[user_id]: + try: + logger.info("开始迁移数据 class[%s]", d.__class__.__name__) + await d.migrate_data() + logger.info("迁移数据成功 class[%s]", d.__class__.__name__) + except MigrateDataException as e: + text.append(e.msg) + except Exception as e: + logger.exception("迁移数据失败,未知错误! class[%s]", d.__class__.__name__, exc_info=e) + text.append("迁移部分数据出现未知错误,请联系管理员!") + if text: + return "- " + "\n- ".join(text) + + @handler.callback_query(pattern=r"^migrate_admin\|", block=False) + async def callback_query_migrate_admin(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: + callback_query = update.callback_query + user = callback_query.from_user + message = callback_query.message + logger.info("管理员 %s[%s] migrate_admin callback 请求", user.full_name, user.id) + + async def get_migrate_admin_callback(callback_query_data: str) -> int: + _data = callback_query_data.split("|") + _old_user_id = int(_data[1]) + logger.debug("callback_query_data函数返回 old_user_id[%s]", _old_user_id) + return _old_user_id + + old_user_id = await get_migrate_admin_callback(callback_query.data) + if old_user_id not in self.cache_data: + await callback_query.answer("请求已过期,请重新发起请求!", show_alert=True) + self.add_delete_message_job(message, delay=5) + return + self.cancel_pop_cache_job(old_user_id) + await message.edit_text("正在迁移数据,请稍后...", reply_markup=None) + try: + text = await self.try_migrate_data(old_user_id) + finally: + await self._add_pop_cache_job(old_user_id) + if text: + await message.edit_text(f"迁移部分数据失败!\n\n{text}") + return + await message.edit_text("迁移数据成功!") diff --git a/plugins/genshin/pay_log.py b/plugins/genshin/pay_log.py index 6603dcc..c234940 100644 --- a/plugins/genshin/pay_log.py +++ b/plugins/genshin/pay_log.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional, List from simnet import GenshinClient, Region from telegram import InlineKeyboardButton, InlineKeyboardMarkup @@ -14,6 +14,7 @@ from core.services.template.services import TemplateService from modules.gacha_log.helpers import from_url_get_authkey from modules.pay_log.error import PayLogNotFound, PayLogAccountNotFound, PayLogInvalidAuthkey, PayLogAuthkeyTimeout from modules.pay_log.log import PayLog +from modules.pay_log.migrate import PayLogMigrate from plugins.tools.genshin import PlayerNotFoundError, CookiesNotFoundError from plugins.tools.player_info import PlayerInfoSystem from utils.log import logger @@ -21,6 +22,7 @@ from utils.log import logger if TYPE_CHECKING: from telegram import Update, User from telegram.ext import ContextTypes + from gram_core.services.players.models import Player INPUT_URL, CONFIRM_DELETE = range(10100, 10102) @@ -96,9 +98,9 @@ class PayLogPlugin(Plugin.Conversation): await message.reply_text("该功能需要绑定 stoken 才能使用") return ConversationHandler.END else: - raise CookiesNotFoundError + raise CookiesNotFoundError(user.id) else: - raise CookiesNotFoundError + raise CookiesNotFoundError(user.id) text = "小派蒙正在从服务器获取数据,请稍后" reply = await message.reply_text(text) await message.reply_chat_action(ChatAction.TYPING) @@ -229,3 +231,9 @@ class PayLogPlugin(Plugin.Conversation): [InlineKeyboardButton("点我导入", url=create_deep_linked_url(context.bot.username, "pay_log_import"))] ] await message.reply_text("派蒙没有找到你的充值记录,快来点击按钮私聊派蒙导入吧~", reply_markup=InlineKeyboardMarkup(buttons)) + + @staticmethod + async def get_migrate_data( + old_user_id: int, new_user_id: int, old_players: List["Player"] + ) -> Optional[PayLogMigrate]: + return await PayLogMigrate.create(old_user_id, new_user_id, old_players) diff --git a/plugins/genshin/wish_log.py b/plugins/genshin/wish_log.py index 97f8fd3..fdf687e 100644 --- a/plugins/genshin/wish_log.py +++ b/plugins/genshin/wish_log.py @@ -1,5 +1,5 @@ from io import BytesIO -from typing import Optional, TYPE_CHECKING +from typing import Optional, TYPE_CHECKING, List from urllib.parse import urlencode from aiofiles import open as async_open @@ -29,6 +29,7 @@ from modules.gacha_log.error import ( ) from modules.gacha_log.helpers import from_url_get_authkey from modules.gacha_log.log import GachaLog +from modules.gacha_log.migrate import GachaLogMigrate from plugins.tools.genshin import PlayerNotFoundError from plugins.tools.player_info import PlayerInfoSystem from utils.log import logger @@ -43,6 +44,7 @@ except ImportError: if TYPE_CHECKING: from telegram import Update, Message, User, Document from telegram.ext import ContextTypes + from gram_core.services.players.models import Player INPUT_URL, INPUT_FILE, CONFIRM_DELETE = range(10100, 10103) @@ -447,3 +449,9 @@ class WishLogPlugin(Plugin.Conversation): [InlineKeyboardButton("点我导入", url=create_deep_linked_url(context.bot.username, "gacha_log_import"))] ] await message.reply_text("派蒙没有找到你的抽卡记录,快来私聊派蒙导入吧~", reply_markup=InlineKeyboardMarkup(buttons)) + + @staticmethod + async def get_migrate_data( + old_user_id: int, new_user_id: int, old_players: List["Player"] + ) -> Optional[GachaLogMigrate]: + return await GachaLogMigrate.create(old_user_id, new_user_id, old_players) diff --git a/plugins/tools/daily_note.py b/plugins/tools/daily_note.py index 48c27a6..dbf0f1d 100644 --- a/plugins/tools/daily_note.py +++ b/plugins/tools/daily_note.py @@ -11,12 +11,14 @@ from telegram.error import BadRequest, Forbidden from core.plugin import Plugin from core.services.task.models import Task as TaskUser, TaskStatusEnum from core.services.task.services import TaskResinServices, TaskRealmServices, TaskExpeditionServices +from gram_core.plugin.methods.migrate_data import IMigrateData, MigrateDataException from plugins.tools.genshin import GenshinHelper, PlayerNotFoundError, CookiesNotFoundError from utils.log import logger if TYPE_CHECKING: from simnet import GenshinClient from telegram.ext import ContextTypes + from gram_core.services.task.services import TaskServices class TaskDataBase(BaseModel): @@ -368,3 +370,65 @@ class DailyNoteSystem(Plugin): else: task_user_db.status = TaskStatusEnum.STATUS_SUCCESS await self.update_task_user(task_db) + + async def get_migrate_data(self, old_user_id: int, new_user_id: int, _) -> Optional["TaskMigrate"]: + return await TaskMigrate.create( + old_user_id, + new_user_id, + self.resin_service, + ) + + +class TaskMigrate(IMigrateData): + old_user_id: int + new_user_id: int + task_services: "TaskServices" + need_migrate_tasks: List[TaskUser] + + async def migrate_data_msg(self) -> str: + return f"定时任务数据 {len(self.need_migrate_tasks)} 条" + + async def migrate_data(self) -> bool: + id_list = [] + for task in self.need_migrate_tasks: + try: + await self.task_services.update(task) + except StaleDataError: + id_list.append(str(task.id)) + if id_list: + raise MigrateDataException(f"任务数据迁移失败:id {','.join(id_list)}") + return True + + @staticmethod + async def create_tasks( + old_user_id: int, + new_user_id: int, + task_services: "TaskServices", + ) -> List[TaskUser]: + need_migrate, _ = await TaskMigrate.filter_sql_data( + TaskUser, + task_services.get_all_by_user_id, + old_user_id, + new_user_id, + (TaskUser.user_id, TaskUser.type), + ) + for i in need_migrate: + i.user_id = new_user_id + return need_migrate + + @classmethod + async def create( + cls, + old_user_id: int, + new_user_id: int, + task_services: "TaskServices", + ) -> Optional["TaskMigrate"]: + need_migrate_tasks = await cls.create_tasks(old_user_id, new_user_id, task_services) + if not need_migrate_tasks: + return None + self = cls() + self.old_user_id = old_user_id + self.new_user_id = new_user_id + self.task_services = task_services + self.need_migrate_tasks = need_migrate_tasks + return self