support cookies import and export

This commit is contained in:
omg-xtao 2023-12-19 00:51:36 +08:00 committed by GitHub
parent 78b8889281
commit 39abd0cd52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 275 additions and 97 deletions

@ -1 +1 @@
Subproject commit 4718860a87e5b64eb59966f7122716fd70ece8f0
Subproject commit c935844958321754f04260bd330400b54d01851b

View File

@ -110,6 +110,20 @@ class AccountCookiesPlugin(Plugin.Conversation):
data.device_name = headers.get("x-rpc-device_name")
return data
async def _parse_args(self, update: Update, context: CallbackContext) -> Optional[int]:
args = self.get_args(context)
account_cookies_plugin_data: AccountCookiesPluginData = context.chat_data.get("account_cookies_plugin_data")
if len(args) < 2:
return None
regions = {"米游社": RegionEnum.HYPERION, "HoYoLab": RegionEnum.HOYOLAB}
if args[0] not in regions:
return None
cookies = " ".join(args[1:])
account_cookies_plugin_data.region = regions[args[0]]
if ret := await self.parse_cookies(update, context, cookies):
return ret
return await self.check_cookies(update, context)
@conversation.entry_point
@handler.command(command="setcookie", filters=filters.ChatType.PRIVATE, block=False)
@handler.command(command="setcookies", filters=filters.ChatType.PRIVATE, block=False)
@ -124,6 +138,9 @@ class AccountCookiesPlugin(Plugin.Conversation):
else:
account_cookies_plugin_data.reset()
if ret := await self._parse_args(update, context):
return ret
text = f'你好 {user.mention_markdown_v2()} {escape_markdown("!请选择要绑定的服务器!或回复退出取消操作")}'
reply_keyboard = [["米游社", "HoYoLab"], ["退出"]]
await message.reply_markdown_v2(text, reply_markup=ReplyKeyboardMarkup(reply_keyboard, one_time_keyboard=True))
@ -155,15 +172,21 @@ class AccountCookiesPlugin(Plugin.Conversation):
@handler.message(filters=filters.TEXT & ~filters.COMMAND, block=False)
async def input_cookies(self, update: Update, context: CallbackContext) -> int:
message = update.effective_message
user = update.effective_user
account_cookies_plugin_data: AccountCookiesPluginData = context.chat_data.get("account_cookies_plugin_data")
if message.text == "退出":
await message.reply_text("退出任务", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
if ret := await self.parse_cookies(update, context, message.text):
return ret
return await self.check_cookies(update, context)
async def parse_cookies(self, update: Update, context: CallbackContext, text: str) -> Optional[int]:
user = update.effective_user
message = update.effective_message
account_cookies_plugin_data: AccountCookiesPluginData = context.chat_data.get("account_cookies_plugin_data")
try:
# cookie str to dict
wrapped = (
ArkoWrapper(message.text.split(";"))
ArkoWrapper(text.split(";"))
.filter(lambda x: x != "")
.map(lambda x: x.strip())
.map(lambda x: ((y := x.split("=", 1))[0], y[1]))
@ -186,7 +209,6 @@ class AccountCookiesPlugin(Plugin.Conversation):
await message.reply_text("Cookies格式有误请检查后重新尝试绑定", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
account_cookies_plugin_data.cookies = cookies
return await self.check_cookies(update, context)
async def check_cookies(self, update: Update, context: CallbackContext) -> int:
user = update.effective_user

View File

@ -0,0 +1,201 @@
from http.cookies import SimpleCookie
from typing import List, TYPE_CHECKING
from uuid import uuid4
from pydantic import BaseModel
from telegram import (
InlineKeyboardButton,
SwitchInlineQueryChosenChat,
InlineKeyboardMarkup,
InlineQueryResultArticle,
InputTextMessageContent,
InlineQueryResultsButton,
)
from telegram.error import BadRequest
from telegram.helpers import create_deep_linked_url
from gram_core.basemodel import RegionEnum
from gram_core.config import config
from gram_core.dependence.redisdb import RedisDB
from gram_core.plugin import Plugin, handler
from gram_core.services.cookies import CookiesService
from gram_core.services.devices import DevicesService
from utils.log import logger
try:
import ujson as jsonlib
except ImportError:
import json as jsonlib
if TYPE_CHECKING:
from telegram import Update, InlineQuery
from telegram.ext import ContextTypes
class InlineCookies(BaseModel):
account_id: int
region: RegionEnum
data: str
class CookiesExport(Plugin):
def __init__(
self,
redis: RedisDB,
cookies_service: CookiesService,
devices_service: DevicesService,
):
self.qname = "plugin:cookies_export:"
self.ex = 5 * 60
self.client = redis.client
self.cookies_service = cookies_service
self.devices_service = devices_service
def get_cache_key(self, user_id: int) -> str:
return f"{self.qname}{user_id}"
async def set_cache(self, user_id: int, data: List[InlineCookies]) -> None:
if not data:
return
new_data = jsonlib.dumps([i.dict() for i in data])
await self.client.set(self.get_cache_key(user_id), new_data, ex=self.ex)
async def get_cache(self, user_id: int) -> List[InlineCookies]:
_data = await self.client.get(self.get_cache_key(user_id))
if _data is None:
return []
data_str = _data.decode("utf-8")
data = jsonlib.loads(data_str)
return [InlineCookies(**i) for i in data]
async def get_all_cookies(self, user_id: int) -> List[InlineCookies]:
cookies = await self.cookies_service.get_all(user_id)
if not cookies:
return []
cookies_list = []
for cookie in cookies:
if cookie.region not in [RegionEnum.HYPERION, RegionEnum.HOYOLAB]:
continue
cookies = SimpleCookie()
for key, value in cookie.data.items():
cookies[key] = value
device = None
if cookie.region == RegionEnum.HYPERION:
device = await self.devices_service.get(cookie.account_id)
if device is not None:
cookies["x-rpc-device_id"] = device.device_id
cookies["x-rpc-device_fp"] = device.device_fp
cookie_str = cookies.output(header="", sep=";")
cookies_list.append(
InlineCookies(
account_id=cookie.account_id,
region=cookie.region,
data=cookie_str,
)
)
await self.set_cache(user_id, cookies_list)
return cookies_list
@handler.command("cookies_export", block=False)
async def cookies_export(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE"):
message = update.effective_message
user = update.effective_user
logger.info("用户 %s[%s] cookies_export 命令请求", message.from_user.full_name, message.from_user.id)
data = await self.get_all_cookies(user.id)
if not data:
await message.reply_text("没有查询到任何账号信息")
return
text = "请点击下方按钮导出账号信息到指定 BOT"
buttons = [
[
InlineKeyboardButton(
"选择需要导入账号的 BOT",
switch_inline_query_chosen_chat=SwitchInlineQueryChosenChat(
query="cookies_export", allow_bot_chats=True
),
)
]
]
await message.reply_text(text, reply_markup=InlineKeyboardMarkup(buttons))
def gen_cookies_import_buttons(self):
official_bots = config.bot_official.copy()
lower_official_bots = [i.lower() for i in official_bots]
bot_username_lower = self.application.bot.username.lower()
if bot_username_lower in lower_official_bots:
official_bots.pop(lower_official_bots.index(bot_username_lower))
return [
[
InlineKeyboardButton(
text=name,
url=create_deep_linked_url(name, "cookies_export"),
)
]
for name in official_bots
]
@handler.command("cookies_import", block=False)
async def cookies_import(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE"):
message = update.effective_message
user = update.effective_user
logger.info("用户 %s[%s] cookies_import 命令请求", user.full_name, user.id)
text = "请点击下方按钮选择您已经绑定了账号的 BOT"
buttons = self.gen_cookies_import_buttons()
if not buttons:
await message.reply_text("没有可用的BOT")
return
await message.reply_text(text, reply_markup=InlineKeyboardMarkup(buttons))
@handler.inline_query(pattern="^cookies_export$", block=False)
async def inline_query(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
user = update.effective_user
ilq: "InlineQuery" = update.inline_query
cache_data = await self.get_cache(user.id)
results_list = []
if not cache_data:
results_list.append(
InlineQueryResultArticle(
id=str(uuid4()),
title="无法导出 Cookies",
description="请先使用命令 /cookies_export",
input_message_content=InputTextMessageContent("/cookies_export"),
)
)
else:
name_map = {RegionEnum.HYPERION: "米游社", RegionEnum.HOYOLAB: "HoYoLab"}
for cookie in cache_data:
region = name_map[cookie.region]
results_list.append(
InlineQueryResultArticle(
id=str(uuid4()),
title=f"{region} - {cookie.account_id}",
description=f"导出账号ID {cookie.account_id} 的 Cookies",
input_message_content=InputTextMessageContent(f"/setcookies {region} {cookie.data}"),
)
)
try:
await ilq.answer(
results=results_list,
cache_time=0,
auto_pagination=True,
button=InlineQueryResultsButton(
text="!!导出到不信任对话将有盗号风险!!",
start_parameter="cookies_export",
),
)
except BadRequest as exc:
if "Query is too old" in exc.message: # 过时请求全部忽略
logger.warning("用户 %s[%s] inline_query 请求过时", user.full_name, user.id)
return
if "can't parse entities" not in exc.message:
raise exc
logger.warning("inline_query发生BadRequest错误", exc_info=exc)
await ilq.answer(
results=[],
button=InlineQueryResultsButton(
text="糟糕,发生错误了。",
start_parameter="inline_message",
),
)

View File

@ -69,6 +69,8 @@ class SetCommandPlugin(Plugin):
BotCommand("avatars", "查询角色练度"),
BotCommand("reg_time", "账号注册时间"),
BotCommand("daily_material", "今日素材表"),
BotCommand("cookies_import", "从其他 BOT 导入账号信息"),
BotCommand("cookies_export", "导出账号信息给其他 BOT"),
]
admin_command = [
BotCommand("add_admin", "添加管理员"),

View File

@ -98,6 +98,8 @@ class Inline(Plugin):
input_message_content=InputTextMessageContent("角色攻略查询"),
)
)
elif args[0] == "cookies_export":
return
else:
if args[0] == "查看武器列表并查询":
for weapon in self.weapons_list:

View File

@ -3,10 +3,7 @@ from typing import TYPE_CHECKING
from simnet import Region
from simnet.errors import RegionNotSupported
from telegram import InlineKeyboardMarkup, InlineKeyboardButton
from telegram.constants import ParseMode
from telegram.ext import filters, MessageHandler, CommandHandler
from telegram.helpers import create_deep_linked_url
from core.plugin import Plugin, handler
from core.services.cookies import CookiesService
@ -142,7 +139,6 @@ class BirthdayPlugin(Plugin):
self.add_delete_message_job(message, delay=30)
return
logger.info("用户 %s[%s] 领取生日画片命令请求", user.full_name, user.id)
try:
async with self.helper.genshin(user.id) as client:
try:
text = await self.card_system.start_get_card(client)
@ -156,20 +152,3 @@ class BirthdayPlugin(Plugin):
if filters.ChatType.GROUPS.filter(reply_message):
self.add_delete_message_job(message)
self.add_delete_message_job(reply_message)
except (CookiesNotFoundError, PlayerNotFoundError):
buttons = [[InlineKeyboardButton("点我绑定账号", url=create_deep_linked_url(context.bot.username, "set_cookie"))]]
if filters.ChatType.GROUPS.filter(message):
reply_msg = await message.reply_text(
"此功能需要绑定<code>cookie</code>后使用,请先私聊派蒙绑定账号",
reply_markup=InlineKeyboardMarkup(buttons),
parse_mode=ParseMode.HTML,
)
self.add_delete_message_job(reply_msg, delay=30)
self.add_delete_message_job(message, delay=30)
else:
await message.reply_text(
"此功能需要绑定<code>cookie</code>后使用,请先私聊派蒙进行绑定",
parse_mode=ParseMode.HTML,
reply_markup=InlineKeyboardMarkup(buttons),
)
return

View File

@ -4,7 +4,6 @@ from typing import Optional, TYPE_CHECKING, List, Union, Dict, Tuple
from enkanetwork import EnkaNetworkResponse
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import filters
from telegram.helpers import create_deep_linked_url
from core.config import config
from core.dependence.assets import AssetsService
@ -20,6 +19,7 @@ from plugins.genshin.gcsim.renderer import GCSimResultRenderer
from plugins.genshin.gcsim.runner import GCSimRunner, GCSimFit, GCSimQueueFull, GCSimResult
from plugins.genshin.model.base import CharacterInfo
from plugins.genshin.model.converters.enka import EnkaConverter
from plugins.tools.genshin import PlayerNotFoundError
from utils.log import logger
if TYPE_CHECKING:
@ -29,18 +29,6 @@ if TYPE_CHECKING:
__all__ = ("GCSimPlugin",)
async def _no_account_return(message: "Message", context: "ContextTypes.DEFAULT_TYPE"):
buttons = [
[
InlineKeyboardButton(
"点我绑定账号",
url=create_deep_linked_url(context.bot.username, "set_uid"),
)
]
]
await message.reply_text("未查询到您所绑定的账号信息,请先绑定账号", reply_markup=InlineKeyboardMarkup(buttons))
async def _no_character_return(user_id: int, uid: int, message: "Message"):
photo = open("resources/img/kitsune.png", "rb")
buttons = [
@ -183,7 +171,7 @@ class GCSimPlugin(Plugin):
uid, names = await self._get_uid_names(user.id, args, message.reply_to_message)
logger.info("用户 %s[%s] 发出 gcsim 命令 UID[%s] NAMES[%s]", user.full_name, user.id, uid, " ".join(names))
if uid is None:
return await _no_account_return(message, context)
raise PlayerNotFoundError(user.id)
character_infos = await self._load_characters(uid)
if not character_infos:

View File

@ -22,7 +22,6 @@ from pydantic import BaseModel
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction
from telegram.ext import filters
from telegram.helpers import create_deep_linked_url
from core.config import config
from core.dependence.assets import DEFAULT_EnkaAssets, AssetsService
@ -35,6 +34,7 @@ from modules.apihelper.client.components.remote import Remote
from modules.gcsim.file import PlayerGCSimScripts
from modules.playercards.file import PlayerCardsFile
from modules.playercards.helpers import ArtifactStatsTheory
from plugins.tools.genshin import PlayerNotFoundError
from utils.enkanetwork import RedisCache, EnkaNetworkAPI
from utils.helpers import download_resource
from utils.log import logger
@ -168,25 +168,7 @@ class PlayerCards(Plugin):
await message.reply_chat_action(ChatAction.TYPING)
uid, character_name = await self.get_uid_and_ch(user.id, args, message.reply_to_message)
if uid is None:
buttons = [
[
InlineKeyboardButton(
"点我绑定账号",
url=create_deep_linked_url(context.bot.username, "set_uid"),
)
]
]
if filters.ChatType.GROUPS.filter(message):
reply_message = await message.reply_text(
"未查询到您所绑定的账号信息,请先私聊派蒙绑定账号",
reply_markup=InlineKeyboardMarkup(buttons),
)
self.add_delete_message_job(reply_message, delay=30)
self.add_delete_message_job(message, delay=30)
else:
await message.reply_text("未查询到您所绑定的账号信息,请先绑定账号", reply_markup=InlineKeyboardMarkup(buttons))
return
raise PlayerNotFoundError(user.id)
original_data = await self._load_history(uid)
if original_data is None or len(original_data["avatarInfoList"]) == 0:
if isinstance(self.kitsune, str):

View File

@ -4,16 +4,13 @@ from typing import TYPE_CHECKING
from simnet.client.routes import InternationalRoute
from simnet.errors import BadRequest as SIMNetBadRequest
from simnet.utils.player import recognize_genshin_server, recognize_genshin_game_biz
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ParseMode
from telegram.ext import filters
from telegram.helpers import create_deep_linked_url
from core.dependence.redisdb import RedisDB
from core.plugin import Plugin, handler
from core.services.cookies import CookiesService
from core.services.users.services import UserService
from plugins.tools.genshin import PlayerNotFoundError, CookiesNotFoundError, GenshinHelper
from plugins.tools.genshin import GenshinHelper
from utils.log import logger
if TYPE_CHECKING:
@ -72,7 +69,7 @@ class RegTimePlugin(Plugin):
@handler.command("reg_time", block=False)
@handler.message(filters.Regex(r"^原神账号注册时间$"), block=False)
async def reg_time(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
async def reg_time(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
message = update.effective_message
user = update.effective_user
logger.info("用户 %s[%s] 原神注册时间命令请求", user.full_name, user.id)
@ -81,22 +78,6 @@ class RegTimePlugin(Plugin):
game_uid = client.player_id
reg_time = await self.get_reg_time_from_cache(client)
await message.reply_text(f"你的原神账号 [{game_uid}] 注册时间为:{reg_time}")
except (PlayerNotFoundError, CookiesNotFoundError):
buttons = [[InlineKeyboardButton("点我绑定账号", url=create_deep_linked_url(context.bot.username, "set_cookie"))]]
if filters.ChatType.GROUPS.filter(message):
reply_msg = await message.reply_text(
"此功能需要绑定<code>cookie</code>后使用,请先私聊派蒙绑定账号",
reply_markup=InlineKeyboardMarkup(buttons),
parse_mode=ParseMode.HTML,
)
self.add_delete_message_job(reply_msg, delay=30)
self.add_delete_message_job(message, delay=30)
else:
await message.reply_text(
"此功能需要绑定<code>cookie</code>后使用,请先私聊派蒙进行绑定",
parse_mode=ParseMode.HTML,
reply_markup=InlineKeyboardMarkup(buttons),
)
except SIMNetBadRequest as exc:
if exc.ret_code == -501101:
await message.reply_text("当前角色冒险等阶未达到10级暂时无法获取信息")

View File

@ -67,9 +67,17 @@ class ErrorHandler(Plugin):
if update.inline_query is not None: # 忽略 inline_query
return None
_import_button = InlineKeyboardButton(
"从其他 BOT 导入", url=create_deep_linked_url(context.bot.username, "cookies_import")
)
if "重新绑定" in content:
buttons = InlineKeyboardMarkup(
[[InlineKeyboardButton("点我重新绑定", url=create_deep_linked_url(context.bot.username, "set_cookie"))]]
[
[
InlineKeyboardButton("点我重新绑定", url=create_deep_linked_url(context.bot.username, "set_cookie")),
_import_button,
],
]
)
elif "通过验证" in content:
buttons = InlineKeyboardMarkup(
@ -87,8 +95,9 @@ class ErrorHandler(Plugin):
[
InlineKeyboardButton(
"点我绑定账号", url=create_deep_linked_url(self.application.bot.username, "set_cookie")
)
]
),
_import_button,
],
]
)
else:

View File

@ -7,6 +7,9 @@ from typing import TYPE_CHECKING
import psutil
from telegram import __version__
from git import Repo
from git.exc import GitCommandError, InvalidGitRepositoryError, NoSuchPathError
from core.plugin import Plugin, handler
from utils.log import logger
@ -20,6 +23,14 @@ class Status(Plugin):
self.pid = os.getpid()
self.time_form = "%m/%d %H:%M"
@staticmethod
def get_git_hash() -> str:
try:
repo = Repo()
except (InvalidGitRepositoryError, NoSuchPathError, GitCommandError):
return "非 Git 仓库"
return repo.head.commit.hexsha[:8]
@handler.command(command="status", block=False, admin=True)
async def send_log(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE"):
user = update.effective_user
@ -47,6 +58,7 @@ class Status(Plugin):
"PaiGram 运行状态\n"
f"Python 版本: `{python_version()}` \n"
f"Telegram 版本: `{__version__}` \n"
f"GramBot 版本: `{self.get_git_hash()}` \n"
f"CPU使用率: `{cpu_percent}%/{process_cpu_use}%` \n"
f"当前使用的内存: `{memory_text}` \n"
f"运行时间: `{self.get_bot_uptime(start_time)}` \n"