import contextlib
import html
import os.path
from datetime import datetime
from typing import Tuple

from telegram import Update, Chat, ChatMember, ChatMemberOwner, ChatMemberAdministrator
from telegram.error import BadRequest, Forbidden
from telegram.ext import CommandHandler, CallbackContext

from core.cookies import CookiesService
from core.cookies.error import CookiesNotFoundError
from core.plugin import Plugin, handler
from core.sign import SignServices
from core.user import UserService
from core.user.error import UserNotFoundError
from core.user.models import User
from modules.gacha_log.log import GachaLog
from modules.pay_log.log import PayLog
from modules.playercards.file import PlayerCardsFile
from utils.bot import get_args, get_chat as get_chat_with_cache
from utils.decorators.admins import bot_admins_rights_check
from utils.helpers import get_genshin_client
from utils.log import logger
from utils.models.base import RegionEnum


class GetChat(Plugin):
    """Plugin providing the ``/get_chat`` command.

    The command takes a chat id and replies with a summary of that chat:
    group details and administrators for negative ids, user binding / sign /
    log information for non-negative ids. The reply is sent with
    ``parse_mode="HTML"``, so every free-form value interpolated into the
    text must be HTML-escaped.
    """

    # (flag letter, ChatMemberAdministrator attribute) in display order.
    # A granted right prints its letter, a missing one prints "_".
    _ADMIN_RIGHT_FLAGS = (
        ("C", "can_change_info"),
        ("D", "can_delete_messages"),
        ("R", "can_restrict_members"),
        ("I", "can_invite_users"),
        ("T", "can_manage_topics"),
        ("P", "can_pin_messages"),
        ("V", "can_manage_video_chats"),
        ("N", "can_promote_members"),
        ("A", "is_anonymous"),
    )

    def __init__(
        self,
        user_service: UserService = None,
        cookies_service: CookiesService = None,
        sign_service: SignServices = None,
    ):
        self.cookies_service = cookies_service
        self.user_service = user_service
        self.sign_service = sign_service
        self.gacha_log = GachaLog()
        self.pay_log = PayLog()
        self.player_cards_file = PlayerCardsFile()

    async def parse_group_chat(self, chat: Chat, admins: Tuple[ChatMember, ...]) -> str:
        """Build the HTML summary for a group chat.

        Args:
            chat: The group chat to describe.
            admins: Administrators of the chat; a falsy value omits the
                administrator list.

        Returns:
            The summary text: id, title, optional username, sign-push count
            and description, followed by one line per administrator.
        """
        # The title is free-form text and the message is parsed as HTML, so it
        # must be escaped just like the description and the admin names.
        parts = [f"群 ID:{chat.id}\n群名称:{html.escape(chat.title or '')}\n"]
        if chat.username:
            parts.append(f"群用户名:@{chat.username}\n")
        sign_info = await self.sign_service.get_by_chat_id(chat.id)
        if sign_info:
            parts.append(f"自动签到推送人数:{len(sign_info)}\n")
        if chat.description:
            parts.append(f"群简介:{html.escape(chat.description)}\n")
        if admins:
            for admin in admins:
                parts.append(f'{html.escape(admin.user.full_name)} ')
                if isinstance(admin, ChatMemberAdministrator):
                    parts.extend(
                        letter if getattr(admin, attr) else "_"
                        for letter, attr in self._ADMIN_RIGHT_FLAGS
                    )
                elif isinstance(admin, ChatMemberOwner):
                    parts.append("创建者")
                parts.append("\n")
        return "".join(parts)

    @staticmethod
    async def parse_private_bind(user_info: User, chat_id: int) -> Tuple[str, int]:
        """Describe how the user's game account is bound.

        Args:
            user_info: The stored user record.
            chat_id: The user's chat id, used to probe for a usable client.

        Returns:
            A ``(text, uid)`` pair: the binding description and the game uid
            selected according to ``user_info.region``.
        """
        if user_info.region == RegionEnum.HYPERION:
            text = "米游社绑定:"
            uid = user_info.yuanshen_uid
        else:
            text = "原神绑定:"
            uid = user_info.genshin_uid
        temp = "Cookie 绑定"
        try:
            # Only the success/failure of the call matters; the client itself
            # is discarded.
            await get_genshin_client(chat_id)
        except CookiesNotFoundError:
            temp = "UID 绑定"
        return f"{text}{temp}\n游戏 ID:{uid}", uid

    async def parse_private_sign(self, chat_id: int) -> str:
        """Return the auto-sign status section for the given user."""
        sign_info = await self.sign_service.get_by_user_id(chat_id)
        if sign_info is None:
            return "\n自动签到:未开启"
        return (
            f"\n自动签到:已开启"
            f"\n推送会话:{sign_info.chat_id}"
            f"\n开启时间:{sign_info.time_created}"
            f"\n更新时间:{sign_info.time_updated}"
            f"\n签到状态:{sign_info.status.name}"
        )

    async def parse_private_gacha_log(self, chat_id: int, uid: int) -> str:
        """Return the gacha-log section: per-key record counts and update time."""
        gacha_log, status = await self.gacha_log.load_history_info(str(chat_id), str(uid))
        if not status:
            return "\n抽卡记录:未导入"
        parts = ["\n抽卡记录:"]
        parts.extend(f"\n   - {key}:{len(value)} 条" for key, value in gacha_log.item_list.items())
        parts.append(f"\n   - 最后更新:{gacha_log.update_time.strftime('%Y-%m-%d %H:%M:%S')}")
        return "".join(parts)

    async def parse_private_pay_log(self, chat_id: int, uid: int) -> str:
        """Return the pay-log section: imported record count and export time."""
        pay_log, status = await self.pay_log.load_history_info(str(chat_id), str(uid))
        if not status:
            return "\n充值记录:未导入"
        return f"\n充值记录:\n   - 已导入 {len(pay_log.list)} 条\n   - 最后更新:{pay_log.info.export_time}"

    @staticmethod
    def get_file_modify_time(path: str) -> datetime:
        """Return the modification time of ``path`` as a naive local datetime."""
        return datetime.fromtimestamp(os.path.getmtime(path))

    async def parse_private_player_cards_file(self, uid: int) -> str:
        """Return the player-cards section: cached character count and file mtime."""
        player_cards = await self.player_cards_file.load_history_info(uid)
        if player_cards is None:
            return "\n角色卡片:未缓存"
        time = self.get_file_modify_time(self.player_cards_file.get_file_path(uid))
        return (
            f"\n角色卡片:"
            f"\n   - 已缓存 {len(player_cards.get('avatarInfoList', []))} 个角色"
            f"\n   - 最后更新:{time.strftime('%Y-%m-%d %H:%M:%S')}"
        )

    async def parse_private_chat(self, chat: Chat) -> str:
        """Build the HTML summary for a private (user) chat.

        Args:
            chat: The private chat to describe.

        Returns:
            The summary text: id, name and optional username, followed — when
            the user is known to ``user_service`` — by binding, sign,
            gacha-log, pay-log and player-cards sections.
        """
        # The display name is user-controlled and the message is parsed as
        # HTML, so it must be escaped.
        parts = [
            "MENTION\n"
            f"用户 ID:{chat.id}\n"
            f"用户名称:{html.escape(chat.full_name or '')}\n"
        ]
        if chat.username:
            parts.append(f"用户名:@{chat.username}\n")
        try:
            user_info = await self.user_service.get_user_by_id(chat.id)
        except UserNotFoundError:
            user_info = None
        if user_info is not None:
            bind_text, uid = await self.parse_private_bind(user_info, chat.id)
            parts.append(bind_text)
            parts.append(await self.parse_private_sign(chat.id))
            # The remaining sections are best-effort: a failure while loading
            # one of them drops that section instead of failing the command.
            with contextlib.suppress(Exception):
                parts.append(await self.parse_private_gacha_log(chat.id, uid))
            with contextlib.suppress(Exception):
                parts.append(await self.parse_private_pay_log(chat.id, uid))
            with contextlib.suppress(Exception):
                parts.append(await self.parse_private_player_cards_file(uid))
        return "".join(parts)

    @handler(CommandHandler, command="get_chat", block=False)
    @bot_admins_rights_check
    async def get_chat(self, update: Update, context: CallbackContext):
        """Handle ``/get_chat <chat id>`` and reply with the chat summary.

        A negative id is treated as a group chat and a non-negative id as a
        private chat. A missing or non-integer argument yields a usage error;
        ``BadRequest`` / ``Forbidden`` raised while fetching the chat or
        sending the reply are reported back to the caller.
        """
        user = update.effective_user
        logger.info("用户 %s[%s] get_chat 命令请求", user.full_name, user.id)
        message = update.effective_message
        args = get_args(context)
        if not args:
            await message.reply_text("参数错误,请指定群 id !")
            return
        try:
            chat_id = int(args[0])
        except ValueError:
            await message.reply_text("参数错误,请指定群 id !")
            return
        try:
            chat = await get_chat_with_cache(args[0])
            if chat_id < 0:
                admins = await chat.get_administrators()
                text = await self.parse_group_chat(chat, admins)
            else:
                text = await self.parse_private_chat(chat)
            await message.reply_text(text, parse_mode="HTML")
        except (BadRequest, Forbidden) as exc:
            await message.reply_text(f"通过 id 获取会话信息失败,API 返回:{exc.message}")