mirror of
https://github.com/PaiGramTeam/PamGram.git
synced 2024-11-21 13:48:19 +00:00
413 lines
17 KiB
Python
413 lines
17 KiB
Python
import html
|
||
from http.cookies import SimpleCookie
|
||
from typing import Tuple, TYPE_CHECKING
|
||
|
||
from simnet import Region, StarRailClient
|
||
from telegram import InlineKeyboardMarkup, InlineKeyboardButton
|
||
from telegram.ext import filters
|
||
|
||
from core.basemodel import RegionEnum
|
||
from core.plugin import Plugin, handler
|
||
from core.services.cookies import CookiesService
|
||
from core.services.players import PlayersService
|
||
from core.services.players.services import PlayerInfoService
|
||
from gram_core.services.cookies.models import CookiesStatusEnum
|
||
from gram_core.services.devices import DevicesService
|
||
from modules.apihelper.models.genshin.cookies import CookiesModel
|
||
from utils.log import logger
|
||
|
||
if TYPE_CHECKING:
|
||
from telegram import Update
|
||
from telegram.ext import ContextTypes
|
||
|
||
|
||
__all__ = ("PlayersManagesPlugin",)
|
||
|
||
|
||
class PlayersManagesPlugin(Plugin):
|
||
def __init__(
|
||
self,
|
||
players: PlayersService,
|
||
cookies: CookiesService,
|
||
player_info_service: PlayerInfoService,
|
||
devices_service: DevicesService,
|
||
):
|
||
self.cookies_service = cookies
|
||
self.players_service = players
|
||
self.player_info_service = player_info_service
|
||
self.devices_service = devices_service
|
||
|
||
@staticmethod
|
||
def players_manager_callback(callback_query_data: str) -> Tuple[str, int, int]:
|
||
_data = callback_query_data.split("|")
|
||
_handle = _data[-3]
|
||
_user_id = int(_data[-2])
|
||
_player_id = int(_data[-1])
|
||
logger.debug(
|
||
"players_manager_callback函数返回 handle[%s] user_id[%s] player_id[%s]", _handle, _user_id, _player_id
|
||
)
|
||
return _handle, _user_id, _player_id
|
||
|
||
@handler.command(command="player", filters=filters.ChatType.PRIVATE, block=False)
|
||
@handler.command(command="players", filters=filters.ChatType.PRIVATE, block=False)
|
||
@handler.callback_query(r"^players_manager\|list", block=False)
|
||
async def command_start(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
|
||
callback_query = update.callback_query
|
||
user = update.effective_user
|
||
message = update.effective_message
|
||
players = await self.players_service.get_all_by_user_id(user.id)
|
||
if len(players) == 0:
|
||
if callback_query:
|
||
await callback_query.edit_message_text("未查询到您所绑定的账号信息,请先绑定账号")
|
||
else:
|
||
await message.reply_text("未查询到您所绑定的账号信息,请先绑定账号")
|
||
return
|
||
buttons = []
|
||
for player in players:
|
||
nickname = "未知"
|
||
player_info = await self.player_info_service.get(player)
|
||
if player_info:
|
||
nickname = player_info.nickname
|
||
text = f"{player.player_id} {nickname}"
|
||
buttons.append(
|
||
[
|
||
InlineKeyboardButton(
|
||
text,
|
||
callback_data=f"players_manager|get|{user.id}|{player.player_id}",
|
||
)
|
||
]
|
||
)
|
||
if callback_query:
|
||
await callback_query.edit_message_text(
|
||
"从下面的列表中选择一个玩家", reply_markup=InlineKeyboardMarkup(buttons)
|
||
)
|
||
else:
|
||
await message.reply_text("从下面的列表中选择一个玩家", reply_markup=InlineKeyboardMarkup(buttons))
|
||
|
||
@handler.callback_query(r"^players_manager\|get\|", block=False)
|
||
async def get_player(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
|
||
callback_query = update.callback_query
|
||
user = callback_query.from_user
|
||
|
||
_, user_id, player_id = self.players_manager_callback(callback_query.data)
|
||
if user.id != user_id:
|
||
if callback_query.message:
|
||
await callback_query.message.delete()
|
||
return
|
||
|
||
player = await self.players_service.get(user.id, player_id=player_id)
|
||
if player is None:
|
||
await callback_query.edit_message_text(f"账号 {player_id} 未找到")
|
||
return
|
||
|
||
player_info = await self.player_info_service.get(player)
|
||
if player_info is None:
|
||
await callback_query.edit_message_text(f"账号 {player_id} 信息未找到")
|
||
return
|
||
|
||
buttons = [
|
||
[
|
||
InlineKeyboardButton(
|
||
"设置为主账号",
|
||
callback_data=f"players_manager|main|{user.id}|{player.player_id}",
|
||
),
|
||
InlineKeyboardButton(
|
||
"删除账号",
|
||
callback_data=f"players_manager|del|{user.id}|{player.player_id}",
|
||
),
|
||
InlineKeyboardButton(
|
||
"更新账号信息",
|
||
callback_data=f"players_manager|update|{user.id}|{player.player_id}",
|
||
),
|
||
],
|
||
[
|
||
InlineKeyboardButton(
|
||
"« 返回玩家列表",
|
||
callback_data="players_manager|list",
|
||
)
|
||
],
|
||
]
|
||
|
||
if player.account_id is not None:
|
||
cookies = await self.cookies_service.get(player.user_id, player.account_id, player.region)
|
||
if cookies is not None:
|
||
temp_buttons = [
|
||
InlineKeyboardButton(
|
||
"导出 Cookies",
|
||
callback_data=f"players_manager|export_cookies|{user.id}|{player.player_id}",
|
||
),
|
||
InlineKeyboardButton(
|
||
"刷新 Cookies",
|
||
callback_data=f"players_manager|refresh_cookies|{user.id}|{player.player_id}",
|
||
),
|
||
]
|
||
|
||
buttons.insert(-1, temp_buttons)
|
||
|
||
await callback_query.edit_message_text(
|
||
f"这里是 {player.player_id} {player_info.nickname}\n你想用这个账号做什么?",
|
||
reply_markup=InlineKeyboardMarkup(buttons),
|
||
)
|
||
|
||
@handler.callback_query(r"^players_manager\|update\|", block=False)
|
||
async def update_user(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
|
||
callback_query = update.callback_query
|
||
user = callback_query.from_user
|
||
|
||
_, user_id, player_id = self.players_manager_callback(callback_query.data)
|
||
if user.id != user_id:
|
||
if callback_query.message:
|
||
await callback_query.message.delete()
|
||
return
|
||
|
||
player = await self.players_service.get(user.id, player_id=player_id)
|
||
if player is None:
|
||
await callback_query.edit_message_text(f"账号 {player_id} 未找到")
|
||
return
|
||
|
||
status = await self.player_info_service.update_from_mihomo(player)
|
||
|
||
buttons = [
|
||
[
|
||
InlineKeyboardButton(
|
||
"« 返回",
|
||
callback_data=f"players_manager|get|{user.id}|{player.player_id}",
|
||
)
|
||
],
|
||
]
|
||
|
||
if status:
|
||
await callback_query.edit_message_text(
|
||
f"更新玩家信息 {player.player_id} 成功", reply_markup=InlineKeyboardMarkup(buttons)
|
||
)
|
||
else:
|
||
await callback_query.edit_message_text(
|
||
f"更新玩家信息 {player.player_id} 更新失败 请稍后重试", reply_markup=InlineKeyboardMarkup(buttons)
|
||
)
|
||
|
||
@handler.callback_query(r"^players_manager\|refresh_cookies\|", block=False)
|
||
async def refresh_cookies(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
|
||
callback_query = update.callback_query
|
||
user = callback_query.from_user
|
||
|
||
_, user_id, player_id = self.players_manager_callback(callback_query.data)
|
||
if user.id != user_id:
|
||
if callback_query.message:
|
||
await callback_query.message.delete()
|
||
return
|
||
|
||
player = await self.players_service.get(user.id, player_id=player_id)
|
||
if player is None:
|
||
await callback_query.edit_message_text(f"账号 {player_id} 未找到")
|
||
return
|
||
|
||
player_info = await self.player_info_service.get(player)
|
||
if player_info is None:
|
||
await callback_query.edit_message_text(f"账号 {player_id} 信息未找到")
|
||
return
|
||
|
||
buttons = [
|
||
[
|
||
InlineKeyboardButton(
|
||
"« 返回",
|
||
callback_data=f"players_manager|get|{user.id}|{player.player_id}",
|
||
)
|
||
],
|
||
]
|
||
|
||
cookies_data = await self.cookies_service.get(player.user_id, player.account_id, player.region)
|
||
if cookies_data is None:
|
||
await callback_query.edit_message_text(
|
||
f"玩家 {player.player_id} {player_info.nickname} cookies 未找到",
|
||
reply_markup=InlineKeyboardMarkup(buttons),
|
||
)
|
||
|
||
cookies = CookiesModel(**cookies_data.data)
|
||
|
||
if cookies.stoken is not None:
|
||
try:
|
||
region = Region.CHINESE if player.region.value == 1 else Region.OVERSEAS
|
||
async with StarRailClient(cookies=cookies.to_dict(), region=region) as client:
|
||
cookies.cookie_token = await client.get_cookie_token_by_stoken()
|
||
logger.success("用户 %s[%s] 刷新 cookie_token 成功", user.full_name, user.id)
|
||
cookies.ltoken = await client.get_ltoken_by_stoken()
|
||
logger.success("用户 %s[%s] 刷新 ltoken 成功", user.full_name, user.id)
|
||
cookies.remove_v2()
|
||
except Exception as exc: # pylint: disable=W0703
|
||
logger.error("刷新 cookie_token 失败 [%s]", (str(exc)))
|
||
await callback_query.edit_message_text(
|
||
f"玩家 {player.player_id} {player_info.nickname} cookies 刷新失败 可能 stoken 已经过期",
|
||
reply_markup=InlineKeyboardMarkup(buttons),
|
||
)
|
||
return
|
||
cookies_data.data = cookies.to_dict()
|
||
cookies_data.status = CookiesStatusEnum.STATUS_SUCCESS
|
||
await self.cookies_service.update(cookies_data)
|
||
await callback_query.edit_message_text(
|
||
f"玩家 {player.player_id} {player_info.nickname} cookies 刷新成功",
|
||
reply_markup=InlineKeyboardMarkup(buttons),
|
||
)
|
||
else:
|
||
await callback_query.edit_message_text(
|
||
f"玩家 {player.player_id} {player_info.nickname} stoken 未找到",
|
||
reply_markup=InlineKeyboardMarkup(buttons),
|
||
)
|
||
|
||
@handler.callback_query(r"^players_manager\|export_cookies\|", block=False)
|
||
async def export_cookies(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
|
||
callback_query = update.callback_query
|
||
message = update.effective_message
|
||
user = callback_query.from_user
|
||
|
||
_, user_id, player_id = self.players_manager_callback(callback_query.data)
|
||
if user.id != user_id:
|
||
if callback_query.message:
|
||
await callback_query.message.delete()
|
||
return
|
||
|
||
player = await self.players_service.get(user.id, player_id=player_id)
|
||
if player is None:
|
||
await callback_query.edit_message_text(f"账号 {player_id} 未找到")
|
||
return
|
||
|
||
player_info = await self.player_info_service.get(player)
|
||
if player_info is None:
|
||
await callback_query.edit_message_text(f"账号 {player_id} 信息未找到")
|
||
return
|
||
|
||
buttons = [
|
||
[
|
||
InlineKeyboardButton(
|
||
"« 返回",
|
||
callback_data=f"players_manager|get|{user.id}|{player.player_id}",
|
||
)
|
||
],
|
||
]
|
||
|
||
cookies_data = await self.cookies_service.get(player.user_id, player.account_id, player.region)
|
||
if cookies_data is None:
|
||
await callback_query.edit_message_text(
|
||
f"玩家 {player.player_id} {player_info.nickname} cookies 未找到",
|
||
reply_markup=InlineKeyboardMarkup(buttons),
|
||
)
|
||
return
|
||
device = await self.devices_service.get(player.account_id)
|
||
|
||
cookies = SimpleCookie()
|
||
for key, value in cookies_data.data.items():
|
||
cookies[key] = value
|
||
if device is not None:
|
||
cookies["x-rpc-device_id"] = device.device_id
|
||
cookies["x-rpc-device_fp"] = device.device_fp
|
||
|
||
cookie_str = cookies.output(header="", sep=";")
|
||
|
||
await message.reply_html(
|
||
f"<pre>{html.escape(cookie_str)}</pre>",
|
||
)
|
||
await message.reply_text(
|
||
f"玩家 {player.player_id} {player_info.nickname} cookies 导出成功",
|
||
reply_markup=InlineKeyboardMarkup(buttons),
|
||
)
|
||
await message.delete()
|
||
|
||
@handler.callback_query(r"^players_manager\|main\|", block=False)
|
||
async def set_main(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
|
||
callback_query = update.callback_query
|
||
user = callback_query.from_user
|
||
|
||
_, user_id, player_id = self.players_manager_callback(callback_query.data)
|
||
if user.id != user_id:
|
||
if callback_query.message:
|
||
await callback_query.message.delete()
|
||
return
|
||
|
||
player = await self.players_service.get(user.id, player_id=player_id)
|
||
if player is None:
|
||
await callback_query.edit_message_text(f"账号 {player_id} 未找到")
|
||
return
|
||
|
||
player_info = await self.player_info_service.get(player)
|
||
if player_info is None:
|
||
await callback_query.edit_message_text(f"账号 {player_id} 信息未找到")
|
||
return
|
||
|
||
await self.players_service.set_player_to_main(user_id, player_id)
|
||
|
||
buttons = [
|
||
[
|
||
InlineKeyboardButton(
|
||
"« 返回",
|
||
callback_data=f"players_manager|get|{user.id}|{player.player_id}",
|
||
)
|
||
],
|
||
]
|
||
|
||
await callback_query.edit_message_text(
|
||
f"成功设置 {player.player_id} {player_info.nickname} 为主账号", reply_markup=InlineKeyboardMarkup(buttons)
|
||
)
|
||
|
||
@handler.callback_query(r"^players_manager\|del\|", block=False)
|
||
async def delete(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
|
||
callback_query = update.callback_query
|
||
user = callback_query.from_user
|
||
|
||
_handle, user_id, player_id = self.players_manager_callback(callback_query.data)
|
||
if user.id != user_id:
|
||
if callback_query.message:
|
||
await callback_query.message.delete()
|
||
return
|
||
|
||
player = await self.players_service.get(user.id, player_id=player_id)
|
||
if player is None:
|
||
await callback_query.edit_message_text(f"账号 {player_id} 未找到")
|
||
return
|
||
|
||
player_info = await self.player_info_service.get_form_sql(player)
|
||
if player_info is None:
|
||
await callback_query.edit_message_text(f"账号 {player_id} 信息未找到")
|
||
return
|
||
|
||
if _handle == "true":
|
||
buttons = [
|
||
[
|
||
InlineKeyboardButton(
|
||
"« 返回玩家列表",
|
||
callback_data="players_manager|list",
|
||
)
|
||
],
|
||
]
|
||
await self.players_service.delete(player)
|
||
cookies = await self.cookies_service.get(player.user_id, player.account_id, player.region)
|
||
if cookies:
|
||
players = await self.players_service.get_all_by_account_id(cookies.account_id, cookies.region)
|
||
if len(players) == 0:
|
||
await self.cookies_service.delete(cookies)
|
||
player_info = await self.player_info_service.get_form_sql(player)
|
||
if player_info is not None:
|
||
await self.player_info_service.delete(player_info)
|
||
await callback_query.edit_message_text(
|
||
f"成功删除 {player.player_id} ", reply_markup=InlineKeyboardMarkup(buttons)
|
||
)
|
||
elif _handle == "del":
|
||
buttons = [
|
||
[
|
||
InlineKeyboardButton(
|
||
"是的我非常确定",
|
||
callback_data=f"players_manager|del|true|{user.id}|{player.player_id}",
|
||
)
|
||
],
|
||
[
|
||
InlineKeyboardButton(
|
||
"取消操作",
|
||
callback_data=f"players_manager|get|{user.id}|{player.player_id}",
|
||
)
|
||
],
|
||
]
|
||
await callback_query.edit_message_text(
|
||
"请问你真的要从Bot中删除改账号吗?", reply_markup=InlineKeyboardMarkup(buttons)
|
||
)
|
||
else:
|
||
if callback_query.message:
|
||
await callback_query.message.delete()
|