Initial commit

This commit is contained in:
洛水.山岭居室 2022-04-14 15:18:45 +08:00
parent 9a3ba38020
commit 760ea19c76
207 changed files with 38585 additions and 1 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/config/config.json
**_test.html

View File

@ -1 +1,52 @@
# TGPaimonBot
# TGPaimonBot
## 简介
基于
[python-telegram-bot](https://github.com/python-telegram-bot/python-telegram-bot)
的异步测试分支的原神BOT
~~为啥用测试版本?别问,问就是我想急着用了。~~
### 其他说明
这个项目目前正在扩展,加入更多原神相关娱乐和信息查询功能,敬请期待。
## 命令
### 群验证
> 下面都是最初在同步版本的说明,现在哈子迁移
### inline命令
目前本项目最初大多数基于 [inline](https://core.telegram.org/bots/inline) 为主的命令。
在聊天框输入对应的机器人名称,
如官方的贴纸机器人 `@Stickers` 在选择贴纸的时候按钮会拉起inline十分方便选择要修改的贴纸。
### 命令列表
| Quite | Return |
| :-------------------------:|--------------------------------------------|
| 对应的角色名称(如 `胡桃` | 角色相关信息 |
| 输入 `抽卡模拟器` | 启动抽卡模拟器以10发进行计算后面加上数字并用空格分开可进行自定义的许愿次数 |
### 关于inline命令
#### inline命令带来的好处
无需把BOT拉进群在输入框输入BOT名称即可使用。
#### inline命令带来的限制
用户输入的信息`Quite`后10S内机器人必须处理完毕否则会抛出
`telegram.error.BadRequest: Query is too old and response timeout expired or query id is invalid`
错误。
而且返回图片时只能传递图片连接,不能传递 `bytes`
## Thanks
| Nickname | Contribution |
| :----------------------------------------------------------: | ----------------------------------- |
|[原神抽卡全机制总结](https://www.bilibili.com/read/cv10468091) | 本项目抽卡模拟器使用的逻辑 |
|[西风驿站](https://bbs.mihoyo.com/ys/collection/307224)| 本项目攻略图图源 |

25
config.py Normal file
View File

@ -0,0 +1,25 @@
import ujson
import os
class Config(object):
def __init__(self):
project_path = os.path.dirname(__file__)
config_file = os.path.join(project_path, './config', 'config.json')
if not os.path.exists(config_file):
config_file = os.path.join(project_path, './config', 'config.example.json')
with open(config_file, 'r', encoding='utf-8') as f:
self._config_json: dict = ujson.load(f)
self.DEBUG: bool = self.get_config("debug")
self.ADMINISTRATORS = self.get_config('administrators')
self.MYSQL = self.get_config('mysql')
self.TELEGRAM = self.get_config('telegram')
self.FUNCTION = self.get_config('function')
def get_config(self, name: str):
return self._config_json.get(name, '')
config = Config()

View File

@ -0,0 +1,29 @@
{
"mysql": {
"host": "127.0.0.1",
"port": 3306,
"user": "",
"pass": "",
"database": ""
},
"redis": {
"host": "127.0.0.1",
"port": 6379,
"database": 0
},
"telegram": {
"token": "",
"administrators":
[
{
"username": "",
"userid": ,
"type": 0
},
{
"username": "",
"userid": ,
"type": 1
}
]
}

75
logger.py Normal file
View File

@ -0,0 +1,75 @@
import logging
from logging.handlers import RotatingFileHandler # 按文件大小滚动备份
import colorlog # 控制台日志输入颜色
import os
cur_path = os.path.realpath(os.getcwd()) # log_path是存放日志的路径
log_path = os.path.join(cur_path, 'logs')
if not os.path.exists(log_path):
os.mkdir(log_path) # 如果不存在这个logs文件夹就自动创建一个
logName = os.path.join(log_path, 'log.log') # 文件的命名
log_colors_config = {
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red',
}
class Logger:
def __init__(self):
self.logName = logName
self.logger = logging.getLogger()
self.logger.setLevel(logging.INFO)
# self.formatter = colorlog.ColoredFormatter( '%(log_color)s[%(asctime)s] [%(filename)s:%(lineno)d] [%(
# module)s:%(funcName)s] [%(levelname)s] - %(message)s', log_colors=log_colors_config)
self.formatter = colorlog.ColoredFormatter(
'%(log_color)s[%(asctime)s] [%(levelname)s] - %(message)s', log_colors=log_colors_config)
self.formatter2 = colorlog.ColoredFormatter(
'[%(asctime)s] [%(levelname)s] - %(message)s')
def getLogger(self):
return self.logger
def __console(self, level, message, exc_info=None):
# 创建一个FileHandler用于写到本地
fh = RotatingFileHandler(filename=self.logName, maxBytes=1024 * 1024 * 5, backupCount=5,
encoding='utf-8') # 使用RotatingFileHandler类滚动备份日志
fh.setLevel(logging.INFO)
fh.setFormatter(self.formatter2)
self.logger.addHandler(fh)
ch = colorlog.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(self.formatter)
self.logger.addHandler(ch)
if level == 'info':
self.logger.info(msg=message, exc_info=exc_info)
elif level == 'debug':
self.logger.debug(msg=message, exc_info=exc_info)
elif level == 'warning':
self.logger.warning(msg=message, exc_info=exc_info)
elif level == 'error':
self.logger.error(msg=message, exc_info=exc_info)
# 这两行代码是为了避免日志输出重复问题
self.logger.removeHandler(ch)
self.logger.removeHandler(fh)
fh.close() # 关闭打开的文件
def debug(self, msg, exc_info=None):
self.__console('debug', msg, exc_info)
def info(self, msg, exc_info=None):
self.__console('info', msg, exc_info)
def warning(self, msg, exc_info=None):
self.__console('warning', msg, exc_info)
def error(self, msg, exc_info=None):
self.__console('error', msg, exc_info)
Log = Logger()

108
main.py Normal file
View File

@ -0,0 +1,108 @@
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ConversationHandler, \
CallbackQueryHandler, InlineQueryHandler, CallbackContext
from plugins.admin import Admin
from plugins.auth import Auth
from plugins.cookies import Cookies
from plugins.errorhandler import error_handler
from plugins.gacha import Gacha
from plugins.get_user import GetUser
from plugins.inline import Inline
from plugins.job_queue import JobQueue
from plugins.quiz import Quiz
from plugins.sign import Sign
from plugins.start import start, help_command, new_chat_members, emergency_food, ping
from plugins.weapon import Weapon
from service import StartService
from service.repository import AsyncRepository
from config import config
from service.cache import RedisCache
def main() -> None:
repository = AsyncRepository(mysql_host=config.MYSQL["host"],
mysql_user=config.MYSQL["user"],
mysql_password=config.MYSQL["password"],
mysql_port=config.MYSQL["port"],
mysql_database=config.MYSQL["database"]
)
cache = RedisCache(db=6)
service = StartService(repository, cache)
application = Application.builder().token(config.TELEGRAM["token"]).build()
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("ping", ping))
# application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, new_chat_members))
auth = Auth(service)
application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, auth.new_mem))
application.add_handler(CallbackQueryHandler(auth.query, pattern=r"^auth_challenge\|"))
application.add_handler(CallbackQueryHandler(auth.admin, pattern=r"^auth_admin\|"))
# application.add_handler(MessageHandler((filters.Regex(r'.派蒙是应急食品') & filters.ChatType.PRIVATE), emergency_food))
cookies = Cookies(service)
cookies_handler = ConversationHandler(
entry_points=[CommandHandler('set_cookies', cookies.command_start),
MessageHandler(filters.Regex(r"^绑定账号(.*)"), cookies.command_start)],
states={
cookies.CHECK_SERVER: [MessageHandler(filters.TEXT, cookies.check_server)],
cookies.CHECK_COOKIES: [MessageHandler(filters.TEXT, cookies.check_cookies)],
cookies.COMMAND_RESULT: [MessageHandler(filters.TEXT, cookies.command_result)],
},
fallbacks=[CommandHandler('cancel', cookies.cancel)],
)
get_user = GetUser(service)
get_user_handler = ConversationHandler(
entry_points=[CommandHandler('get_user', get_user.command_start),
MessageHandler(filters.Regex(r"^玩家查询(.*)"), get_user.command_start)],
states={
get_user.COMMAND_RESULT: [CallbackQueryHandler(get_user.command_result)]
},
fallbacks=[CommandHandler('cancel', get_user.cancel)],
)
sign = Sign(service)
sign_handler = ConversationHandler(
entry_points=[CommandHandler('sign', sign.command_start),
MessageHandler(filters.Regex(r"^每日签到(.*)"), sign.command_start)],
states={
sign.COMMAND_RESULT: [CallbackQueryHandler(sign.command_result)]
},
fallbacks=[CommandHandler('cancel', sign.cancel)],
)
application.add_handler(sign_handler)
quiz = Quiz(service)
quiz_handler = ConversationHandler(
entry_points=[CommandHandler('quiz', quiz.command_start)],
states={
quiz.CHECK_COMMAND: [MessageHandler(filters.TEXT, quiz.check_command)],
quiz.CHECK_QUESTION: [MessageHandler(filters.TEXT, quiz.check_question)],
quiz.GET_NEW_QUESTION: [MessageHandler(filters.TEXT, quiz.get_new_question)],
quiz.GET_NEW_CORRECT_ANSWER: [MessageHandler(filters.TEXT, quiz.get_new_correct_answer)],
quiz.GET_NEW_WRONG_ANSWER: [MessageHandler(filters.TEXT & ~filters.COMMAND, quiz.get_new_wrong_answer),
CommandHandler("finish", quiz.finish_edit)],
quiz.SAVE_QUESTION: [MessageHandler(filters.TEXT, quiz.save_question)],
},
fallbacks=[CommandHandler('cancel', quiz.cancel)]
)
gacha = Gacha(service)
application.add_handler(CommandHandler("gacha", gacha.command_start))
admin = Admin(service)
application.add_handler(CommandHandler("add_admin", admin.add_admin))
application.add_handler(CommandHandler("del_admin", admin.del_admin))
weapon = Weapon(service)
application.add_handler(CommandHandler("weapon", weapon.command_start))
application.add_handler(MessageHandler(filters.Regex(r"^武器查询(.*)"), weapon.command_start))
application.add_handler(quiz_handler)
application.add_handler(cookies_handler)
application.add_handler(get_user_handler)
inline = Inline(service)
application.add_handler(InlineQueryHandler(inline.inline_query))
job_queue = JobQueue(service)
application.job_queue.run_once(job_queue.start_job, when=3, name="start_job")
# application.add_handler(MessageHandler(filters.COMMAND & filters.ChatType.PRIVATE, unknown_command))
application.add_error_handler(error_handler)
application.run_polling()
if __name__ == '__main__':
main()

6
metadata/README.md Normal file
View File

@ -0,0 +1,6 @@
# metadata 目录说明
该Metadata文件来自
[Snap.Genshin](https://github.com/DGP-Studio/Snap.Genshin/tree/main/Metadata)
非常感谢他们的付出

28702
metadata/characters.json Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,65 @@
[
{
"City": "Mondstadt",
"Name": "抗争",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/4star.png",
"Key": "Talent_Resistance",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/guide/i_453.png"
},
{
"City": "Liyue",
"Name": "繁荣",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/4star.png",
"Key": "Talent_Prosperity",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/guide/i_443.png"
},
{
"City": "Inazuma",
"Name": "天光",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/4star.png",
"Key": "Talent_Light",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/guide/i_428.png"
},
{
"City": "Liyue",
"Name": "黄金",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/4star.png",
"Key": "Talent_Gold",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/guide/i_433.png"
},
{
"City": "Mondstadt",
"Name": "自由",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/4star.png",
"Key": "Talent_Freedom",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/guide/i_423.png"
},
{
"City": "Mondstadt",
"Name": "诗文",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/4star.png",
"Key": "Talent_Ballad",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/guide/i_403.png"
},
{
"City": "Inazuma",
"Name": "风雅",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/4star.png",
"Key": "Talent_Elegance",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/guide/i_418.png"
},
{
"City": "Liyue",
"Name": "勤劳",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/4star.png",
"Key": "Talent_Diligence",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/guide/i_413.png"
},
{
"City": "Inazuma",
"Name": "浮世",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/4star.png",
"Key": "Talent_Transience",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/guide/i_408.png"
}
]

View File

@ -0,0 +1,65 @@
[
{
"City": "Liyue",
"Name": "漆黑陨铁",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/5star.png",
"Key": "Weapon_Aerosiderite",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/weapon/i_554.png"
},
{
"City": "Liyue",
"Name": "孤云寒林",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/5star.png",
"Key": "Weapon_Guyun",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/weapon/i_514.png"
},
{
"City": "Liyue",
"Name": "雾海云间",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/5star.png",
"Key": "Weapon_MistVeiled",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/weapon/i_534.png"
},
{
"City": "Inazuma",
"Name": "远海夷地",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/5star.png",
"Key": "Weapon_DistantSea",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/weapon/i_564.png"
},
{
"City": "Mondstadt",
"Name": "凛风奔狼",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/5star.png",
"Key": "Weapon_BorealWolf",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/weapon/i_524.png"
},
{
"City": "Inazuma",
"Name": "鸣神御灵",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/5star.png",
"Key": "Weapon_Narukami",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/weapon/i_574.png"
},
{
"City": "Mondstadt",
"Name": "高塔孤王",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/5star.png",
"Key": "Weapon_Decarabian",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/weapon/i_504.png"
},
{
"City": "Inazuma",
"Name": "今昔剧画",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/5star.png",
"Key": "Weapon_Mask",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/weapon/i_584.png"
},
{
"City": "Mondstadt",
"Name": "狮牙斗士",
"Star": "https://genshin.honeyhunterworld.com/img/back/item/5star.png",
"Key": "Weapon_DandelionGladiator",
"Source": "https://genshin.honeyhunterworld.com/img/upgrade/weapon/i_544.png"
}
]

30
metadata/elements.json Normal file
View File

@ -0,0 +1,30 @@
[
{
"Key": "Dendro",
"Source": "https://genshin.honeyhunterworld.com/img/icons/element/dendro.png"
},
{
"Key": "Anemo",
"Source": "https://genshin.honeyhunterworld.com/img/icons/element/anemo.png"
},
{
"Key": "Pryo",
"Source": "https://genshin.honeyhunterworld.com/img/icons/element/pyro.png"
},
{
"Key": "Cryo",
"Source": "https://genshin.honeyhunterworld.com/img/icons/element/cryo.png"
},
{
"Key": "Hydro",
"Source": "https://genshin.honeyhunterworld.com/img/icons/element/hydro.png"
},
{
"Key": "Electro",
"Source": "https://genshin.honeyhunterworld.com/img/icons/element/electro.png"
},
{
"Key": "Geo",
"Source": "https://genshin.honeyhunterworld.com/img/icons/element/geo.png"
}
]

2408
metadata/gachaevents.json Normal file

File diff suppressed because it is too large Load Diff

37
metadata/metadata.py Normal file
View File

@ -0,0 +1,37 @@
import os
from typing import List
import ujson
class Metadata(object):
def __init__(self):
project_path = os.path.dirname(__file__)
characters_file = os.path.join(project_path, 'characters.json')
with open(characters_file, 'r', encoding='utf-8') as f:
self.characters = ujson.load(f)
daily_talents_file = os.path.join(project_path, 'dailytalents.json')
with open(daily_talents_file, 'r', encoding='utf-8') as f:
self.daily_talents = ujson.load(f)
daily_weapons_file = os.path.join(project_path, 'dailyweapons.json')
with open(daily_weapons_file, 'r', encoding='utf-8') as f:
self.daily_weapons_file = ujson.load(f)
weapons_file = os.path.join(project_path, 'weapons.json')
with open(weapons_file, 'r', encoding='utf-8') as f:
self.weapons = ujson.load(f)
gacha_events_file = os.path.join(project_path, 'gachaevents.json')
with open(gacha_events_file, 'r', encoding='utf-8') as f:
self.gacha_events = ujson.load(f)
# 初始化
self.characters_name_list: List[str] = [characters["Name"] for characters in self.characters]
self.weapons_name_list: List[str] = [weapons["Name"] for weapons in self.weapons]
@staticmethod
def get_info(data: dict, name: str) -> dict:
for temp in data:
if temp["Name"] == name:
return temp
return {}
metadat = Metadata()

3475
metadata/weapons.json Normal file

File diff suppressed because it is too large Load Diff

22
metadata/weapontypes.json Normal file
View File

@ -0,0 +1,22 @@
[
{
"Key": "Bow",
"Source": "https://genshin.honeyhunterworld.com/img/skills/s_213101.png"
},
{
"Key": "Sword",
"Source": "https://genshin.honeyhunterworld.com/img/skills/s_33101.png"
},
{
"Key": "Catalyst",
"Source": "https://genshin.honeyhunterworld.com/img/skills/s_43101.png"
},
{
"Key": "Claymore",
"Source": "https://genshin.honeyhunterworld.com/img/skills/s_163101.png"
},
{
"Key": "Polearm",
"Source": "https://genshin.honeyhunterworld.com/img/skills/s_233101.png"
}
]

44
model/base.py Normal file
View File

@ -0,0 +1,44 @@
import imghdr
from enum import Enum
class Stat:
def __init__(self, view_num: int = 0, reply_num: int = 0, like_num: int = 0, bookmark_num: int = 0,
forward_num: int = 0):
self.forward_num = forward_num # 关注数
self.bookmark_num = bookmark_num # 收藏数
self.like_num = like_num # 喜欢数
self.reply_num = reply_num # 回复数
self.view_num = view_num # 观看数
class ArtworkInfo:
def __init__(self):
self.user_id: int = 0
self.artwork_id: int = 0 # 作品ID
self.site = ""
self.title: str = "" # 标题
self.origin_url: str = ""
self.site_name: str = ""
self.tags: list = []
self.stat: Stat = Stat()
self.create_timestamp: int = 0
self.info = None
class ArtworkImage:
def __init__(self, art_id: int, page: int = 0, is_error: bool = False, data: bytes = b""):
self.art_id = art_id
self.data = data
self.is_error = is_error
if not is_error:
self.format: str = imghdr.what(None, self.data)
self.page = page
class ServiceEnum(Enum):
NULL = None
MIHOYO = 1
HOYOLAB = 2

View File

@ -0,0 +1,3 @@
from .mihoyo import *
from .hoyolab import *
from .gacha import *

128
model/genshinhelper/base.py Normal file
View File

@ -0,0 +1,128 @@
import imghdr
from enum import Enum
class ArtworkImage:
def __init__(self, art_id: int, page: int = 0, is_error: bool = False, data: bytes = b""):
self.art_id = art_id
self.data = data
self.is_error = is_error
if not is_error:
self.format: str = imghdr.what(None, self.data)
self.page = page
class BaseResponseData:
def __init__(self, response=None, error_message: str = ""):
if response is None:
self.error: bool = True
self.message: str = error_message
return
self.response: dict = response
self.code = response["retcode"]
if self.code == 0:
self.error = False
else:
self.error = True
self.message = response["message"]
self.data = response["data"]
class Stat:
def __init__(self, view_num: int = 0, reply_num: int = 0, like_num: int = 0, bookmark_num: int = 0,
forward_num: int = 0):
self.forward_num = forward_num # 关注数
self.bookmark_num = bookmark_num # 收藏数
self.like_num = like_num # 喜欢数
self.reply_num = reply_num # 回复数
self.view_num = view_num # 观看数
class ArtworkInfo:
def __init__(self, post_id: int = 0, subject: str = "", tags=None,
image_url_list=None, stat: Stat = None, uid: int = 0, created_at: int = 0):
if tags is None:
self.tags = []
else:
self.tags = tags
if image_url_list is None:
self.image_url_list = []
else:
self.image_url_list = image_url_list
self.Stat = stat
self.created_at = created_at
self.uid = uid
self.subject = subject
self.post_id = post_id
class MiHoYoBBSResponse:
def __init__(self, response=None, error_message: str = ""):
if response is None:
self.error: bool = True
self.message: str = error_message
return
self.response: dict = response
self.code = response["retcode"]
if self.code == 0:
self.error = False
else:
if self.code == 1102:
self.message = "作品不存在"
self.error = True
return
if response["data"] is None:
self.error = True
self.message: str = response["message"]
if self.error:
return
try:
self._data_post = response["data"]["post"]
post = self._data_post["post"] # 投稿信息
post_id = post["post_id"]
subject = post["subject"] # 介绍类似title标题
created_at = post["created_at"] # 创建时间
user = self._data_post["user"] # 用户数据
uid = user["uid"] # 用户ID
topics = self._data_post["topics"] # 存放 Tag
image_list = self._data_post["image_list"] # image_list
except (AttributeError, TypeError) as err:
self.error: bool = True
self.message: str = err
return
topics_list = []
image_url_list = []
for topic in topics:
topics_list.append(topic["name"])
for image in image_list:
image_url_list.append(image["url"])
self.post_id = post["post_id"]
self.user_id = user["uid"]
self.created_at = post["created_at"]
stat = Stat(view_num=self._data_post["stat"]["view_num"],
reply_num=self._data_post["stat"]["reply_num"],
like_num=self._data_post["stat"]["like_num"],
bookmark_num=self._data_post["stat"]["bookmark_num"],
forward_num=self._data_post["stat"]["forward_num"],
)
self.results = ArtworkInfo(
subject=subject,
created_at=created_at,
uid=uid,
stat=stat,
tags=topics_list,
post_id=post_id,
image_url_list=image_url_list
)
def __bool__(self):
return self.error
def __len__(self):
return len(self.results.image_url_list)
class ServiceEnum(Enum):
MIHOYO = 1
HOYOLAB = 2

View File

@ -0,0 +1,29 @@
import httpx
from model.genshinhelper import BaseResponseData
class GachaInfo:
GACHA_LIST_URL = "https://webstatic.mihoyo.com/hk4e/gacha_info/cn_gf01/gacha/list.json"
GACHA_INFO_URL = "https://webstatic.mihoyo.com/hk4e/gacha_info/cn_gf01/%s/zh-cn.json"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Chrome/90.0.4430.72 Safari/537.36"
def __init__(self):
self.headers = {
'User-Agent': self.USER_AGENT,
}
self.client = httpx.AsyncClient(headers=self.headers)
async def get_gacha_list_info(self) -> BaseResponseData:
req = await self.client.get(self.GACHA_LIST_URL)
if req.is_error:
return BaseResponseData(error_message="请求错误")
return BaseResponseData(req.json())
async def get_gacha_info(self, gacha_id: str) -> dict:
req = await self.client.get(self.GACHA_INFO_URL % gacha_id)
if req.is_error:
return {}
return req.json()

View File

@ -0,0 +1,53 @@
import hashlib
import random
import string
import time
import uuid
def get_device_id(name: str) -> str:
return str(uuid.uuid3(uuid.NAMESPACE_URL, name)).replace('-', '').upper()
def md5(text: str) -> str:
_md5 = hashlib.md5()
_md5.update(text.encode())
return _md5.hexdigest()
def random_text(num: int) -> str:
return ''.join(random.sample(string.ascii_lowercase + string.digits, num))
def timestamp() -> int:
return int(time.time())
def get_ds(salt: str = "", web: int = 1) -> str:
if salt == "":
if web == 1:
salt = "h8w582wxwgqvahcdkpvdhbh2w9casgfl"
elif web == 2:
salt = "h8w582wxwgqvahcdkpvdhbh2w9casgfl"
elif web == 3:
salt = "fd3ykrh7o1j54g581upo1tvpam0dsgtf"
i = str(timestamp())
r = random_text(6)
c = md5("salt=" + salt + "&t=" + i + "&r=" + r)
return f"{i},{r},{c}"
def recognize_server(uid: int) -> str:
server = {
"1": "cn_gf01",
"2": "cn_gf01",
"5": "cn_qd01",
"6": "os_usa",
"7": "os_euro",
"8": "os_asia",
"9": "os_cht",
}.get(str(uid)[0])
if server:
return server
else:
raise TypeError(f"UID {uid} isn't associated with any server")

View File

@ -0,0 +1,96 @@
from httpx import AsyncClient
from model.genshinhelper import BaseResponseData
from model.genshinhelper.helpers import get_ds, get_device_id, recognize_server
class Genshin:
SIGN_INFO_URL = "https://hk4e-api-os.hoyoverse.com/event/sol/info"
SIGN_URL = "https://hk4e-api-os.hoyoverse.com/event/sol/sign"
SIGN_HOME_URL = "https://hk4e-api-os.hoyoverse.com/event/sol/home"
APP_VERSION = "2.11.1"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " \
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
REFERER = "https://webstatic.hoyoverse.com"
ORIGIN = "https://webstatic.hoyoverse.com"
ACT_ID = "e202102251931481"
DS_SALT = "6cqshh5dhw73bzxn20oexa9k516chk7s"
def __init__(self):
self.headers = {
"Origin": self.ORIGIN,
'DS': get_ds(self.DS_SALT),
'x-rpc-app_version': self.APP_VERSION,
'User-Agent': self.USER_AGENT,
'x-rpc-client_type': '5', # 1为ios 2为安卓 4为pc_web 5为mobile_web
'Referer': self.REFERER,
'x-rpc-device_id': get_device_id(self.USER_AGENT)}
self.client = AsyncClient(headers=self.headers)
async def is_sign(self, uid: int, region: str = "", cookies: dict = None, lang: str = 'zh-cn'):
"""
检查是否签到
:param lang: 语言
:param uid: 游戏UID
:param region: 服务器
:param cookies: cookie
:return:
"""
if region == "":
region = recognize_server(uid)
params = {
"act_id": self.ACT_ID,
"region": region,
"uid": uid,
"lang": lang
}
req = await self.client.get(self.SIGN_INFO_URL, params=params, cookies=cookies)
if req.is_error:
return BaseResponseData(error_message="请求错误")
return BaseResponseData(req.json())
async def sign(self, uid: int, region: str = "", cookies: dict = None, lang: str = 'zh-cn'):
"""
执行签到
:param lang:
:param uid: 游戏UID
:param region: 服务器
:param cookies: cookie
:return:
"""
if region == "":
region = recognize_server(uid)
data = {
"act_id": self.ACT_ID,
"region": region,
"uid": uid,
"lang": lang
}
req = await self.client.post(self.SIGN_URL, json=data, cookies=cookies)
if req.is_error:
return BaseResponseData(error_message="签到失败")
return BaseResponseData(req.json())
async def get_sign_give(self, cookies: dict = None, lang: str = 'zh-cn'):
"""
返回今日签到信息
:param lang:
:param cookies:
:return:
"""
params = {
"act_id": self.ACT_ID,
"lang": lang
}
req = await self.client.get(self.SIGN_HOME_URL, params=params, cookies=cookies)
if req.is_error:
return
return BaseResponseData(req.json())
async def __aenter__(self):
pass
async def __aexit__(self, exc_type, exc, tb):
await self.client.aclose()

View File

@ -0,0 +1,186 @@
import asyncio
from typing import List
import httpx
from httpx import AsyncClient
from .base import MiHoYoBBSResponse, ArtworkImage, BaseResponseData
from .helpers import get_ds, get_device_id
class Mihoyo:
POST_FULL_URL = "https://bbs-api.mihoyo.com/post/wapi/getPostFull"
POST_FULL_IN_COLLECTION_URL = "https://bbs-api.mihoyo.com/post/wapi/getPostFullInCollection"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Chrome/90.0.4430.72 Safari/537.36"
def __init__(self):
self.client = httpx.AsyncClient(headers=self.get_headers())
def get_info_url(self, post_id: int) -> str:
return f"{self.POST_FULL_URL}?gids=2&post_id={post_id}&read=1"
def get_headers(self, referer: str = "https://bbs.mihoyo.com/"):
return {
"User-Agent": self.USER_AGENT,
"Referer": referer
}
@staticmethod
def get_list_url_params(forum_id: int, is_good: bool = False, is_hot: bool = False,
page_size: int = 20) -> dict:
params = {
"forum_id": forum_id,
"gids": 2,
"is_good": is_good,
"is_hot": is_hot,
"page_size": page_size,
"sort_type": 1
}
return params
@staticmethod
def get_images_params(resize: int = 600, quality: int = 80, auto_orient: int = 0, interlace: int = 1,
images_format: str = "jpg"):
"""
image/resize,s_600/quality,q_80/auto-orient,0/interlace,1/format,jpg
:param resize: 图片大小
:param quality: 图片质量
:param auto_orient: 自适应
:param interlace: 未知
:param images_format: 图片格式
:return:
"""
params = f"image/resize,s_{resize}/quality,q_{quality}/auto-orient," \
f"{auto_orient}/interlace,{interlace}/format,{images_format}"
return {"x-oss-process": params}
async def get_post_full_in_collection(self, collection_id: int, gids: int = 2, order_type=1) -> BaseResponseData:
params = {
"collection_id": collection_id,
"gids": gids,
"order_type": order_type
}
response = await self.client.get(url=self.POST_FULL_IN_COLLECTION_URL, params=params)
if response.is_error:
return BaseResponseData(error_message="请求错误")
return BaseResponseData(response.json())
async def get_artwork_info(self, post_id: int) -> MiHoYoBBSResponse:
url = self.get_info_url(post_id)
headers = self.get_headers()
response = await self.client.get(url=url, headers=headers)
if response.is_error:
return MiHoYoBBSResponse(error_message="请求错误")
return MiHoYoBBSResponse(response.json())
async def get_images_by_post_id(self, post_id: int) -> List[ArtworkImage]:
artwork_info = await self.get_artwork_info(post_id)
if artwork_info.error:
return []
urls = artwork_info.results.image_url_list
art_list = []
task_list = [
self.download_image(artwork_info.post_id, urls[page], page) for page in range(len(urls))
]
result_list = await asyncio.gather(*task_list)
for result in result_list:
if isinstance(result, ArtworkImage):
art_list.append(result)
def take_page(elem: ArtworkImage):
return elem.page
art_list.sort(key=take_page)
return art_list
async def download_image(self, art_id: int, url: str, page: int = 0) -> ArtworkImage:
response = await self.client.get(url, params=self.get_images_params(resize=2000), timeout=5)
if response.is_error:
return ArtworkImage(art_id, page, True)
return ArtworkImage(art_id, page, data=response.content)
async def close(self):
await self.client.aclose()
class YuanShen:
SIGN_INFO_URL = "https://api-takumi.mihoyo.com/event/bbs_sign_reward/info"
SIGN_URL = "https://api-takumi.mihoyo.com/event/bbs_sign_reward/sign"
SIGN_HOME_URL = "https://api-takumi.mihoyo.com/event/bbs_sign_reward/home"
APP_VERSION = "2.3.0"
USER_AGENT = "Mozilla/5.0 (Linux; Android 9; Unspecified Device) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Version/4.0 Chrome/39.0.0.0 Mobile Safari/537.36 miHoYoBBS/2.3.0"
REFERER = "https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html?" \
"bbs_auth_required=true&act_id=e202009291139501&utm_source=bbs&utm_medium=mys&utm_campaign=icon"
ORIGIN = "https://webstatic.mihoyo.com"
ACT_ID = "e202009291139501"
DS_SALT = "h8w582wxwgqvahcdkpvdhbh2w9casgfl"
def __init__(self):
self.headers = {
"Origin": self.ORIGIN,
'DS': get_ds(self.DS_SALT),
'x-rpc-app_version': self.APP_VERSION,
'User-Agent': self.USER_AGENT,
'x-rpc-client_type': '5', # 1为ios 2为安卓 4为pc_web 5为mobile_web
'Referer': self.REFERER,
'x-rpc-device_id': get_device_id(self.USER_AGENT)}
self.client = AsyncClient(headers=self.headers)
async def is_sign(self, uid: int, region: str = "cn_gf01", cookies: dict = None):
"""
检查是否签到
:param uid: 游戏UID
:param region: 服务器
:param cookies: cookie
:return:
"""
params = {
"act_id": self.ACT_ID,
"region": region,
"uid": uid
}
req = await self.client.get(self.SIGN_INFO_URL, params=params, cookies=cookies)
if req.is_error:
return BaseResponseData(error_message="请求错误")
return BaseResponseData(req.json())
async def sign(self, uid: int, region: str = "cn_gf01", cookies: dict = None):
"""
执行签到
:param uid: 游戏UID
:param region: 服务器
:param cookies: cookie
:return:
"""
data = {
"act_id": self.ACT_ID,
"region": region,
"uid": uid
}
req = await self.client.post(self.SIGN_URL, json=data, cookies=cookies)
if req.is_error:
return BaseResponseData(error_message="签到失败")
return BaseResponseData(req.json())
async def get_sign_give(self, cookies: dict = None):
"""
返回今日签到信息
:param cookies:
:return:
"""
params = {
"act_id": self.ACT_ID
}
req = await self.client.get(self.SIGN_HOME_URL, params=params, cookies=cookies)
if req.is_error:
return
return BaseResponseData(req.json())
async def __aenter__(self):
pass
async def __aexit__(self, exc_type, exc, tb):
await self.client.aclose()

47
model/helpers.py Normal file
View File

@ -0,0 +1,47 @@
import hashlib
import os
from typing import List
import httpx
from telegram import Bot
from service.cache import RedisCache
import aiofiles
USER_AGENT: str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Chrome/90.0.4430.72 Safari/537.36"
headers: dict = {'User-Agent': USER_AGENT}
current_dir = os.getcwd()
cache_dir = os.path.join(current_dir, "cache")
if not os.path.exists(cache_dir):
os.mkdir(cache_dir)
async def get_admin_list(bot: Bot, cache: RedisCache, chat_id: int, extra_user: List[int]) -> List[int]:
admin_id_list = await cache.get_chat_admin(chat_id)
if len(admin_id_list) == 0:
admin_list = await bot.get_chat_administrators(chat_id)
admin_id_list = [admin.user.id for admin in admin_list]
await cache.set_chat_admin(chat_id, admin_id_list)
admin_id_list += extra_user
return admin_id_list
def sha1(text: str) -> str:
_sha1 = hashlib.sha1()
_sha1.update(text.encode())
return _sha1.hexdigest()
async def url_to_file(url: str, prefix: str = "file://") -> str:
url_sha1 = sha1(url)
url_file_name = os.path.basename(url)
_, extension = os.path.splitext(url_file_name)
temp_file_name = url_sha1 + extension
file_dir = os.path.join(cache_dir, temp_file_name)
if not os.path.exists(file_dir):
async with httpx.AsyncClient(headers=headers) as client:
data = await client.get(url)
if data.is_error:
return ""
async with aiofiles.open(file_dir, mode='wb') as f:
await f.write(data.content)
return prefix + file_dir

3
plugins/README.md Normal file
View File

@ -0,0 +1,3 @@
# plugins 目录说明
该目录仅限处理交互层和业务层数据交换的任务如有任何新业务接口请转到service目录添加。

44
plugins/admin.py Normal file
View File

@ -0,0 +1,44 @@
from telegram import Update
from telegram.ext import CallbackContext
from plugins.base import BasePlugins
from service import BaseService
class Admin(BasePlugins):
def __init__(self, service: BaseService):
super().__init__(service)
async def add_admin(self, update: Update, context: CallbackContext):
message = update.message
user = message.from_user
reply_to_message = message.reply_to_message
admin_list = await self.service.admin.get_admin_list()
if user.id in admin_list:
if reply_to_message is None:
await message.reply_text("请回复对应消息")
else:
if reply_to_message.from_user.id in admin_list:
await message.reply_text("该用户已经存在管理员列表")
else:
await self.service.admin.add_admin(reply_to_message.from_user.id)
await message.reply_text("添加成功")
else:
await message.reply_text("权限不足")
async def del_admin(self, update: Update, context: CallbackContext):
message = update.message
user = message.from_user
reply_to_message = message.reply_to_message
admin_list = await self.service.admin.get_admin_list()
if user.id in admin_list:
if reply_to_message is None:
await message.reply_text("请回复对应消息")
else:
if reply_to_message.from_user.id in admin_list:
await self.service.admin.delete_admin(reply_to_message.from_user.id)
await message.reply_text("删除成功")
else:
await message.reply_text("该用户不存在管理员列表")
else:
await message.reply_text("权限不足")

232
plugins/auth.py Normal file
View File

@ -0,0 +1,232 @@
import time
import datetime
import random
from typing import Tuple
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ChatPermissions
from telegram.constants import ParseMode
from telegram.error import BadRequest
from telegram.ext import CallbackContext
from numpy.random import Generator, MT19937
from telegram.helpers import escape_markdown
from model.helpers import get_admin_list
from service import BaseService
FullChatPermissions = ChatPermissions(
can_send_messages=True,
can_send_media_messages=True,
can_send_polls=True,
can_send_other_messages=True,
can_add_web_page_previews=True,
can_change_info=True,
can_invite_users=True,
can_pin_messages=True,
)
class Auth:
def __init__(self, service: BaseService):
self.service = service
self.send_time = time.time()
self.generator = Generator(MT19937(int(self.send_time)))
self.time_out = 120
self.kick_time = 120
def random(self, low: int, high: int) -> int:
if self.send_time + 24 * 60 * 60 >= time.time():
self.send_time = time.time()
self.generator = Generator(MT19937(int(self.send_time)))
return int(self.generator.uniform(low, high))
async def kick(self, context: CallbackContext, chat_id: int, user_id: int) -> bool:
if await context.bot.ban_chat_member(
chat_id=chat_id,
user_id=user_id,
until_date=int(time.time()) + self.kick_time,
):
return True
else:
return False
async def clean(self, context: CallbackContext, chat_id: int, user_id: int, message_id: int) -> bool:
if await context.bot.delete_message(chat_id=chat_id, message_id=message_id):
return True
else:
return False
async def restore(self, context: CallbackContext, chat_id: int, user_id: int) -> bool:
if await context.bot.restrict_chat_member(
chat_id=chat_id,
user_id=user_id,
permissions=FullChatPermissions,
):
return True
else:
return False
async def admin(self, update: Update, context: CallbackContext) -> None:
async def admin_callback(callback_query_data: str) -> Tuple[bool, int]:
_data = callback_query_data.split("|")
if _data[1] == "pass":
_result = True
else:
_result = False
_user_id = int(_data[2])
return _result, _user_id
callback_query = update.callback_query
user = callback_query.from_user
message = callback_query.message
chat = message.chat
if user.id not in await get_admin_list(
bot=context.bot,
cache=self.service.cache,
chat_id=chat.id,
extra_user=[]
):
await callback_query.answer(text=f"你不是管理!\n"
f"再瞎几把点我叫西风骑士团、千岩军和天领奉行了!", show_alert=True)
return
result, user_id = await admin_callback(callback_query.data)
try:
member_info = await context.bot.get_chat_member(chat.id, user_id)
except BadRequest:
user_info = f"{user_id}"
else:
user_info = member_info.user.mention_markdown_v2()
if result:
await callback_query.answer(text="放行", show_alert=False)
await self.restore(context, chat.id, user_id)
if schedule := context.job_queue.scheduler.get_job(f"{chat.id}|{user_id}|clean_join"):
schedule.remove()
await message.edit_text(f"{user_info}{user.mention_markdown_v2()} 放行",
parse_mode=ParseMode.MARKDOWN_V2)
else:
await callback_query.answer(text="驱离", show_alert=False)
await self.kick(context, chat.id, user_id)
await message.edit_text(f"{user_info}{user.mention_markdown_v2()} 驱离",
parse_mode=ParseMode.MARKDOWN_V2)
if schedule := context.job_queue.scheduler.get_job(f"{chat.id}|{user_id}|auth_kick"):
schedule.remove()
async def query(self, update: Update, context: CallbackContext) -> None:
async def query_callback(callback_query_data: str) -> Tuple[int, bool, str, str]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_question_id = int(_data[2])
_answer_id = int(_data[3])
_answer = await self.service.quiz_service.get_answer(_answer_id)
_question = await self.service.quiz_service.get_question(_question_id)
_result = _answer["is_correct"]
_answer_encode = _answer["answer"]
_question_encode = _question["question"]
return _user_id, _result, _question_encode, _answer_encode
callback_query = update.callback_query
user = callback_query.from_user
message = callback_query.message
chat = message.chat
user_id, result, question, answer = await query_callback(callback_query.data)
if user.id != user_id:
await callback_query.answer(text=f"这不是你的验证!\n"
f"再瞎几把点再按我叫西风骑士团、千岩军和天领奉行了!", show_alert=True)
return
if result:
await callback_query.answer(text="验证成功", show_alert=False)
await self.restore(context, chat.id, user_id)
if schedule := context.job_queue.scheduler.get_job(f"{chat.id}|{user.id}|clean_join"):
schedule.remove()
text = f"{user.mention_markdown_v2()} 验证成功,向着星辰与深渊!\n" \
f"问题:{escape_markdown(question, version=2)} \n" \
f"回答:{escape_markdown(answer, version=2)}"
await message.edit_text(text, parse_mode=ParseMode.MARKDOWN_V2)
else:
await callback_query.answer(text=f"验证失败,请在 {self.time_out} 秒后重试", show_alert=True)
await self.kick(context, chat.id, user_id)
text = f"{user.mention_markdown_v2()} 验证失败,已经赶出提瓦特大陆!\n" \
f"问题:{escape_markdown(question, version=2)} \n" \
f"回答:{escape_markdown(answer, version=2)}"
await message.edit_text(text, parse_mode=ParseMode.MARKDOWN_V2)
if schedule := context.job_queue.scheduler.get_job(f"{chat.id}|{user.id}|auth_kick"):
schedule.remove()
async def new_mem(self, update: Update, context: CallbackContext) -> None:
message = update.message
chat = message.chat
if message.from_user.id in await get_admin_list(
bot=context.bot,
cache=self.service.cache,
chat_id=chat.id,
extra_user=[]
):
await message.reply_text("派蒙检测到管理员邀请,自动放行了!")
return
for user in message.new_chat_members:
if user.is_bot:
continue
if not await context.bot.restrict_chat_member(chat_id=message.chat.id, user_id=user.id,
permissions=ChatPermissions(can_send_messages=False)):
await message.reply_markdown_v2(f"派蒙无法修改 {user.mention_markdown_v2()} 的权限!"
f"请检查是否给派蒙授权管理了")
return
question_id_list = await self.service.quiz_service.get_question_id_list()
if len(question_id_list) == 0:
await message.reply_text(f"旅行者!!!派蒙的问题清单你还没给我!!快去私聊我给我问题!")
return
index = self.random(0, len(question_id_list))
question = await self.service.quiz_service.get_question(question_id_list[index])
options = []
for answer_id in question["answer_id"]:
answer = await self.service.quiz_service.get_answer(answer_id)
options.append(answer)
buttons = [
[
InlineKeyboardButton(
answer["answer"],
callback_data=f"auth_challenge|{user.id}|{question['question_id']}|{answer['answer_id']}",
)
]
for answer in options
]
random.shuffle(buttons)
buttons.append(
[
InlineKeyboardButton(
"放行",
callback_data=f"auth_admin|pass|{user.id}",
),
InlineKeyboardButton(
"驱离",
callback_data=f"auth_admin|kick|{user.id}",
),
]
)
reply_message = f"*欢迎来到「提瓦特」世界!* \n" \
f"问题: {escape_markdown(question['question'], version=2)} \n" \
f"请在 {self.time_out} 内回答问题"
try:
question_message = await message.reply_markdown_v2(reply_message,
reply_markup=InlineKeyboardMarkup(buttons))
except BadRequest as er:
await message.reply_text("派蒙分心了一下,不小心忘记你了,你只能先退出群再进来吧。")
raise er
context.job_queue.scheduler.add_job(self.kick, "date", id=f"{chat.id}|{user.id}|auth_kick",
name=f"{chat.id}|{user.id}|auth_kick", args=[context, chat.id, user.id],
run_date=context.job_queue._tz_now() + datetime.timedelta(
seconds=self.time_out), replace_existing=True)
context.job_queue.scheduler.add_job(self.clean, "date", id=f"{message.chat.id}|{user.id}|auth_clean_join",
name=f"{message.chat.id}|{user.id}|auth_clean_join",
args=[context, message.chat.id, user.id, message.message_id],
run_date=context.job_queue._tz_now() + datetime.timedelta(
seconds=self.time_out), replace_existing=True)
context.job_queue.scheduler.add_job(self.clean, "date",
id=f"{message.chat.id}|{user.id}|auth_clean_question",
name=f"{message.chat.id}|{user.id}|auth_clean_question",
args=[context, message.chat.id, user.id, question_message.message_id],
run_date=context.job_queue._tz_now() + datetime.timedelta(
seconds=self.time_out), replace_existing=True)

32
plugins/base.py Normal file
View File

@ -0,0 +1,32 @@
import datetime
from telegram import Update
from telegram.ext import CallbackContext, ConversationHandler
from service import BaseService
class BasePlugins:
def __init__(self, service: BaseService):
self.service = service
@staticmethod
async def cancel(update: Update, _: CallbackContext) -> int:
await update.message.reply_text("退出命令")
return ConversationHandler.END
@staticmethod
async def _clean(context: CallbackContext, chat_id: int, message_id: int) -> bool:
if await context.bot.delete_message(chat_id=chat_id, message_id=message_id):
return True
else:
return False
def _add_delete_message_job(self, context: CallbackContext, chat_id: int, message_id: int,
delete_seconds: int = 30):
context.job_queue.scheduler.add_job(self._clean, "date",
id=f"{chat_id}|{message_id}|auto_clean_message",
name=f"{chat_id}|{message_id}|auto_clean_message",
args=[context, chat_id, message_id],
run_date=context.job_queue._tz_now() + datetime.timedelta(
seconds=delete_seconds), replace_existing=True)

153
plugins/cookies.py Normal file
View File

@ -0,0 +1,153 @@
from http.cookies import SimpleCookie
import ujson
import genshin
from genshin import InvalidCookies
from telegram import Update, ReplyKeyboardRemove, ReplyKeyboardMarkup
from telegram.ext import CallbackContext, ConversationHandler
from telegram.helpers import escape_markdown
from model.base import ServiceEnum
from plugins.base import BasePlugins
from service import BaseService
from service.base import UserInfoData
class CookiesCommandData:
service = ServiceEnum.MIHOYO
cookies: dict = {}
game_uid: int = 0
user_info: UserInfoData = UserInfoData()
class Cookies(BasePlugins):
CHECK_SERVER, CHECK_COOKIES, COMMAND_RESULT = range(10100, 10103)
def __init__(self, service: BaseService):
super().__init__(service)
async def command_start(self, update: Update, context: CallbackContext) -> int:
cookies_command_data: CookiesCommandData = context.chat_data.get("cookies_command_data")
if cookies_command_data is None:
cookies_command_data = CookiesCommandData()
context.chat_data["cookies_command_data"] = cookies_command_data
user = update.effective_user
message = f'你好 {user.mention_markdown_v2()} {escape_markdown("!请选择要绑定的服务器!或回复退出取消操作")}'
# cookie = await self.repository.read_cookie(user.id)
# if cookie != "":
# message = f'你好 {user.mention_markdown_v2()} ' \
# f'{escape_markdown("你已经绑定Cookies如果继续进行绑定会覆盖Cookie可回复退出取消操作")}'
# await update.message.reply_markdown_v2(message, reply_markup=ReplyKeyboardRemove())
reply_keyboard = [['miHoYo', 'HoYoLab'], ["退出"]]
await update.message.reply_markdown_v2(message,
reply_markup=ReplyKeyboardMarkup(reply_keyboard, one_time_keyboard=True))
return self.CHECK_SERVER
async def check_server(self, update: Update, context: CallbackContext) -> int:
user = update.effective_user
cookies_command_data: CookiesCommandData = context.chat_data.get("cookies_command_data")
user_info = await self.service.user_service_db.get_user_info(user.id)
cookies_command_data.user_info = user_info
if update.message.text == "退出":
await update.message.reply_text("退出任务")
return ConversationHandler.END
elif update.message.text == "miHoYo":
cookies_command_data.service = ServiceEnum.MIHOYO
bbs_url = "https://bbs.mihoyo.com/ys/"
bbs_name = "米游社"
if len(user_info.mihoyo_cookie) > 1:
await update.message.reply_text("警告你已经绑定Cookie如果继续操作会覆盖当前Cookie。")
elif update.message.text == "HoYoLab":
bbs_url = "https://www.hoyolab.com/home"
bbs_name = "HoYoLab"
cookies_command_data.service = ServiceEnum.HOYOLAB
if len(user_info.hoyoverse_cookie) > 1:
await update.message.reply_text("警告你已经绑定Cookie如果继续操作会覆盖当前Cookie。")
else:
await update.message.reply_text("选择错误,请重新选择")
return self.CHECK_SERVER
await update.message.reply_text(f"请输入{bbs_name}的Cookies或回复退出取消操作", reply_markup=ReplyKeyboardRemove())
javascript = "javascript:(()=>{_=(n)=>{for(i in(r=document.cookie.split(';'))){var a=r[i].split('=');if(a[" \
"0].trim()==n)return a[1]}};c=_('account_id')||alert('无效的Cookie,请重新登录!');c&&confirm(" \
"'将Cookie复制到剪贴板?')&&copy(document.cookie)})(); "
help_message = f"*关于如何获取Cookies*\n" \
f"[1、打开{bbs_name}并登录]({bbs_url})\n" \
f"2、按F12打开开发者工具\n" \
f"3、{escape_markdown('将开发者工具切换至控制台(Console)页签', version=2)}\n" \
f"4、复制下方的代码并将其粘贴在控制台中按下回车\n" \
f"`{escape_markdown(javascript, version=2, entity_type='code')}`"
await update.message.reply_markdown_v2(help_message, disable_web_page_preview=True)
return self.CHECK_COOKIES
async def check_cookies(self, update: Update, context: CallbackContext) -> int:
cookies_command_data: CookiesCommandData = context.chat_data.get("cookies_command_data")
if update.message.text == "退出":
await update.message.reply_text("退出任务", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
str_cookies = update.message.text
cookie = SimpleCookie()
cookie.load(str_cookies)
if len(cookie) == 0:
await update.message.reply_text("Cookies格式有误请检查", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
cookies = {}
for key, morsel in cookie.items():
cookies[key] = morsel.value
if len(cookies) == 0:
await update.message.reply_text("Cookies格式有误请检查", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
if cookies_command_data.service == ServiceEnum.MIHOYO:
client = genshin.ChineseClient(cookies=cookies)
elif cookies_command_data.service == ServiceEnum.HOYOLAB:
client = genshin.GenshinClient(cookies=cookies)
else:
await update.message.reply_text("数据错误", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
try:
user_info = await client.get_record_card()
except InvalidCookies:
await update.message.reply_text("Cookies已经过期请检查是否正确", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
except AttributeError:
await update.message.reply_text("Cookies错误请检查是否正确", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
await client.close()
cookies_command_data.cookies = cookies
cookies_command_data.game_uid = user_info.uid
reply_keyboard = [['确认', '退出']]
await update.message.reply_text("获取角色基础信息成功,请检查是否正确!")
message = f"*角色信息*\n" \
f"角色名称:{user_info.nickname}\n" \
f"角色等级:{user_info.level}\n" \
f"UID`{user_info.uid}`\n" \
f"服务器名称:`{user_info.server_name}`\n"
await update.message.reply_markdown_v2(message,
reply_markup=ReplyKeyboardMarkup(reply_keyboard, one_time_keyboard=True))
return self.COMMAND_RESULT
async def command_result(self, update: Update, context: CallbackContext) -> int:
cookies_command_data: CookiesCommandData = context.chat_data.get("cookies_command_data")
if update.message.text == "退出":
await update.message.reply_text("退出任务", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
elif update.message.text == "确认":
user = update.effective_user
data = ujson.dumps(cookies_command_data.cookies)
user_info = cookies_command_data.user_info
service = ServiceEnum.NULL.value
if cookies_command_data.service == ServiceEnum.MIHOYO:
user_info.mihoyo_game_uid = cookies_command_data.game_uid
service = ServiceEnum.MIHOYO.value
elif cookies_command_data.service == ServiceEnum.HOYOLAB:
user_info.hoyoverse_game_uid = cookies_command_data.game_uid
service = ServiceEnum.MIHOYO.value
await self.service.user_service_db.set_user_info(user.id, user_info.mihoyo_game_uid,
user_info.hoyoverse_game_uid,
service)
await self.service.user_service_db.set_cookie(user.id, data, cookies_command_data.service)
await update.message.reply_text("保存成功", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
else:
await update.message.reply_text("回复错误,请重新输入")
return self.COMMAND_RESULT

59
plugins/errorhandler.py Normal file
View File

@ -0,0 +1,59 @@
import html
import traceback
import ujson
from telegram import Update
from telegram.constants import ParseMode
from telegram.error import BadRequest
from telegram.ext import CallbackContext
from logger import Log
from config import config
try:
notice_chat_id = config.TELEGRAM["notice"]["ERROR"]
except KeyError:
Log.warning("错误通知Chat_id获取失败或未配置BOT发生致命错误时不会收到通知")
notice_chat_id = None
async def error_handler(update: object, context: CallbackContext) -> None:
"""
记录错误并发送消息通知开发人员
Log the error and send a telegram message to notify the developer.
"""
Log.error(msg="处理函数时发生异常:", exc_info=context.error)
if notice_chat_id is None:
return
tb_list = traceback.format_exception(None, context.error, context.error.__traceback__)
tb_string = ''.join(tb_list)
update_str = update.to_dict() if isinstance(update, Update) else str(update)
message_1 = (
f'<b>处理函数时发生异常</b> \n'
f'Exception while handling an update \n'
f'<pre>update = {html.escape(ujson.dumps(update_str, indent=2, ensure_ascii=False))}'
'</pre>\n\n'
f'<pre>context.chat_data = {html.escape(str(context.chat_data))}</pre>\n\n'
f'<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n'
)
message_2 = (
f'<pre>{html.escape(tb_string)}</pre>'
)
try:
if 'make sure that only one bot instance is running' in tb_string:
Log.error("其他机器人在运行,请停止!")
return
await context.bot.send_message(chat_id=notice_chat_id, text=message_1, parse_mode=ParseMode.HTML)
await context.bot.send_message(chat_id=notice_chat_id, text=message_2, parse_mode=ParseMode.HTML)
except BadRequest as exc:
if 'too long' in str(exc):
message = (
f'<b>处理函数时发生异常traceback太长导致无法发送但已写入日志</b> \n'
f'<code>{html.escape(str(context.error))}</code>'
)
await context.bot.send_message(chat_id=notice_chat_id, text=message, parse_mode=ParseMode.HTML)
else:
raise exc

78
plugins/gacha.py Normal file
View File

@ -0,0 +1,78 @@
import os
from telegram import Update
from telegram.constants import ChatAction
from telegram.ext import CallbackContext, ConversationHandler
from logger import Log
from plugins.base import BasePlugins
from service import BaseService
from pyppeteer import launch
from metadata.metadata import metadat
from service.wich import WishCountInfo, get_one
class Gacha(BasePlugins):
def __init__(self, service: BaseService):
super().__init__(service)
self.browser: launch = None
self.current_dir = os.getcwd()
self.resources_dir = os.path.join(self.current_dir, "resources")
self.character_gacha_card = {}
for character in metadat.characters:
name = character["Name"]
self.character_gacha_card[name] = character["GachaCard"]
CHECK_SERVER, COMMAND_RESULT = range(10600, 10602)
async def command_start(self, update: Update, context: CallbackContext) -> None:
message = update.message
user = update.effective_user
Log.info(f"用户 {user.full_name}[{user.id}] 抽卡模拟器命令请求")
args = message.text.split(" ")
if len(args) == 1:
gacha_info = await self.service.gacha.gacha_info()
else:
gacha_info = await self.service.gacha.gacha_info(args[1])
# 用户数据储存和处理
if gacha_info.get("gacha_id") is None:
await message.reply_text(f"没有找到 {args[1]} 卡池名称")
return ConversationHandler.END
gacha_id: str = gacha_info["gacha_id"]
user_gacha: dict[str, WishCountInfo] = context.user_data.get("gacha")
if user_gacha is None:
user_gacha = context.user_data["gacha"] = {}
user_gacha_count: WishCountInfo = user_gacha.get(gacha_id)
if user_gacha_count is None:
user_gacha_count = user_gacha[gacha_id] = WishCountInfo(user_id=user.id)
# 用户数据储存和处理
await message.reply_chat_action(ChatAction.FIND_LOCATION)
data = {
"_res_path": f"file://{self.resources_dir}",
"name": "洛水居室",
"info": "卡池测试",
"poolName": gacha_info["title"],
"items": [],
}
for a in range(10):
item = get_one(user_gacha_count, gacha_info)
# item_name = item["item_name"]
# item_type = item["item_type"]
# if item_type == "角色":
# gacha_card = self.character_gacha_card.get(item_name)
# if gacha_card is None:
# await message.reply_text(f"获取角色 {item_name} GachaCard信息失败")
# return
# item["item_character_img"] = await url_to_file(gacha_card)
data["items"].append(item)
def take_rang(elem: dict):
return elem["rank"]
data["items"].sort(key=take_rang, reverse=True)
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
png_data = await self.service.template.render('genshin/gacha', "gacha.html", data,
{"width": 1157, "height": 603}, False)
await message.reply_photo(png_data)

120
plugins/get_user.py Normal file
View File

@ -0,0 +1,120 @@
import os
import random
import genshin
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction
from telegram.ext import CallbackContext, ConversationHandler
from logger import Log
from model.base import ServiceEnum
from model.helpers import url_to_file
from plugins.base import BasePlugins
from service import BaseService
from service.base import UserInfoData
class GetUserCommandData:
user_info: UserInfoData = UserInfoData()
class GetUser(BasePlugins):
COMMAND_RESULT, = range(10200, 10201)
def __init__(self, service: BaseService):
super().__init__(service)
self.current_dir = os.getcwd()
async def command_start(self, update: Update, context: CallbackContext) -> int:
user = update.effective_user
Log.info(f"用户 {user.full_name}[{user.id}] 查询游戏用户命令请求")
get_user_command_data: GetUserCommandData = context.chat_data.get("get_user_command_data")
if get_user_command_data is None:
get_user_command_data = GetUserCommandData()
context.chat_data["get_user_command_data"] = get_user_command_data
user_info = await self.service.user_service_db.get_user_info(user.id)
if user_info.service == ServiceEnum.NULL:
message = "请选择你要查询的类别"
keyboard = [
[
InlineKeyboardButton("miHoYo", callback_data="miHoYo"),
InlineKeyboardButton("HoYoLab", callback_data="HoYoLab")
]
]
get_user_command_data.user_info = user_info
await update.message.reply_text(message, reply_markup=InlineKeyboardMarkup(keyboard))
return self.COMMAND_RESULT
return ConversationHandler.END
async def command_result(self, update: Update, context: CallbackContext) -> int:
user = update.effective_user
get_user_command_data: GetUserCommandData = context.chat_data["get_user_command_data"]
query = update.callback_query
await query.answer()
await query.delete_message()
if query.data == "miHoYo":
client = genshin.ChineseClient(cookies=get_user_command_data.user_info.mihoyo_cookie)
uid = get_user_command_data.user_info.mihoyo_game_uid
elif query.data == "HoYoLab":
client = genshin.GenshinClient(cookies=get_user_command_data.user_info.hoyoverse_cookie, lang="zh-cn")
uid = get_user_command_data.user_info.hoyoverse_game_uid
else:
return ConversationHandler.END
Log.info(f"用户 {user.full_name}[{user.id}] 查询武器命令请求 || 参数 UID {uid}")
user_info = await client.get_user(int(uid))
record_card_info = await client.get_record_card()
await query.message.reply_chat_action(ChatAction.FIND_LOCATION)
user_avatar = user_info.characters[0].icon
user_data = {
"name": record_card_info.nickname,
"uid": record_card_info.uid,
"user_avatar": await url_to_file(user_avatar),
"action_day_number": user_info.stats.days_active,
"achievement_number": user_info.stats.achievements,
"avatar_number": user_info.stats.anemoculi,
"spiral_abyss": user_info.stats.spiral_abyss,
"way_point_number": user_info.stats.unlocked_waypoints,
"domain_number": user_info.stats.unlocked_domains,
"luxurious_number": user_info.stats.luxurious_chests,
"precious_chest_number": user_info.stats.precious_chests,
"exquisite_chest_number": user_info.stats.exquisite_chests,
"common_chest_number": user_info.stats.common_chests,
"magic_chest_number": user_info.stats.remarkable_chests,
"anemoculus_number": user_info.stats.anemoculi,
"geoculus_number": user_info.stats.geoculi,
"electroculus_number": user_info.stats.electroculi,
"world_exploration_list": [],
"teapot_level": user_info.teapot.level,
"teapot_comfort_num": user_info.teapot.comfort,
"teapot_item_num": user_info.teapot.items,
"teapot_visit_num": user_info.teapot.visitors,
"teapot_list": []
}
for exploration in user_info.explorations:
exploration_data = {
"name": exploration.name,
"exploration_percentage": exploration.percentage,
"offerings": [],
"icon": await url_to_file(exploration.icon)
}
for offering in exploration.offerings:
offering_data = {
"data": f"{offering.name}{offering.level}"
}
exploration_data["offerings"].append(offering_data)
user_data["world_exploration_list"].append(exploration_data)
for teapot in user_info.teapot.realms:
teapot_data = {
"icon": await url_to_file(teapot.icon),
"name": teapot.name
}
user_data["teapot_list"].append(teapot_data)
background_image = random.choice(os.listdir(f"{self.current_dir}/resources/background/vertical"))
user_data["background_image"] = f"file://{self.current_dir}/resources/background/vertical/{background_image}"
png_data = await self.service.template.render('genshin/info', "info.html", user_data,
{"width": 1024, "height": 1024})
await query.message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
await query.message.reply_photo(png_data, filename=f"{record_card_info.uid}.png",
allow_sending_without_reply=True)
await client.close()
return ConversationHandler.END

145
plugins/inline.py Normal file
View File

@ -0,0 +1,145 @@
from typing import cast
from urllib.parse import urlparse, urlencode, ParseResult
from uuid import uuid4
import httpx
from telegram import InlineQueryResultArticle, InputTextMessageContent, Update, InlineQuery, InlineQueryResultPhoto
from telegram.constants import ParseMode
from telegram.error import BadRequest
from telegram.ext import CallbackContext
from telegram.helpers import escape_markdown
from logger import Log
from service import BaseService
from service.base import QuestionData
from metadata.metadata import metadat
class Inline:
def __init__(self, service: BaseService):
self.service = service
async def inline_query(self, update: Update, context: CallbackContext) -> None:
user = update.effective_user
ilq = cast(InlineQuery, update.inline_query)
query = ilq.query
switch_pm_text = "需要帮助嘛?"
results_list = []
args = query.split(" ")
admin_list = await self.service.admin.get_admin_list()
if args[0] == "":
pass
else:
if "查看问题" == args[0] and user.id in admin_list:
async def append_quiz(_results_list, _quiz: QuestionData):
correct_answer = ""
input_message_content = f"问题ID `{_quiz.question_id}`\n" \
f"问题 `{escape_markdown(_quiz.question, version=2)} \n`"
wrong_answer = []
for _answer in _quiz.answer:
if _answer.is_correct:
correct_answer = escape_markdown(_answer.answer, version=2)
else:
wrong_answer.append(f"`{escape_markdown(_answer.answer, version=2)}`")
input_message_content += f"正确答案 `{correct_answer}`\n"
input_message_content += f"错误答案 {' '.join(wrong_answer)}"
_results_list.append(
InlineQueryResultArticle(
id=str(uuid4()),
title=_quiz.question,
description=f"正确答案 {correct_answer}",
input_message_content=InputTextMessageContent(input_message_content,
parse_mode=ParseMode.MARKDOWN_V2)
))
quiz_info = await self.service.quiz_service.get_quiz_for_db()
if len(args) >= 2:
search = args[1]
for quiz in quiz_info:
if search in quiz.question:
await append_quiz(results_list, quiz)
else:
for answer in quiz.answer:
if search in answer.answer:
await append_quiz(results_list, quiz)
else:
if len(quiz_info) >= 50:
if len(args) <= 1:
results_list.append(
InlineQueryResultArticle(
id=str(uuid4()),
title="问题数量已经大于50无法完全展示请在命令后添加题目名称即可指定搜索",
description=f"警告",
input_message_content=InputTextMessageContent("问题数量已经大于40无法完全展示"
"请在命令后添加题目名称即可指定搜索")
))
else:
for quiz in quiz_info:
await append_quiz(results_list, quiz)
for character_name in metadat.characters_name_list:
if args[0] in character_name:
url = await self.service.get_game_info.get_characters_cultivation_atlas(character_name)
if url != "":
title = f"{character_name}角色攻略"
description = f"{character_name}角色攻略"
def url_add_params(_url: str, _params: dict):
_pr = urlparse(_url)
_prlist = list(_pr)
_prlist[4] = urlencode(_params)
return ParseResult(*_prlist).geturl()
caption = "Form [米游社](https://bbs.mihoyo.com/ys/collection/642956) " \
"Via [猫冬](https://bbs.mihoyo.com/ys/accountCenter/postList?id=74019947) " \
f"查看 [原图]({url})"
results_list.append(
InlineQueryResultPhoto(
id=str(uuid4()),
photo_url=url+"?x-oss-process=format,jpg",
thumb_url=url_add_params(url, self.service.get_game_info.mihoyo.get_images_params(
resize=300)),
title=title,
caption=caption,
parse_mode=ParseMode.MARKDOWN_V2,
description=description
)
)
if "查看武器列表并查询" == args[0]:
for weapons_name in metadat.weapons_name_list:
results_list.append(
InlineQueryResultArticle(
id=str(uuid4()),
title=weapons_name,
description=f"查看武器列表并查询 {weapons_name}",
input_message_content=InputTextMessageContent(f"武器查询{weapons_name}",
parse_mode=ParseMode.MARKDOWN_V2)
))
if len(results_list) == 0:
results_list.append(
InlineQueryResultArticle(
id=str(uuid4()),
title=f"好像找不到问题呢",
description=f"这个问题我也不知道,因为我就是个应急食品。",
input_message_content=InputTextMessageContent("这个问题我也不知道,因为我就是个应急食品。"),
))
try:
await ilq.answer(
results=results_list,
switch_pm_text=switch_pm_text,
switch_pm_parameter="inline_message",
cache_time=0,
auto_pagination=True,
)
except BadRequest as exc:
if "is too old and" in exc.message: # 过时请求全部忽略
pass
if "can't parse entities" not in exc.message:
raise exc
Log.warning("inline_query发生BadRequest错误", exc_info=exc)
await ilq.answer(
results=[],
switch_pm_text="糟糕,发生错误了。",
switch_pm_parameter="inline_message",
)

24
plugins/job_queue.py Normal file
View File

@ -0,0 +1,24 @@
from telegram.ext import CallbackContext
from logger import Log
from plugins.base import BasePlugins
from service import BaseService
class JobQueue(BasePlugins):
def __init__(self, service: BaseService):
super().__init__(service)
async def start_job(self, _: CallbackContext) -> None:
Log.info("初始Job启动成功正在初始化必要任务")
Log.info("正在初始化浏览器")
try:
await self.service.template.get_browser()
except TimeoutError as err:
Log.error("初始化浏览器超时,请检查日记查看错误", err)
except AttributeError as err:
Log.error("初始化浏览器时变量为空,请检查日记查看错误", err)
else:
Log.info("初始化浏览器成功")
Log.info("初始化Job成功")

227
plugins/quiz.py Normal file
View File

@ -0,0 +1,227 @@
import random
import re
import time
from typing import List
from numpy.random import MT19937, Generator
from redis import DataError
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardMarkup, Poll, ReplyKeyboardRemove
from telegram.ext import CallbackContext, filters, ConversationHandler
from telegram.helpers import escape_markdown
from service import BaseService
from service.base import QuestionData, AnswerData
class QuizCommandData:
question_id: int = -1
new_question: str = ""
new_correct_answer: str = ""
new_wrong_answer: List[str] = []
status: int = 0
class Quiz:
CHECK_COMMAND, VIEW_COMMAND, CHECK_QUESTION, \
GET_NEW_QUESTION, GET_NEW_CORRECT_ANSWER, GET_NEW_WRONG_ANSWER, \
QUESTION_EDIT, SAVE_QUESTION = range(10300, 10308)
def __init__(self, service: BaseService):
self.send_time = time.time()
self.generator = Generator(MT19937(int(self.send_time)))
self.service = service
self.time_out = 120
def random(self, low: int, high: int) -> int:
if self.send_time + 24 * 60 * 60 >= time.time():
self.send_time = time.time()
self.generator = Generator(MT19937(int(self.send_time)))
return int(self.generator.uniform(low, high))
async def command_start(self, update: Update, context: CallbackContext) -> int:
user = update.effective_user
if filters.ChatType.PRIVATE.filter(update.message):
admin_list = await self.service.admin.get_admin_list()
if user.id in admin_list:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
if quiz_command_data is None:
quiz_command_data = QuizCommandData()
context.chat_data["quiz_command_data"] = quiz_command_data
message = f'你好 {user.mention_markdown_v2()} {escape_markdown("!请选择你的操作!")}'
reply_keyboard = [
["查看问题", "添加问题"],
["重载问题"],
["退出"]
]
await update.message.reply_markdown_v2(message,
reply_markup=ReplyKeyboardMarkup(reply_keyboard,
one_time_keyboard=True))
return self.CHECK_COMMAND
if filters.ChatType.GROUPS.filter(update.message):
question_id_list = await self.service.quiz_service.get_question_id_list()
if len(question_id_list) == 0:
await update.message.reply_text(f"旅行者!!!派蒙的问题清单你还没给我!!快去私聊我给我问题!")
return ConversationHandler.END
index = self.random(0, len(question_id_list))
question = await self.service.quiz_service.get_question(question_id_list[index])
options = []
correct_option = ""
for answer_id in question["answer_id"]:
answer = await self.service.quiz_service.get_answer(answer_id)
options.append(answer["answer"])
if answer["is_correct"] == 1:
correct_option = answer["answer"]
index = options.index(correct_option)
await update.effective_message.reply_poll(question["question"], options, correct_option_id=index,
is_anonymous=False, open_period=self.time_out, type=Poll.QUIZ)
return ConversationHandler.END
return ConversationHandler.END
async def view_command(self, update: Update, context: CallbackContext) -> int:
keyboard = [
[
InlineKeyboardButton(text="选择问题", switch_inline_query_current_chat="查看问题")
]
]
await update.message.reply_text("请回复你要查看的问题",
reply_markup=InlineKeyboardMarkup(keyboard))
return self.CHECK_COMMAND
async def check_question(self, update: Update, context: CallbackContext) -> int:
reply_keyboard = [
["删除问题"],
["退出"]
]
await update.message.reply_text("请选择你的操作", reply_markup=ReplyKeyboardMarkup(reply_keyboard))
return self.CHECK_COMMAND
async def check_command(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
if update.message.text == "退出":
await update.message.reply_text("退出任务", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
elif update.message.text == "查看问题":
return await self.view_command(update, context)
elif update.message.text == "添加问题":
return await self.add_question(update, context)
elif update.message.text == "删除问题":
return await self.delete_question(update, context)
# elif update.message.text == "修改问题":
# return await self.edit_question(update, context)
elif update.message.text == "重载问题":
return await self.refresh_question(update, context)
else:
result = re.findall(r"问题ID (\d+)", update.message.text)
if len(result) == 1:
try:
question_id = int(result[0])
except ValueError:
await update.message.reply_text("获取问题ID失败")
return ConversationHandler.END
quiz_command_data.question_id = question_id
await update.message.reply_text("获取问题ID成功")
return await self.check_question(update, context)
await update.message.reply_text("命令错误", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
async def refresh_question(self, update: Update, context: CallbackContext) -> int:
try:
await self.service.quiz_service.refresh_quiz()
except DataError:
await update.message.reply_text("Redis数据错误重载失败", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
await update.message.reply_text("重载成功", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
async def add_question(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
quiz_command_data.new_wrong_answer = []
quiz_command_data.new_question = ""
quiz_command_data.new_correct_answer = ""
quiz_command_data.status = 1
await update.message.reply_text("请回复你要添加的问题", reply_markup=ReplyKeyboardRemove())
return self.GET_NEW_QUESTION
async def get_new_question(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
reply_text = f"问题:`{escape_markdown(update.message.text, version=2)}`\n" \
f"请填写正确答案:"
quiz_command_data.new_question = update.message.text
await update.message.reply_markdown_v2(reply_text)
return self.GET_NEW_CORRECT_ANSWER
async def get_new_correct_answer(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
reply_text = f"正确答案:`{escape_markdown(update.message.text, version=2)}`\n" \
f"请填写错误答案:"
await update.message.reply_markdown_v2(reply_text)
quiz_command_data.new_correct_answer = update.message.text
return self.GET_NEW_WRONG_ANSWER
async def get_new_wrong_answer(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
reply_text = f"错误答案:`{escape_markdown(update.message.text, version=2)}`\n" \
f"可继续填写,并使用 {escape_markdown('/finish', version=2)} 结束。"
await update.message.reply_markdown_v2(reply_text)
quiz_command_data.new_wrong_answer.append(update.message.text)
return self.GET_NEW_WRONG_ANSWER
async def finish_edit(self, update: Update, context: CallbackContext):
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
reply_text = f"问题:`{escape_markdown(quiz_command_data.new_question, version=2)}`\n" \
f"正确答案:`{escape_markdown(quiz_command_data.new_correct_answer, version=2)}`\n" \
f"错误答案:`{escape_markdown(' '.join(quiz_command_data.new_wrong_answer), version=2)}`"
await update.message.reply_markdown_v2(reply_text)
reply_keyboard = [["保存并重载配置", "退出"]]
await update.message.reply_text("请核对问题,并选择下一步操作。", reply_markup=ReplyKeyboardMarkup(reply_keyboard))
return self.SAVE_QUESTION
async def save_question(self, update: Update, context: CallbackContext):
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
if update.message.text == "退出":
await update.message.reply_text("退出任务", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
elif update.message.text == "保存并重载配置":
if quiz_command_data.status == 1:
answer = [
AnswerData(answer=wrong_answer, is_correct=False) for wrong_answer in
quiz_command_data.new_wrong_answer
]
answer.append(AnswerData(answer=quiz_command_data.new_correct_answer, is_correct=True))
await self.service.quiz_service.save_quiz(
QuestionData(question=quiz_command_data.new_question, answer=answer))
await update.message.reply_text("保存成功", reply_markup=ReplyKeyboardRemove())
await self.service.quiz_service.refresh_quiz()
await update.message.reply_text("重载配置成功", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
else:
await update.message.reply_text("回复错误,请重新选择")
return self.SAVE_QUESTION
async def edit_question(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
quiz_command_data.new_wrong_answer = []
quiz_command_data.new_question = ""
quiz_command_data.new_correct_answer = ""
quiz_command_data.status = 2
await update.message.reply_text("请回复你要修改的问题", reply_markup=ReplyKeyboardRemove())
return self.GET_NEW_QUESTION
async def delete_question(self, update: Update, context: CallbackContext) -> int:
quiz_command_data: QuizCommandData = context.chat_data.get("quiz_command_data")
# 再问题重载Redis 以免redis数据为空时出现奔溃
await self.service.quiz_service.refresh_quiz()
question = await self.service.quiz_service.get_question(quiz_command_data.question_id)
# 因为外键的存在,先删除答案
for answer_id in question["answer_id"]:
await self.service.repository.delete_answer(answer_id)
await self.service.repository.delete_question(question["question_id"])
await update.message.reply_text("删除问题成功", reply_markup=ReplyKeyboardRemove())
await self.service.quiz_service.refresh_quiz()
await update.message.reply_text("重载配置成功", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
@staticmethod
async def cancel(update: Update, _: CallbackContext) -> int:
await update.message.reply_text("退出命令", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END

0
plugins/repeat.py Normal file
View File

98
plugins/sign.py Normal file
View File

@ -0,0 +1,98 @@
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import CallbackContext, ConversationHandler, filters
from model.base import ServiceEnum
from model.genshinhelper import YuanShen, Genshin
from plugins.base import BasePlugins
from service import BaseService
from service.base import UserInfoData
class SignCommandData:
user_info: UserInfoData = UserInfoData()
chat_id: int = 0
reply_to_message_id: int = 0
class Sign(BasePlugins):
def __init__(self, service: BaseService):
super().__init__(service)
self._sign_y = YuanShen()
self._sing_g = Genshin()
CHECK_SERVER, COMMAND_RESULT = range(10400, 10402)
async def _start_sign(self, uid: int, cookies: dict, service: ServiceEnum) -> str:
if service == ServiceEnum.MIHOYO:
sign_api = self._sign_y
else:
sign_api = self._sing_g
sign_give = await sign_api.get_sign_give(cookies=cookies)
if sign_give.error:
return f"获取签到信息失败API返回信息为 {sign_give.message}"
is_sign = await sign_api.is_sign(uid, cookies=cookies)
if is_sign.error:
return f"获取签到状态失败API返回信息为 {is_sign.message}"
total_sign_day = is_sign.data["total_sign_day"]
award_name = sign_give.data["awards"][total_sign_day]["name"]
award_cnt = sign_give.data["awards"][total_sign_day]["cnt"]
today = is_sign.data["today"]
if not is_sign.data["is_sign"]:
sign = await sign_api.sign(uid, cookies=cookies)
if sign.code == 0:
result = "OK"
elif sign.code == -5003:
result = "今天旅行者已经签到过了~"
else:
result = f"签到失败 返回错误代码为 {sign.code}"
else:
result = "今天旅行者已经签到过了~"
message = f"###### {today} ######\n" \
f"UID: {uid}\n" \
f"今日奖励: {award_name} × {award_cnt}\n" \
f"签到结果: {result}"
return message
async def command_start(self, update: Update, context: CallbackContext) -> int:
user = update.effective_user
message = update.message
sign_command_data: SignCommandData = context.chat_data.get("sign_command_data")
if sign_command_data is None:
sign_command_data = SignCommandData()
context.chat_data["sign_command_data"] = sign_command_data
user_info = await self.service.user_service_db.get_user_info(user.id)
if user_info.service == ServiceEnum.NULL:
message = "请选择你要签到的服务器"
keyboard = [
[
InlineKeyboardButton("miHoYo", callback_data="miHoYo"),
InlineKeyboardButton("HoYoLab", callback_data="HoYoLab")
]
]
sign_command_data.user_info = user_info
await update.message.reply_text(message, reply_markup=InlineKeyboardMarkup(keyboard))
sign_command_data.chat_id = update.message.chat_id
sign_command_data.reply_to_message_id = update.message.message_id
return self.COMMAND_RESULT
else:
sign = await self._start_sign(user_info.mihoyo_game_uid, user_info.mihoyo_cookie, user_info.service)
reply_message = await message.reply_text(sign)
if filters.ChatType.GROUPS.filter(update.callback_query.message):
self._add_delete_message_job(context, reply_message.chat_id, reply_message.message_id)
return ConversationHandler.END
async def command_result(self, update: Update, context: CallbackContext) -> int:
sign_command_data: SignCommandData = context.chat_data["sign_command_data"]
user_info = sign_command_data.user_info
query = update.callback_query
await query.answer()
message = "签到失败"
if query.data == "miHoYo":
message = await self._start_sign(user_info.mihoyo_game_uid, user_info.mihoyo_cookie, ServiceEnum.MIHOYO)
if query.data == "HoYoLab":
message = await self._start_sign(user_info.hoyoverse_game_uid, user_info.hoyoverse_cookie,
ServiceEnum.HOYOLAB)
await query.edit_message_text(message)
if filters.ChatType.GROUPS.filter(update.callback_query.message):
self._add_delete_message_job(context, query.message.chat_id, query.message.message_id)
return ConversationHandler.END

31
plugins/start.py Normal file
View File

@ -0,0 +1,31 @@
from telegram import Update
from telegram.ext import CallbackContext
from telegram.helpers import escape_markdown
async def start(update: Update, _: CallbackContext) -> None:
user = update.effective_user
await update.message.reply_markdown_v2(f'你好 {user.mention_markdown_v2()} {escape_markdown("!我是派蒙 ")}')
async def help_command(update: Update, _: CallbackContext) -> None:
await update.message.reply_text('前面的区域,以后再来探索吧!')
async def new_chat_members(update: Update, context: CallbackContext) -> None:
for new_chat_members_data in update.message.new_chat_members:
if new_chat_members_data.id == context.bot.id: # 判断是否是机器人第一次入群
await update.message.reply_text('感谢邀请小派蒙到本群!'
'请使用 /help 查看咱已经学会的功能。')
async def unknown_command(update: Update, _: CallbackContext) -> None:
await update.message.reply_text('前面的区域,以后再来探索吧!')
async def emergency_food(update: Update, _: CallbackContext) -> None:
await update.message.reply_text('派蒙才不是应急食品!')
async def ping(update: Update, _: CallbackContext) -> None:
await update.message.reply_text("online! ヾ(✿゚▽゚)")

86
plugins/weapon.py Normal file
View File

@ -0,0 +1,86 @@
import os
import time
from uuid import uuid4
import re
import aiofiles
from jinja2 import Environment, PackageLoader
from playwright.async_api import async_playwright, ViewportSize
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction
from telegram.ext import CallbackContext
from logger import Log
from model.helpers import url_to_file
from plugins.base import BasePlugins
from service import BaseService
from metadata.metadata import metadat
class Weapon(BasePlugins):
def __init__(self, service: BaseService):
super().__init__(service)
async def command_start(self, update: Update, context: CallbackContext) -> None:
message = update.message
user = update.effective_user
args = message.text.split(" ")
search_command = re.search(r'^武器查询(.*)', message.text)
keyboard = [
[
InlineKeyboardButton(text="查看武器列表并查询", switch_inline_query_current_chat="查看武器列表并查询")
]
]
if search_command:
weapon_name = search_command[1]
if weapon_name == "":
await message.reply_text("请回复你要查询的武器", reply_markup=InlineKeyboardMarkup(keyboard))
return
elif len(args) >= 2:
weapon_name = args[1]
else:
await message.reply_text("请回复你要查询的武器", reply_markup=InlineKeyboardMarkup(keyboard))
return
weapon_data = None
for weapon in metadat.weapons:
if weapon["Name"] == weapon_name:
weapon_data = weapon
if weapon_data is None:
await message.reply_text(f"没有找到 {weapon_name}",
reply_markup=InlineKeyboardMarkup(keyboard))
return
Log.info(f"用户 {user.full_name}[{user.id}] 查询武器命令请求 || 参数 {weapon_name}")
await message.reply_chat_action(ChatAction.FIND_LOCATION)
async def input_template_data(_weapon_data):
_template_data = {
"weapon_name": _weapon_data["Name"],
"weapon_info_type_img": await url_to_file(_weapon_data["Type"]),
"progression_secondary_stat_value": _weapon_data["SubStatValue"],
"progression_secondary_stat_name": _weapon_data["SubStat"],
"weapon_info_source_img": await url_to_file(_weapon_data["Source"]),
"progression_base_atk": _weapon_data["ATK"],
"weapon_info_source_list": [],
"special_ability_name": _weapon_data["Passive"],
"special_ability_info": _weapon_data["PassiveDescription"],
}
_template_data["weapon_info_source_list"].append(
await url_to_file(_weapon_data["Ascension"]["Source"])
)
_template_data["weapon_info_source_list"].append(
await url_to_file(_weapon_data["Elite"]["Source"])
)
_template_data["weapon_info_source_list"].append(
await url_to_file(_weapon_data["Monster"]["Source"])
)
return _template_data
template_data = await input_template_data(weapon_data)
png_data = await self.service.template.render('genshin/weapon', "weapon.html", template_data,
{"width": 540, "height": 540})
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
await message.reply_photo(png_data, filename=f"{template_data['weapon_name']}.png",
allow_sending_without_reply=True)

12
requirements.txt Normal file
View File

@ -0,0 +1,12 @@
redis>=4.2.0
ujson>=5.1.0
genshin>=0.4.0
aiomysql>=0.0.22
colorlog>=6.6.0
numpy>=1.22.3
httpx>=0.22.0
asyncio>=3.4.3
Jinja2>=3.1.1
aiofiles>=0.8.0
playwright>=1.20.1
PyMySQL>=0.9.3

7
resources/README.md Normal file
View File

@ -0,0 +1,7 @@
# resource 目录说明
## styles
[tailwindcss](https://tailwindcss.com/)
[fontawesome](https://fontawesome.dashgame.com/)

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.0 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.1 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.9 MiB

61
resources/bot/help.html Normal file
View File

@ -0,0 +1,61 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<link href="../styles/tailwind.min.css" rel="stylesheet">
<link href="../styles/font-awesome.min.css" rel="stylesheet">
<style>
body {
color: #1e1f20;
}
#container {
background-color: #ececec;
max-width: 768px;
}
.command-title {
border-left: 5px solid #51aded;
}
.command {
background-color: burlywood;
}
</style>
</head>
<body>
<div class="container mx-auto px-20 py-10" id="container">
<div class="info p-6 flex flex-wrap">
<div class="info-name text-4xl ">
<h1>PaimoeBot帮助文档</h1>
</div>
<div class="info-name text-1xl pl-10">
<p><i class="fa fa-address-card-o"></i>需要绑定Cookie</p>
<p><i class="fa fa-at"></i>可回复对应群友查询</p>
</div>
</div>
<div class="base-command pt-4">
<div class="command-type">
<div class="command-title text-2xl pl-2">
基础命令
</div>
</div>
<div class="command-list py-8 flex flex-wrap">
<div class="command ml-8 p-2 rounded-xl">
<div class="name-title text-xl p-1">
/uid
<i class="fa fa-address-card-o pl-2"></i>
<i class="fa fa-at"></i>
</div>
<div class="name text-base p-1">查询玩家信息</div>
</div>
<div class="command ml-8 p-2 rounded-xl">
<div class="name-title text-xl p-1">/cookie</div>
<div class="name text-base p-1">绑定玩家信息</div>
</div>
</div>
</div>
</div>
</body>
</html>

Binary file not shown.

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 43 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 40 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 27 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 40 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 27 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 40 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Some files were not shown because too many files have changed in this diff Show More