diff --git a/plugins/gacha/__init__.py b/plugins/gacha/__init__.py new file mode 100644 index 00000000..997efa65 --- /dev/null +++ b/plugins/gacha/__init__.py @@ -0,0 +1,7 @@ +from utils.plugins.manager import listener_plugins_class +from .gacha import Gacha + + +@listener_plugins_class() +class GachaPlugins(Gacha): + pass diff --git a/plugins/gacha/gacha.py b/plugins/gacha/gacha.py new file mode 100644 index 00000000..baa8bb26 --- /dev/null +++ b/plugins/gacha/gacha.py @@ -0,0 +1,124 @@ +import os + +from pyppeteer import launch +from telegram import Update +from telegram.constants import ChatAction +from telegram.ext import filters, CommandHandler, MessageHandler, CallbackContext + +from app.template import TemplateService +from logger import Log +from model.apihelper.gacha import GachaInfo +from plugins.base import BasePlugins +from plugins.gacha.wish import WishCountInfo, get_one +from utils.app.inject import inject +from utils.bot import get_all_args +from utils.decorators.error import error_callable +from utils.decorators.restricts import restricts +from utils.plugins.manager import listener_plugins_class + + +@listener_plugins_class() +class Gacha(BasePlugins): + """抽卡模拟器(非首模拟器/减寿模拟器)""" + + CHECK_SERVER, COMMAND_RESULT = range(10600, 10602) + + @classmethod + def create_handlers(cls) -> list: + gacha = cls() + return [ + CommandHandler("gacha", gacha.command_start, block=False), + MessageHandler(filters.Regex("^抽卡模拟器(.*)"), gacha.command_start, block=False), + MessageHandler(filters.Regex("^非首模拟器(.*)"), gacha.command_start, block=False), + ] + + @inject + def __init__(self, template_service: TemplateService = None): + self.gacha = GachaInfo() + self.template_service = template_service + self.browser: launch = None + self.current_dir = os.getcwd() + self.resources_dir = os.path.join(self.current_dir, "resources") + self.character_gacha_card = {} + self.user_time = {} + + async def gacha_info(self, gacha_name: str = "角色活动", default: bool = False): + gacha_list_info = await self.gacha.get_gacha_list_info() + gacha_id = "" + for gacha in gacha_list_info.data["list"]: + if gacha["gacha_name"] == gacha_name: + gacha_id = gacha["gacha_id"] + if gacha_id == "": + if default and len(gacha_list_info.data["list"]) > 0: + gacha_id = gacha_list_info.data["list"][0]["gacha_id"] + else: + return {} + gacha_info = await self.gacha.get_gacha_info(gacha_id) + gacha_info["gacha_id"] = gacha_id + return gacha_info + + @restricts(filters.ChatType.GROUPS, restricts_time=20, try_delete_message=True) + @restricts(filters.ChatType.PRIVATE) + @error_callable + async def command_start(self, update: Update, context: CallbackContext) -> None: + message = update.message + user = update.effective_user + args = get_all_args(context) + gacha_name = "角色活动" + if len(args) >= 1: + gacha_name = args[0] + if gacha_name not in ("角色活动-2", "武器活动", "常驻", "角色活动"): + for key, value in {"2": "角色活动-2", "武器": "武器活动", "普通": "常驻"}.items(): + if key == gacha_name: + gacha_name = value + break + gacha_info = await self.gacha_info(gacha_name) + if gacha_info.get("gacha_id") is None: + await message.reply_text(f"没有找到名为 {gacha_name} 的卡池") + return + else: + gacha_info = await self.gacha_info(default=True) + Log.info(f"用户 {user.full_name}[{user.id}] 抽卡模拟器命令请求 || 参数 {gacha_name}") + # 用户数据储存和处理 + gacha_id: str = gacha_info["gacha_id"] + user_gacha: dict[str, WishCountInfo] = context.user_data.get("gacha") + if user_gacha is None: + user_gacha = context.user_data["gacha"] = {} + user_gacha_count: WishCountInfo = user_gacha.get(gacha_id) + if user_gacha_count is None: + user_gacha_count = user_gacha[gacha_id] = WishCountInfo(user_id=user.id) + # 用户数据储存和处理 + await message.reply_chat_action(ChatAction.TYPING) + data = { + "_res_path": f"file://{self.resources_dir}", + "name": f"{user.full_name}", + "info": gacha_name, + "poolName": gacha_info["title"], + "items": [], + } + for _ in range(10): + item = get_one(user_gacha_count, gacha_info) + # 下面为忽略的代码,因为metadata未完善,具体武器和角色类型无法显示 + # item_name = item["item_name"] + # item_type = item["item_type"] + # if item_type == "角色": + # gacha_card = self.character_gacha_card.get(item_name) + # if gacha_card is None: + # await message.reply_text(f"获取角色 {item_name} GachaCard信息失败") + # return + # item["item_character_img"] = await url_to_file(gacha_card) + data["items"].append(item) + + def take_rang(elem: dict): + return elem["rank"] + + data["items"].sort(key=take_rang, reverse=True) + await message.reply_chat_action(ChatAction.UPLOAD_PHOTO) + # 因为 gacha_info["title"] 返回的是 HTML 标签 尝试关闭自动转义 + png_data = await self.template_service.render('genshin/gacha', "gacha.html", data, + {"width": 1157, "height": 603}, False, False) + + reply_message = await message.reply_photo(png_data) + if filters.ChatType.GROUPS.filter(message): + self._add_delete_message_job(context, reply_message.chat_id, reply_message.message_id, 300) + self._add_delete_message_job(context, message.chat_id, message.message_id, 300) diff --git a/plugins/gacha/wish.py b/plugins/gacha/wish.py new file mode 100644 index 00000000..c719bcb7 --- /dev/null +++ b/plugins/gacha/wish.py @@ -0,0 +1,131 @@ +import random +from enum import Enum + + +class GachaType(Enum): + activity = 301 # 限定卡池 + activity2 = 400 # 限定卡池 + weapon = 302 # 武器卡池 + permanent = 200 # 常驻卡池 + + +class WishCountInfo: + def __init__(self, user_id: int): + self.user_id = user_id + self.five_stars_count: int = 1 + self.four_stars_count: int = 1 + self.is_up: bool = False + self.maximum_fate_points: int = 0 + + +def character_probability(rank, count): + ret = 0 + if rank == 5 and count <= 73: + ret = 60 + elif rank == 5 and count >= 74: + ret = 60 + 600 * (count - 73) + elif rank == 4 and count <= 8: + ret = 510 + elif rank == 4 and count >= 9: + ret = 510 + 5100 * (count - 8) + return ret + + +def weapon_probability(rank, count): + ret = 0 + if rank == 5 and count <= 62: + ret = 70 + elif rank == 5 and count <= 73: + ret = 70 + 700 * (count - 62) + elif rank == 5 and count >= 74: + ret = 7770 + 350 * (count - 73) + elif rank == 4 and count <= 7: + ret = 600 + elif rank == 4 and count == 8: + ret = 6600 + elif rank == 4 and count >= 9: + ret = 6600 + 3000 * (count - 8) + return ret + + +def is_character_gacha(gacha_type: GachaType) -> bool: + return gacha_type in (GachaType.activity, GachaType.activity2, GachaType.permanent) + + +def random_int(): + return random.randint(0, 10000) + + +def get_is_up(rank: int, count: WishCountInfo, gacha_type: GachaType): + if gacha_type == GachaType.permanent: + return False + elif gacha_type == GachaType.weapon: + return random_int() <= 7500 + else: + return random_int() <= 5000 or (rank == 5 and count.is_up) + + +def get_rank(count: WishCountInfo, gacha_type: GachaType): + value = random_int() + probability_fn = character_probability if is_character_gacha(gacha_type) else weapon_probability + index_5 = probability_fn(5, count.five_stars_count) + index_4 = probability_fn(4, count.four_stars_count) + index_5 + if value <= index_5: + return 5 + elif value <= index_4: + return 4 + else: + return 3 + + +def get_one(count: WishCountInfo, gacha_info: dict, weapon_name: str = "") -> dict: + gacha_type = GachaType(gacha_info["gacha_type"]) + rank = get_rank(count, gacha_type) + is_up = get_is_up(rank, count, gacha_type) + if rank == 5: + count.five_stars_count = 1 + if is_up: + data = random.choice(gacha_info["r5_up_items"]) + else: + data = random.choice(gacha_info["r5_prob_list"]) + if gacha_type == GachaType.weapon: + if data["item_name"] == weapon_name: + count.maximum_fate_points = 0 + elif count.maximum_fate_points == 2: + count.maximum_fate_points = 0 + for temp_item in gacha_info["r5_up_items"]: + if temp_item["item_name"] == weapon_name: + data = temp_item + break + else: + count.maximum_fate_points += 1 + if gacha_type in (GachaType.activity, GachaType.activity2, GachaType.weapon): + count.is_up = not is_up + return { + "item_type": data["item_type"], + "item_name": data["item_name"], + "rank": 5, + } + elif rank == 4: + count.five_stars_count += 1 + count.four_stars_count = 1 + if is_up: + data = random.choice(gacha_info["r4_up_items"]) + else: + data = random.choice(gacha_info["r4_prob_list"]) + return { + "item_type": data["item_type"], + "item_name": data["item_name"], + "rank": 4, + } + elif rank == 3: + count.five_stars_count += 1 + count.four_stars_count += 1 + data = random.choice(gacha_info["r3_prob_list"]) + return { + "item_type": data["item_type"], + "item_name": data["item_name"], + "rank": 3, + } + else: + raise ValueError("rank value error")