Support cookies refresh and export

This commit is contained in:
洛水居室 2023-03-27 09:44:22 +08:00
parent 013fd2a989
commit f03f524a47
No known key found for this signature in database
GPG Key ID: C9DE87DA724B88FC

View File

@ -1,12 +1,17 @@
import html
from http.cookies import SimpleCookie
from typing import Tuple from typing import Tuple
from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import filters, ContextTypes from telegram.ext import filters, ContextTypes
from core.basemodel import RegionEnum
from core.plugin import Plugin, handler from core.plugin import Plugin, handler
from core.services.cookies import CookiesService from core.services.cookies import CookiesService
from core.services.players import PlayersService from core.services.players import PlayersService
from core.services.players.services import PlayerInfoService from core.services.players.services import PlayerInfoService
from modules.apihelper.client.components.authclient import AuthClient
from modules.apihelper.models.genshin.cookies import CookiesModel
from utils.log import logger from utils.log import logger
__all__ = ("PlayersManagesPlugin",) __all__ = ("PlayersManagesPlugin",)
@ -102,6 +107,25 @@ class PlayersManagesPlugin(Plugin):
], ],
] ]
if player.account_id is not None:
cookies = await self.cookies_service.get(player.user_id, player.account_id, player.region)
if cookies is not None:
temp_buttons = [
InlineKeyboardButton(
"导出 Cookies",
callback_data=f"players_manager|export_cookies|{user.id}|{player.player_id}",
)
]
if player.region == RegionEnum.HYPERION:
temp_buttons.append(
InlineKeyboardButton(
"刷新 Cookies",
callback_data=f"players_manager|refresh_cookies|{user.id}|{player.player_id}",
)
)
buttons.insert(-1, temp_buttons)
await callback_query.edit_message_text( await callback_query.edit_message_text(
f"这里是 {player.player_id} {player_info.nickname}\n你想用这个账号做什么?", reply_markup=InlineKeyboardMarkup(buttons) f"这里是 {player.player_id} {player_info.nickname}\n你想用这个账号做什么?", reply_markup=InlineKeyboardMarkup(buttons)
) )
@ -142,6 +166,120 @@ class PlayersManagesPlugin(Plugin):
f"更新玩家信息 {player.player_id} 更新失败 请稍后重试", reply_markup=InlineKeyboardMarkup(buttons) f"更新玩家信息 {player.player_id} 更新失败 请稍后重试", reply_markup=InlineKeyboardMarkup(buttons)
) )
@handler.callback_query(r"^players_manager\|refresh_cookies\|", block=False)
async def refresh_cookies(self, update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
callback_query = update.callback_query
user = callback_query.from_user
_, user_id, player_id = self.players_manager_callback(callback_query.data)
if user.id != user_id:
if callback_query.message:
await callback_query.message.delete()
return
player = await self.players_service.get(user.id, player_id=player_id)
if player is None:
await callback_query.edit_message_text(f"账号 {player_id} 未找到")
return
player_info = await self.player_info_service.get(player)
if player_info is None:
await callback_query.edit_message_text(f"账号 {player_id} 信息未找到")
return
buttons = [
[
InlineKeyboardButton(
"« 返回",
callback_data=f"players_manager|get|{user.id}|{player.player_id}",
)
],
]
cookies_data = await self.cookies_service.get(player.user_id, player.account_id, player.region)
if cookies_data is None:
await callback_query.edit_message_text(
f"玩家 {player.player_id} {player_info.nickname} cookies 未找到", reply_markup=InlineKeyboardMarkup(buttons)
)
cookies = CookiesModel(**cookies_data.data)
if cookies.stoken is not None:
try:
auth_client = AuthClient(cookies=cookies)
if await auth_client.get_cookie_token_by_stoken():
logger.success("用户 %s[%s] 刷新 cookie_token 成功", user.full_name, user.id)
if await auth_client.get_ltoken_by_stoken():
logger.success("用户 %s[%s] 刷新 ltoken 成功", user.full_name, user.id)
auth_client.cookies.remove_v2()
except Exception as exc: # pylint: disable=W0703
logger.error("刷新 cookie_token 失败 [%s]", (str(exc)))
await callback_query.edit_message_text(
f"玩家 {player.player_id} {player_info.nickname} cookies 刷新失败 可能 stoken 已经过期",
reply_markup=InlineKeyboardMarkup(buttons),
)
return
cookies_data.data = cookies.json()
await self.cookies_service.update(cookies_data)
await callback_query.edit_message_text(
f"玩家 {player.player_id} {player_info.nickname} cookies 刷新成功", reply_markup=InlineKeyboardMarkup(buttons)
)
else:
await callback_query.edit_message_text(
f"玩家 {player.player_id} {player_info.nickname} stoken 未找到", reply_markup=InlineKeyboardMarkup(buttons)
)
@handler.callback_query(r"^players_manager\|export_cookies\|", block=False)
async def export_cookies(self, update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
callback_query = update.callback_query
message = update.effective_message
user = callback_query.from_user
_, user_id, player_id = self.players_manager_callback(callback_query.data)
if user.id != user_id:
if callback_query.message:
await callback_query.message.delete()
return
player = await self.players_service.get(user.id, player_id=player_id)
if player is None:
await callback_query.edit_message_text(f"账号 {player_id} 未找到")
return
player_info = await self.player_info_service.get(player)
if player_info is None:
await callback_query.edit_message_text(f"账号 {player_id} 信息未找到")
return
buttons = [
[
InlineKeyboardButton(
"« 返回",
callback_data=f"players_manager|get|{user.id}|{player.player_id}",
)
],
]
cookies_data = await self.cookies_service.get(player.user_id, player.account_id, player.region)
if cookies_data is None:
await callback_query.edit_message_text(
f"玩家 {player.player_id} {player_info.nickname} cookies 未找到", reply_markup=InlineKeyboardMarkup(buttons)
)
cookies = SimpleCookie()
for key, value in cookies_data.data.items():
cookies[key] = value
cookie_str = cookies.output(header="", sep=";")
await message.reply_html(
f"<pre>{html.escape(cookie_str)}</pre>",
)
await message.reply_text(
f"玩家 {player.player_id} {player_info.nickname} cookies 导出成功", reply_markup=InlineKeyboardMarkup(buttons)
)
await message.delete()
@handler.callback_query(r"^players_manager\|main\|", block=False) @handler.callback_query(r"^players_manager\|main\|", block=False)
async def set_main(self, update: Update, _: ContextTypes.DEFAULT_TYPE) -> None: async def set_main(self, update: Update, _: ContextTypes.DEFAULT_TYPE) -> None:
callback_query = update.callback_query callback_query = update.callback_query