3
0

v0.3.1 支持 mihoyo 国际

This commit is contained in:
xtaodada 2022-01-28 17:58:47 +08:00
parent f58313ded0
commit bc8118dd20
No known key found for this signature in database
GPG Key ID: EE4DC37B55E24736
19 changed files with 3560 additions and 5 deletions

2
.gitignore vendored
View File

@ -144,3 +144,5 @@ assets/data/list.json
assets/voice/voice.json
ID_DATA.db
ID_DATA_bak.db
ID_DATA_OR.db
ID_DATA_OR_bak.db

Binary file not shown.

After

Width:  |  Height:  |  Size: 526 KiB

329
defs/db2.py Normal file
View File

@ -0,0 +1,329 @@
import sqlite3
import re
import traceback
from shutil import copyfile
import genshinstats as gs
async def cookiesDB(uid, Cookies, qid):
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS NewCookiesTable
(UID INT PRIMARY KEY NOT NULL,
Cookies TEXT,
QID INT,
StatusA TEXT,
StatusB TEXT,
StatusC TEXT,
NUM INT,
Extra TEXT);''')
c.execute('''CREATE TABLE IF NOT EXISTS CookiesCache
(UID TEXT PRIMARY KEY,
MYSID TEXT,
Cookies TEXT);''')
cursor = c.execute("SELECT * from NewCookiesTable WHERE UID = ?", (uid,))
c_data = cursor.fetchall()
if len(c_data) == 0:
c.execute("INSERT OR IGNORE INTO NewCookiesTable (Cookies,UID,StatusA,StatusB,StatusC,NUM,QID) \
VALUES (?, ?,?,?,?,?,?)", (Cookies, uid, "off", "off", "off", 140, qid))
else:
c.execute("UPDATE NewCookiesTable SET Cookies = ? WHERE UID=?", (Cookies, uid))
conn.commit()
conn.close()
async def deal_ck(mes, qid):
aid = re.search(r"ltuid=(\d*)", mes)
mysid_data = aid.group(0).split('=')
mysid = mysid_data[1]
cookie = ';'.join(filter(lambda x: x.split('=')[0] in [
"ltuid", "ltoken"], [i.strip() for i in mes.split(';')]))
data = await GetMysInfo(mysid, cookie)
if data:
uid = data[0]['uid']
else:
# 未绑定游戏账号
uid = 888888888
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
test = c.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name = 'CookiesCache'")
if test == 0:
pass
else:
try:
c.execute("DELETE from CookiesCache where uid=? or mysid = ?", (uid, mysid))
except sqlite3.OperationalError:
pass
conn.commit()
conn.close()
await cookiesDB(uid, cookie, qid)
async def GetMysInfo(mysid, ck):
try:
gs.set_cookie(ck)
data = gs.get_uid_from_hoyolab_uid(mysid)
if data:
return gs.get_game_accounts()
else:
return None
except gs.errors.NotLoggedIn:
raise gs.errors.NotLoggedIn
except Exception as e:
traceback.print_exc()
print("米游社信息读取Api失败")
async def selectDB(userid, mode="auto"):
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
cursor = c.execute("SELECT * FROM UIDDATA WHERE USERID = ?", (userid,))
for row in cursor:
if mode == "auto":
if row[0]:
if row[2]:
return [row[2], 3]
elif row[1]:
return [row[1], 2]
else:
return None
else:
return None
elif mode == "uid":
return [row[1], 2]
elif mode == "mys":
return [row[2], 3]
async def OpenPush(uid, qid, status, mode):
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
cursor = c.execute("SELECT * from NewCookiesTable WHERE UID = ?", (uid,))
c_data = cursor.fetchall()
if len(c_data) != 0:
try:
c.execute("UPDATE NewCookiesTable SET {s} = ?,QID = ? WHERE UID=?".format(s=mode), (status, qid, uid))
conn.commit()
conn.close()
return "成功!"
except:
return "未找到Ck绑定记录。"
else:
return "未找到Ck绑定记录。"
async def OwnerCookies(uid):
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
try:
cursor = c.execute("SELECT * FROM NewCookiesTable WHERE UID = ?", (uid,))
c_data = cursor.fetchall()
cookies = c_data[0][1]
except:
return
return cookies
async def MysSign(Uid):
try:
gs.set_cookie(await OwnerCookies(Uid))
return gs.claim_daily_reward(Uid, lang='zh-cn')
except:
print("签到失败,请重试")
async def CheckDB():
str = ''
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
cursor = c.execute("SELECT UID,Cookies from NewCookiesTable")
c_data = cursor.fetchall()
for row in c_data:
try:
aid = re.search(r"ltuid=(\d*)", row[1])
mysid_data = aid.group(0).split('=')
mysid = mysid_data[1]
mys_data = await GetMysInfo(mysid, row[1])
str = str + f"uid{row[0]}/mysid{mysid}的Cookies是正常的\n"
except:
str = str + f"uid{row[0]}的Cookies是异常的已删除该条Cookies\n"
c.execute("DELETE from NewCookiesTable where UID=?", (row[0],))
test = c.execute("SELECT count(*) FROM sqlite_master WHERE type='table' AND name = 'CookiesCache'")
if test == 0:
pass
else:
c.execute("DELETE from CookiesCache where Cookies=?", (row[1],))
conn.commit()
conn.close()
return str
async def GetDaily(Uid):
try:
gs.set_cookie(await OwnerCookies(Uid))
return gs.get_notes(Uid, lang='zh-cn')
except Exception as e:
traceback.print_exc()
print("访问每日信息失败,请重试!")
async def connectDB(userid, uid=None, mys=None):
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS UIDDATA
(USERID INT PRIMARY KEY NOT NULL,
UID TEXT,
MYSID TEXT);''')
c.execute("INSERT OR IGNORE INTO UIDDATA (USERID,UID,MYSID) \
VALUES (?, ?,?)", (userid, uid, mys))
if uid:
c.execute("UPDATE UIDDATA SET UID = ? WHERE USERID=?", (uid, userid))
if mys:
c.execute("UPDATE UIDDATA SET MYSID = ? WHERE USERID=?", (mys, userid))
conn.commit()
conn.close()
def deletecache():
try:
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
c.execute("DROP TABLE CookiesCache")
c.execute("UPDATE NewCookiesTable SET Extra = ? WHERE Extra=?", (None, "limit30"))
copyfile("ID_DATA_OR.db", "ID_DATA_OR_bak.db")
c.execute('''CREATE TABLE IF NOT EXISTS CookiesCache
(UID TEXT PRIMARY KEY,
MYSID TEXT,
Cookies TEXT);''')
conn.commit()
conn.close()
except:
print("\nerror\n")
try:
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
c.execute("UPDATE UseridDict SET lots=NULL")
conn.commit()
conn.close()
except:
print("\nerror\n")
def functionRegex(value, patter):
c_pattern = re.compile(r"ltuid={}".format(patter))
return c_pattern.search(value) is not None
def cacheDB(uid, mode=1, mys=None):
use = ''
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS CookiesCache
(UID TEXT PRIMARY KEY,
MYSID TEXT,
Cookies TEXT);''')
if mode == 2:
cursor = c.execute("SELECT * FROM CookiesCache WHERE MYSID = ?", (uid,))
c_data = cursor.fetchall()
else:
if mys:
cursor = c.execute("SELECT * FROM CookiesCache WHERE MYSID = ?", (mys,))
c_data = cursor.fetchall()
else:
cursor = c.execute("SELECT * FROM CookiesCache WHERE UID = ?", (uid,))
c_data = cursor.fetchall()
if len(c_data) == 0:
if mode == 2:
conn.create_function("REGEXP", 2, functionRegex)
cursor = c.execute("SELECT * FROM NewCookiesTable WHERE REGEXP(Cookies, ?)", (uid,))
d_data = cursor.fetchall()
else:
cursor = c.execute("SELECT * FROM NewCookiesTable WHERE UID = ?", (uid,))
d_data = cursor.fetchall()
if len(d_data) != 0:
if d_data[0][7] != "error":
use = d_data[0][1]
if mode == 1:
c.execute("INSERT OR IGNORE INTO CookiesCache (Cookies,UID) \
VALUES (?, ?)", (use, uid))
elif mode == 2:
c.execute("INSERT OR IGNORE INTO CookiesCache (Cookies,MYSID) \
VALUES (?, ?)", (use, uid))
else:
cookiesrow = c.execute("SELECT * FROM NewCookiesTable WHERE Extra IS NULL ORDER BY RANDOM() LIMIT 1")
e_data = cookiesrow.fetchall()
if len(e_data) != 0:
if mode == 1:
c.execute("INSERT OR IGNORE INTO CookiesCache (Cookies,UID) \
VALUES (?, ?)", (e_data[0][1], uid))
elif mode == 2:
c.execute("INSERT OR IGNORE INTO CookiesCache (Cookies,MYSID) \
VALUES (?, ?)", (e_data[0][1], uid))
use = e_data[0][1]
else:
return "没有可以使用的Cookies"
else:
cookiesrow = c.execute("SELECT * FROM NewCookiesTable WHERE Extra IS NULL ORDER BY RANDOM() LIMIT 1")
e_data = cookiesrow.fetchall()
if len(e_data) != 0:
if mode == 1:
c.execute("INSERT OR IGNORE INTO CookiesCache (Cookies,UID) \
VALUES (?, ?)", (e_data[0][1], uid))
elif mode == 2:
c.execute("INSERT OR IGNORE INTO CookiesCache (Cookies,MYSID) \
VALUES (?, ?)", (e_data[0][1], uid))
use = e_data[0][1]
else:
return "没有可以使用的Cookies"
else:
use = c_data[0][2]
if mys:
try:
c.execute("UPDATE CookiesCache SET UID = ? WHERE MYSID=?", (uid, mys))
except:
c.execute("UPDATE CookiesCache SET MYSID = ? WHERE UID=?", (mys, uid))
conn.commit()
conn.close()
return use
async def GetSpiralAbyssInfo(Uid, ck, Schedule_type="1"):
try:
gs.set_cookie(ck),
return gs.get_spiral_abyss(Uid)
except Exception as e:
traceback.print_exc()
print("深渊信息读取Api失败")
def errorDB(ck, err):
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
if err == "error":
c.execute("UPDATE NewCookiesTable SET Extra = ? WHERE Cookies=?", ("error", ck))
elif err == "limit30":
c.execute("UPDATE NewCookiesTable SET Extra = ? WHERE Cookies=?", ("limit30", ck))
async def GetInfo(Uid, ck):
try:
gs.set_cookie(ck)
return gs.get_user_stats(Uid, lang='zh-cn')
except Exception as e:
traceback.print_exc()
print("米游社基础信息读取Api失败")

816
defs/mihoyo.py Normal file
View File

@ -0,0 +1,816 @@
import math
import os
import random
import sqlite3
import time
import traceback
import urllib
import numpy as np
from typing import List
from wordcloud import WordCloud
from PIL import Image, ImageDraw, ImageFilter
from pyrogram.types import Message
from defs.db2 import MysSign, GetDaily, cacheDB, GetMysInfo, errorDB, GetInfo, GetSpiralAbyssInfo
from defs.event import ys_font
WEAPON_PATH = os.path.join("assets", 'weapon')
BG_PATH = os.path.join("assets", "bg")
CHAR_DONE_PATH = os.path.join("assets", 'char_done')
BG2_PATH = os.path.join("assets", "bg2")
CHAR_PATH = os.path.join("assets", "characters")
CHAR_IMG_PATH = os.path.join("assets", 'char_img')
REL_PATH = os.path.join("assets", "reliquaries")
avatar_json = {
"Albedo": "阿贝多",
"Ambor": "安柏",
"Barbara": "芭芭拉",
"Beidou": "北斗",
"Bennett": "班尼特",
"Chongyun": "重云",
"Diluc": "迪卢克",
"Diona": "迪奥娜",
"Eula": "优菈",
"Fischl": "菲谢尔",
"Ganyu": "甘雨",
"Hutao": "胡桃",
"Jean": "",
"Kazuha": "枫原万叶",
"Kaeya": "凯亚",
"Ayaka": "神里绫华",
"Keqing": "刻晴",
"Klee": "可莉",
"Lisa": "丽莎",
"Mona": "莫娜",
"Ningguang": "凝光",
"Noel": "诺艾尔",
"Qiqi": "七七",
"Razor": "雷泽",
"Rosaria": "罗莎莉亚",
"Sucrose": "砂糖",
"Tartaglia": "达达利亚",
"Venti": "温迪",
"Xiangling": "香菱",
"Xiao": "",
"Xingqiu": "行秋",
"Xinyan": "辛焱",
"Yanfei": "烟绯",
"Zhongli": "钟离",
"Yoimiya": "宵宫",
"Sayu": "早柚",
"Shogun": "雷电将军",
"Aloy": "埃洛伊",
"Sara": "九条裟罗",
"Kokomi": "珊瑚宫心海",
"Shenhe": "申鹤"
}
daily_im = '''
*数据刷新可能存在一定延迟请以当前游戏实际数据为准{}
==============
原粹树脂{}/{}{}
每日委托{}/{} 奖励{}领取
周本减半{}/{}
洞天宝钱{}
探索派遣
总数/完成/上限{}/{}/{}
{}'''
# 签到函数
async def sign(uid):
try:
sign_data = await MysSign(uid)
if sign_data is not None:
mes_im = "签到成功"
get_im = f"本次签到获得{sign_data['name']}x{sign_data['cnt']}"
im = mes_im + "!" + "\n" + get_im
else:
im = "签到失败请检查Cookies是否失效。"
except:
im = "签到失败请检查Cookies是否失效。"
return im
# 统计状态函数
async def daily(mode="push", uid=None):
def seconds2hours(seconds: int) -> str:
m, s = divmod(int(seconds), 60)
h, m = divmod(m, 60)
return "%02d:%02d:%02d" % (h, m, s)
temp_list = []
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
if mode == "ask":
c_data = ([uid, 0, 0, 0, 0, 0, 0],)
else:
cursor = c.execute(
"SELECT * FROM NewCookiesTable WHERE StatusA != ?", ("off",))
c_data = cursor.fetchall()
for row in c_data:
try:
dailydata = await GetDaily(str(row[0]))
except Exception as e:
if str(e).find("用户信息不匹配"):
temp_list.append(
{"qid": row[2], "gid": row[3], "message": "你的推送状态有误可能是uid绑定错误或没有在米游社打开“实时便筏”功能。"})
else:
traceback.print_exc()
else:
current_resin = dailydata['resin']
current_expedition_num = len(dailydata['expeditions'])
max_expedition_num = dailydata['max_expeditions']
finished_expedition_num = 0
expedition_info: List[str] = []
for expedition in dailydata['expeditions']:
avatar: str = expedition['icon'][89:-4]
try:
avatar_name: str = avatar_json[avatar]
except KeyError:
avatar_name: str = avatar
if expedition['status'] == 'Finished':
expedition_info.append(f"{avatar_name} 探索完成")
finished_expedition_num += 1
else:
remained_timed: str = seconds2hours(
expedition['remaining_time'])
expedition_info.append(
f"{avatar_name} 剩余时间{remained_timed}")
if current_resin >= row[6] or dailydata["max_realm_currency"] - dailydata[
"realm_currency"] <= 100 or finished_expedition_num > 0:
tip = ''
if current_resin >= row[6] != 0:
tip += "\n==============\n你的树脂快满了!"
if dailydata["max_realm_currency"] - dailydata["realm_currency"] <= 100:
tip += "\n==============\n你的洞天宝钱快满了!"
if finished_expedition_num > 0:
tip += "\n==============\n你有探索派遣完成了!"
max_resin = dailydata['max_resin']
rec_time = ''
# print(dailydata)
if current_resin < 160:
resin_recovery_time = seconds2hours(
dailydata['until_resin_limit'])
next_resin_rec_time = seconds2hours(
8 * 60 - ((dailydata['max_resin'] - dailydata['resin']) * 8 * 60 - int(
dailydata['until_resin_limit'])))
rec_time = f' ({next_resin_rec_time}/{resin_recovery_time})'
finished_task_num = dailydata['completed_commissions']
total_task_num = dailydata['total_commissions']
is_extra_got = '' if dailydata['claimed_commission_reward'] else ''
resin_discount_num_limit = dailydata['max_boss_discounts']
used_resin_discount_num = resin_discount_num_limit - \
dailydata['remaining_boss_discounts']
coin = f'{dailydata["realm_currency"]}/{dailydata["max_realm_currency"]}'
if dailydata["realm_currency"] < dailydata["max_realm_currency"]:
coin_rec_time = seconds2hours(int(dailydata["until_realm_currency_limit"]))
coin_add_speed = math.ceil((dailydata["max_realm_currency"] - dailydata["realm_currency"]) / (
int(dailydata["until_realm_currency_limit"]) / 60 / 60))
coin += f'{coin_rec_time}{coin_add_speed}/h'
expedition_data = "\n".join(expedition_info)
send_mes = daily_im.format(tip, current_resin, max_resin, rec_time,
finished_task_num, total_task_num,
is_extra_got, used_resin_discount_num,
resin_discount_num_limit, coin,
current_expedition_num, finished_expedition_num, max_expedition_num,
expedition_data)
temp_list.append(
{"qid": row[2], "gid": row[3], "message": send_mes})
return temp_list
def create_rounded_rectangle_mask(rectangle, radius):
solid_fill = (50, 50, 50, 255)
i = Image.new("RGBA", rectangle.size, (0, 0, 0, 0))
corner = Image.new('RGBA', (radius, radius), (0, 0, 0, 0))
draw = ImageDraw.Draw(corner)
draw.pieslice((0, 0, radius * 2, radius * 2), 180, 270, fill=solid_fill)
mx, my = rectangle.size
i.paste(corner, (0, 0), corner)
i.paste(corner.rotate(90), (0, my - radius), corner.rotate(90))
i.paste(corner.rotate(180), (mx - radius, my - radius), corner.rotate(180))
i.paste(corner.rotate(270), (mx - radius, 0), corner.rotate(270))
draw = ImageDraw.Draw(i)
draw.rectangle([(radius, 0), (mx - radius, my)], fill=solid_fill)
draw.rectangle([(0, radius), (mx, my - radius)], fill=solid_fill)
return i
def get_weapon_pic(url):
urllib.request.urlretrieve(url, os.path.join(WEAPON_PATH, url.split('/')[-1]))
def get_char_pic(id, url):
urllib.request.urlretrieve(f'{url}', os.path.join(CHAR_PATH, f'{id}.png'))
def get_charimg_pic(url):
urllib.request.urlretrieve(url, os.path.join(CHAR_IMG_PATH, url.split('/')[-1]))
def get_rel_pic(url):
urllib.request.urlretrieve(url, os.path.join(REL_PATH, url.split('/')[-1]))
async def draw_pic(uid, message: Message, nickname="1", mode=2, role_level=None):
# 获取Cookies如果没有能使用的则return
while 1:
use_cookies = cacheDB(uid, mode - 1)
if use_cookies == '':
return "绑定记录不存在。"
elif use_cookies == "没有可以使用的Cookies":
return "没有可以使用的Cookies"
if mode == 3:
mys_data = await GetMysInfo(uid, use_cookies)
uid = mys_data[0]['uid']
nickname = mys_data[0]['nickname']
role_level = mys_data[0]['level']
try:
raw_data = await GetInfo(uid, use_cookies)
except Exception as e:
if str(e).find("Cannnot get data for more than 30 accounts per day.") != -1:
# return ("当前cookies已达到30人上限")
errorDB(use_cookies, "limit30")
elif str(e).find("Login cookies have not been provided") != -1:
# return ("Cookie错误/过期请重置Cookie")
errorDB(use_cookies, "error")
else:
traceback.print_exc()
return (
"Api报错\r\n"
+ "\r\n出现这种情况可能是UID输入错误 or 不存在"
)
else:
break
# 获取背景图片
bg2_path = os.path.join(BG_PATH, random.choice([x for x in os.listdir(BG_PATH)
if os.path.isfile(os.path.join(BG_PATH, x))]))
if message.media:
image_data = await message.download()
edit_bg = Image.open(image_data)
else:
edit_bg = Image.open(bg2_path)
# 获取背景主色
q = edit_bg.quantize(colors=3, method=2)
bg_num_temp = 0
for i in range(0, 3):
bg = tuple(q.getpalette()[i * 3:(i * 3) + 3])
bg_num = bg[0] + bg[1] + bg[2]
if bg_num >= bg_num_temp:
bg_num_temp = bg_num
bg_color = (bg[0], bg[1], bg[2])
# 通过背景主色bg_color确定文字主色
r = 140
if max(*bg_color) > 255 - r:
r *= -1
new_color = (math.floor(bg_color[0] + r if bg_color[0] + r <= 255 else 255),
math.floor(bg_color[1] + r if bg_color[1] + r <= 255 else 255),
math.floor(bg_color[2] + r if bg_color[2] + r <= 255 else 255))
# 确定texture2D路径
panle1_path = os.path.join(BG2_PATH, "panle_1.png")
panle3_path = os.path.join(BG2_PATH, "panle_3.png")
avatar_bg_path = os.path.join(BG2_PATH, "avatar_bg.png")
avatar_fg_path = os.path.join(BG2_PATH, "avatar_fg.png")
all_mask_path = os.path.join(BG2_PATH, "All_Mask.png")
# 记录数据
char_datas = raw_data["characters"]
# 确定角色占用行数
char_num = len(char_datas)
char_hang = 1 + (char_num - 1) // 6 if char_num > 8 else char_num
# 确定整体图片的长宽
based_w = 900
based_h = 890 + char_hang * 130 if char_num > 8 else 890 + char_hang * 110
based_scale = '%.3f' % (based_w / based_h)
# 通过确定的长宽比,缩放背景图片
w, h = edit_bg.size
scale_f = '%.3f' % (w / h)
new_w = math.ceil(based_h * float(scale_f))
new_h = math.ceil(based_w / float(scale_f))
if scale_f > based_scale:
bg_img2 = edit_bg.resize((new_w, based_h), Image.ANTIALIAS)
else:
bg_img2 = edit_bg.resize((based_w, new_h), Image.ANTIALIAS)
bg_img = bg_img2.crop((0, 0, 900, based_h))
# 转换遮罩的颜色、大小匹配并paste上去
all_mask = Image.open(all_mask_path).resize(bg_img.size, Image.ANTIALIAS)
all_mask_img = Image.new("RGBA", (based_w, based_h), bg_color)
bg_img.paste(all_mask_img, (0, 0), all_mask)
# 操作图片
panle1 = Image.open(panle1_path)
panle3 = Image.open(panle3_path)
avatar_bg = Image.open(avatar_bg_path)
avatar_fg = Image.open(avatar_fg_path)
# 确定主体框架
avatar_bg_color = Image.new("RGBA", (316, 100), bg_color)
panle1_color = Image.new("RGBA", (900, 800), new_color)
bg_img.paste(panle1_color, (0, 0), panle1)
bg_img.paste(panle3, (0, char_hang * 130 + 800) if char_num > 8 else (0, char_hang * 110 + 800), panle3)
bg_img.paste(avatar_bg_color, (113, 98), avatar_bg)
bg_img.paste(avatar_fg, (114, 95), avatar_fg)
# 绘制基础信息文字
text_draw = ImageDraw.Draw(bg_img)
if role_level:
text_draw.text((140, 200), "冒险等级:" + f"{role_level}", new_color, ys_font(20))
text_draw.text((220, 123), f"{nickname}", new_color, ys_font(32))
text_draw.text((235, 163), 'UID ' + f"{uid}", new_color, ys_font(14))
# 活跃天数/成就数量/深渊信息
text_draw.text((640, 94.8), str(raw_data['stats']['active_days']), new_color, ys_font(26))
text_draw.text((640, 139.3), str(raw_data['stats']['achievements']), new_color, ys_font(26))
text_draw.text((640, 183.9), raw_data['stats']['spiral_abyss'], new_color, ys_font(26))
# 宝箱
text_draw.text((258, 375.4), str("未知"), new_color, ys_font(24))
text_draw.text((258, 425.4), str(raw_data['stats']['common_chests']), new_color, ys_font(24))
text_draw.text((258, 475.4), str(raw_data['stats']['exquisite_chests']), new_color, ys_font(24))
text_draw.text((258, 525.4), str(raw_data['stats']['precious_chests']), new_color, ys_font(24))
text_draw.text((258, 575.4), str(raw_data['stats']['luxurious_chests']), new_color, ys_font(24))
# 已获角色
text_draw.text((740, 547), str(raw_data['stats']['characters']), new_color, ys_font(24))
# 开启锚点和秘境数量
text_draw.text((258, 625.4), str(raw_data['stats']['unlocked_waypoints']), new_color, ys_font(24))
text_draw.text((258, 675.4), str(raw_data['stats']['unlocked_domains']), new_color, ys_font(24))
# 蒙德
text_draw.text((490, 370), str(raw_data['explorations'][4]['explored']) + '%', new_color,
ys_font(22))
text_draw.text((490, 400), 'lv.' + str(raw_data['explorations'][4]['level']), new_color, ys_font(22))
text_draw.text((513, 430), str(raw_data['stats']['anemoculi']), new_color, ys_font(22))
# 璃月
text_draw.text((490, 490), str(raw_data['explorations'][3]['explored']) + '%', new_color,
ys_font(22))
text_draw.text((490, 520), 'lv.' + str(raw_data['explorations'][3]['level']), new_color, ys_font(22))
text_draw.text((513, 550), str(raw_data['stats']['geoculi']), new_color, ys_font(22))
# 雪山
text_draw.text((745, 373.5), str(raw_data['explorations'][2]['explored']) + '%', new_color,
ys_font(22))
text_draw.text((745, 407.1), 'lv.' + str(raw_data['explorations'][2]['level']), new_color, ys_font(22))
# 稻妻
text_draw.text((490, 608), str(raw_data['explorations'][1]['explored']) + '%', new_color,
ys_font(22))
text_draw.text((490, 635), 'lv.' + str(raw_data['explorations'][1]['level']), new_color, ys_font(22))
text_draw.text((490, 662), 'lv.' + str(raw_data['explorations'][1]['offerings'][0]['level']), new_color,
ys_font(22))
text_draw.text((513, 689), str(raw_data['stats']['electroculi']), new_color, ys_font(22))
# 渊下宫
text_draw.text((745, 480), str(raw_data['explorations'][0]['explored']) + '%', new_color,
ys_font(22))
# 家园
if raw_data['teapot']:
text_draw.text((693, 582.4), 'lv.' + str(raw_data['teapot']['level']), new_color, ys_font(22))
text_draw.text((693, 620.4), str(raw_data['teapot']['visitors']), new_color, ys_font(22))
text_draw.text((693, 658.4), str(raw_data['teapot']['items']), new_color, ys_font(22))
text_draw.text((693, 696.4), str(raw_data['teapot']['comfort']), new_color, ys_font(22))
else:
text_draw.text((693, 582.4), "未开", new_color, ys_font(22))
text_draw.text((693, 620.4), "未开", new_color, ys_font(22))
text_draw.text((693, 658.4), "未开", new_color, ys_font(22))
text_draw.text((693, 696.4), "未开", new_color, ys_font(22))
# 确定texture2D路径
charpic_mask_path = os.path.join(BG2_PATH, "charpic_mask.png")
weaponpic_mask_path = os.path.join(BG2_PATH, "weaponpic_mask.png")
def getText(star, step):
return os.path.join(BG2_PATH, "{}s_{}.png".format(str(star), str(step)))
charpic_mask = Image.open(charpic_mask_path)
weaponpic_mask = Image.open(weaponpic_mask_path)
s5s1 = Image.open(getText(5, 1))
s5s2 = Image.open(getText(5, 2))
s5s3 = Image.open(getText(5, 3))
s5s4 = Image.open(getText(5, 4))
s4s1 = Image.open(getText(4, 1))
s4s2 = Image.open(getText(4, 2))
s4s3 = Image.open(getText(4, 3))
s4s4 = Image.open(getText(4, 4))
s3s3 = Image.open(getText(3, 3))
s2s3 = Image.open(getText(2, 3))
s1s3 = Image.open(getText(1, 3))
char_bg_path = os.path.join(BG2_PATH, "char_bg.png")
char_fg_path = os.path.join(BG2_PATH, "char_fg.png")
char_bg = Image.open(char_bg_path)
char_fg = Image.open(char_fg_path)
char_color = (math.floor(bg_color[0] + 10 if bg_color[0] + r <= 255 else 255),
math.floor(bg_color[1] + 10 if bg_color[1] + r <= 255 else 255),
math.floor(bg_color[2] + 10 if bg_color[2] + r <= 255 else 255))
charset_mask = Image.new("RGBA", (900, 130), char_color)
num = 0
char_datas.sort(key=lambda x: (-x['rarity'], -x['level'], -x['friendship']))
if char_num > 8:
for i in char_datas:
# char_mingzuo = 0
char_name = i["name"]
char_id = i["id"]
char_level = i["level"]
char_fetter = i['friendship']
char_rarity = i['rarity']
# char_weapon_star = i['weapon']['rarity']
# char_weapon_jinglian = i['weapon']['affix_level']
# char_weapon_icon = i['weapon']['icon']
# if not os.path.exists(os.path.join(WEAPON_PATH, str(char_weapon_icon.split('/')[-1]))):
# get_weapon_pic(char_weapon_icon)
if not os.path.exists(os.path.join(CHAR_PATH, str(i['id']) + ".png")):
get_char_pic(i['id'], i['icon'])
char = os.path.join(CHAR_PATH, str(char_id) + ".png")
# weapon = os.path.join(WEAPON_PATH, str(char_weapon_icon.split('/')[-1]))
char_img = Image.open(char)
char_img = char_img.resize((100, 100), Image.ANTIALIAS)
# weapon_img = Image.open(weapon)
# weapon_img = weapon_img.resize((47, 47), Image.ANTIALIAS)
charpic = Image.new("RGBA", (125, 140))
if char_rarity == 5:
charpic.paste(s5s1, (0, 0), s5s1)
baseda = Image.new("RGBA", (100, 100))
cc = Image.composite(char_img, baseda, charpic_mask)
charpic.paste(cc, (6, 15), cc)
charpic.paste(s5s2, (0, 0), s5s2)
# if char_weapon_star == 5:
# charpic.paste(s5s3, (0, 0), s5s3)
# elif char_weapon_star == 4:
# charpic.paste(s4s3, (0, 0), s4s3)
# elif char_weapon_star == 3:
# charpic.paste(s3s3, (0, 0), s3s3)
# elif char_weapon_star == 2:
# charpic.paste(s2s3, (0, 0), s2s3)
# elif char_weapon_star == 1:
# charpic.paste(s1s3, (0, 0), s1s3)
basedb = Image.new("RGBA", (47, 47))
# dd = Image.composite(weapon_img, basedb, weaponpic_mask)
# charpic.paste(dd, (69, 62), dd)
charpic.paste(s5s4, (0, 0), s5s4)
else:
charpic.paste(s4s1, (0, 0), s4s1)
baseda = Image.new("RGBA", (100, 100))
cc = Image.composite(char_img, baseda, charpic_mask)
charpic.paste(cc, (6, 15), cc)
charpic.paste(s4s2, (0, 0), s4s2)
# if char_weapon_star == 5:
# charpic.paste(s5s3, (0, 0), s5s3)
# elif char_weapon_star == 4:
# charpic.paste(s4s3, (0, 0), s4s3)
# elif char_weapon_star == 3:
# charpic.paste(s3s3, (0, 0), s3s3)
# elif char_weapon_star == 2:
# charpic.paste(s2s3, (0, 0), s2s3)
# elif char_weapon_star == 1:
# charpic.paste(s1s3, (0, 0), s1s3)
basedb = Image.new("RGBA", (47, 47))
# dd = Image.composite(weapon_img, basedb, weaponpic_mask)
# charpic.paste(dd, (69, 62), dd)
charpic.paste(s4s4, (0, 0), s4s4)
char_draw = ImageDraw.Draw(charpic)
char_draw.text((38, 106), f'Lv.{str(char_level)}', (21, 21, 21), ys_font(18))
# 无法读取 char_draw.text((104.5, 91.5), f'{str(char_weapon_jinglian)}', 'white', ys_font(10))
# 无法读取 char_draw.text((99, 19.5), f'{str(char_mingzuo)}', 'white', ys_font(18))
if str(i["friendship"]) == "10" or str(char_name) == "旅行者":
char_draw.text((98, 42), "", (21, 21, 21), ys_font(14))
else:
char_draw.text((100, 41), f'{str(char_fetter)}', (21, 21, 21), ys_font(16))
char_crop = (68 + 129 * (num % 6), 800 + 130 * (num // 6))
bg_img.paste(charpic, char_crop, charpic)
num = num + 1
else:
for i in char_datas:
# char_mingzuo = 0
char_name = i["name"]
char_id = i["id"]
char_level = i["level"]
char_fetter = i['friendship']
char_rarity = i['rarity']
char_img_icon = i["icon"].replace("https://upload-os-bbs.mihoyo.com/game_record/genshin/character_icon/",
"https://upload-bbs.mihoyo.com/game_record/genshin/character_image/")
char_img_icon = char_img_icon.replace(".png", "@2x.png")
# char_weapon_star = i['weapon']['rarity']
# char_weapon_jinglian = i['weapon']['affix_level']
# char_weapon_icon = i['weapon']['icon']
# if not os.path.exists(os.path.join(WEAPON_PATH, str(char_weapon_icon.split('/')[-1]))):
# get_weapon_pic(char_weapon_icon)
if not os.path.exists(os.path.join(CHAR_IMG_PATH, str(char_img_icon.split('/')[-1]))):
get_charimg_pic(char_img_icon)
if not os.path.exists(os.path.join(CHAR_PATH, str(i['id']) + ".png")):
get_char_pic(i['id'], i['icon'])
char = os.path.join(CHAR_PATH, str(char_id) + ".png")
# weapon = os.path.join(WEAPON_PATH, str(char_weapon_icon.split('/')[-1]))
char_stand_img = os.path.join(CHAR_IMG_PATH, str(char_img_icon.split('/')[-1]))
char_stand_mask = Image.open(os.path.join(BG2_PATH, "stand_mask.png"))
char_stand = Image.open(char_stand_img)
char_img = Image.open(char)
char_img = char_img.resize((100, 100), Image.ANTIALIAS)
# weapon_img = Image.open(weapon)
# weapon_img = weapon_img.resize((47, 47), Image.ANTIALIAS)
charpic = Image.new("RGBA", (900, 130))
charpic_temp = Image.new("RGBA", (900, 130))
charpic.paste(charset_mask, (0, 0), char_bg)
# weapon_bg = Image.open(getText(char_weapon_star, 3))
# charpic.paste(weapon_bg, (72, 10), weapon_bg)
charpic_temp.paste(char_img, (81, 13), charpic_mask)
charpic_temp.paste(char_stand, (335, -99), char_stand_mask)
charpic_temp.paste(char_fg, (0, 0), char_fg)
# charpic_temp.paste(weapon_img, (141, 72), weaponpic_mask)
# temp = Image.composite(weapon_img, basedb, weaponpic_mask)
charpic.paste(charpic_temp, (0, 0), charpic_temp)
# for _, k in enumerate(i["reliquaries"]):
# if not os.path.exists(os.path.join(REL_PATH, str(k["icon"].split('/')[-1]))):
# get_rel_pic(k["icon"])
# rel = os.path.join(REL_PATH, str(k["icon"].split('/')[-1]))
# rel_img = Image.open(rel).resize((43, 43), Image.ANTIALIAS)
# rel_bg = Image.open(getText(k["rarity"], 3))
#
# if k["pos_name"] == "生之花":
# charpic.paste(rel_bg, (287 + 55 * 0, -14), rel_bg)
# charpic.paste(rel_img, (360 + 55 * 0, 49), rel_img)
# elif k["pos_name"] == "死之羽":
# charpic.paste(rel_bg, (287 + 55 * 1, -14), rel_bg)
# charpic.paste(rel_img, (360 + 55 * 1, 49), rel_img)
# elif k["pos_name"] == "时之沙":
# charpic.paste(rel_bg, (287 + 55 * 2, -14), rel_bg)
# charpic.paste(rel_img, (360 + 55 * 2, 49), rel_img)
# elif k["pos_name"] == "空之杯":
# charpic.paste(rel_bg, (287 + 55 * 3, -14), rel_bg)
# charpic.paste(rel_img, (360 + 55 * 3, 49), rel_img)
# elif k["pos_name"] == "理之冠":
# charpic.paste(rel_bg, (287 + 55 * 4, -14), rel_bg)
# charpic.paste(rel_img, (360 + 55 * 4, 49), rel_img)
char_draw = ImageDraw.Draw(charpic)
char_draw.text((182, 39), i["name"], new_color, ys_font(22))
char_draw.text((272, 45), f'Lv.{str(char_level)}', new_color, ys_font(18))
# char_draw.text((104.5,91.5),f'{str(char_weapon_jinglian)}',new_color,ys_font(10))
# char_draw.text((267, 77), f'{str(char_mingzuo)}', new_color, ys_font(18))
char_draw.text((209, 77), f'{str(i["friendship"])}' if str(char_name) != "旅行者" else "10", new_color,
ys_font(18))
char_crop = (0, 800 + 110 * num)
num += 1
bg_img.paste(charpic, char_crop, charpic)
# 转换之后发送
bg_img = bg_img.convert('RGB')
bg_img.save(f"temp{os.sep}uid.jpg", format='JPEG', subsampling=0, quality=90)
try:
if message.media:
os.remove(image_data) # noqa
except:
pass
return f"temp{os.sep}uid.jpg"
async def draw_wordcloud(uid, message: Message, mode=2):
while 1:
use_cookies = cacheDB(uid, mode - 1)
if use_cookies == '':
return "绑定记录不存在。"
elif use_cookies == "没有可以使用的Cookies":
return "没有可以使用的Cookies"
if mode == 3:
mys_data = await GetMysInfo(uid, use_cookies)
uid = mys_data[0]['uid']
raw_Abyss_data = await GetSpiralAbyssInfo(uid, use_cookies)
try:
raw_data = await GetInfo(uid, use_cookies)
except Exception as e:
if str(e).find("Cannnot get data for more than 30 accounts per day.") != -1:
# return ("当前cookies已达到30人上限")
errorDB(use_cookies, "limit30")
continue
elif str(e).find("Login cookies have not been provided") != -1:
# return ("Cookie错误/过期请重置Cookie")
errorDB(use_cookies, "error")
continue
else:
traceback.print_exc()
return (
"Api报错\r\n"
+ "\r\n出现这种情况可能是UID输入错误 or 不存在"
)
break
char_datas = raw_data["characters"] # noqa
l1_size = 2
l2_size = 4
l3_size = 6
l4_size = 7
l5_size = 10
word_str = {}
g3d1 = 0
ly3c = 0
star5num = 0
star5numcon = 0
for i in char_datas:
if i["name"] in ['雷电将军', '温迪', '钟离', '枫原万叶']:
g3d1 += 1
if i["name"] in ['甘雨', '', '胡桃']:
ly3c += 1
if i['rarity'] == 5:
star5num += 1
if i['name'] != '旅行者':
star5numcon += 1
if i["level"] >= 80:
if i['name'] == "迪卢克":
word_str["落魄了家人们"] = l3_size
if i['name'] == "刻晴":
word_str["斩尽牛杂"] = l3_size
if i['name'] == "旅行者":
word_str["旅行者真爱党"] = l3_size
game_time = time.mktime(time.strptime('20200915', '%Y%m%d'))
now_time = time.time()
total_s = now_time - game_time
total_d = (((total_s) / 60) / 60) / 24
if math.floor(total_d) - 5 <= raw_data['stats']['active_days']:
word_str["开服玩家"] = l4_size
if g3d1 >= 4:
word_str["三神一帝"] = l3_size
if ly3c >= 3:
word_str["璃月3C"] = l3_size
if star5num >= 16:
word_str["五星众多"] = l3_size
if raw_data['stats']['anemoculi'] + raw_data['stats']['geoculi'] + raw_data['stats'][
'electroculi'] == 378:
word_str["全神瞳"] = l2_size
if raw_data['explorations'][4]['explored'] + \
raw_data['explorations'][3]['explored'] + \
raw_data['explorations'][2]['explored'] + \
raw_data['explorations'][1]['explored'] + \
raw_data['explorations'][0]['explored'] >= 495:
word_str["全探索"] = l4_size
if raw_data['stats']['achievements'] >= 510:
word_str["全成就"] = l5_size
elif raw_data['stats']['achievements'] >= 490:
word_str["成就达人"] = l3_size
if raw_data['stats']['spiral_abyss'] == '12-3':
word_str["深境的探究者"] = l2_size
if len(raw_data['characters']) >= 42:
word_str["全角色"] = l3_size
if raw_data['stats']['active_days'] <= 40:
word_str["刚入坑"] = l1_size
elif raw_data['stats']['active_days'] <= 100:
word_str["初心者"] = l2_size
elif raw_data['stats']['active_days'] <= 300:
word_str["老玩家"] = l2_size
if raw_data['stats']['active_days'] >= 365 and raw_data['stats'][
'common_chests'] + raw_data['stats']['exquisite_chests'] + \
raw_data['stats']['precious_chests'] + raw_data['stats']['luxurious_chests'] <= 2500:
word_str["老咸鱼"] = l3_size
if raw_data['teapot']:
if raw_data['teapot']['comfort'] >= 25000:
word_str["团雀附体"] = l2_size
if raw_Abyss_data['stats']['total_battles'] <= 12 and raw_Abyss_data['stats']['max_floor'] == '12-3':
word_str["PVP资格证"] = l4_size
if raw_Abyss_data['character_ranks']['most_damage_taken']:
try:
if raw_Abyss_data['character_ranks']['most_damage_taken'][0]["value"] >= 150000:
word_str["这一击,贯穿星辰"] = l4_size
except:
pass
bg_list = random.choice([x for x in os.listdir(BG_PATH)
if os.path.isfile(os.path.join(BG_PATH, x))])
bg2_path = os.path.join(BG_PATH, bg_list)
based_w = 900
based_h = 1000
based_scale = '%.3f' % (based_w / based_h)
is_edit = False
if message.media:
is_edit = await message.download()
if is_edit:
bg_path_edit = is_edit
else:
bg_path_edit = bg2_path
edit_bg = Image.open(bg_path_edit)
w, h = edit_bg.size
scale_f = '%.3f' % (w / h)
new_w = math.ceil(based_h * float(scale_f))
new_h = math.ceil(based_w / float(scale_f))
if scale_f > based_scale:
bg_img2 = edit_bg.resize((new_w, based_h), Image.ANTIALIAS)
else:
bg_img2 = edit_bg.resize((based_w, new_h), Image.ANTIALIAS)
bg_img = bg_img2.crop((0, 0, based_w, based_h))
x, y = 50, 153
radius = 50
cropped_img = bg_img.crop((x, y, x + 800, y + 800))
blurred_img = cropped_img.filter(ImageFilter.GaussianBlur(5), ).convert("RGBA")
bg_img.paste(blurred_img, (x, y), create_rounded_rectangle_mask(cropped_img, radius))
panle = Image.open(os.path.join(BG2_PATH, 'wordcloud_0.png'))
mask = np.array(Image.open(os.path.join(BG2_PATH, 'wordcloudmask.png')))
wc = WordCloud(
font_path=os.path.join("assets", "fonts", "ZhuZiAWan-2.ttc"),
mask=mask,
background_color="rgba(255, 255, 255, 0)",
mode="RGBA",
max_words=200,
max_font_size=80
# color_func=multi_color_func
# color_func=similar_color_func
).generate_from_frequencies(word_str, max_font_size=100)
image_produce = wc.to_image()
bg_img.paste(panle, (0, 0), panle)
bg_img.paste(image_produce, (0, 0), image_produce)
bg_img = bg_img.convert('RGB')
text_draw = ImageDraw.Draw(bg_img)
text_draw.text((450, 105), 'UID ' + f"{uid}", (40, 136, 168), ys_font(26), anchor="mm")
bg_img.save(f"temp{os.sep}cx.jpg", format='JPEG', subsampling=0, quality=90)
if is_edit:
try:
os.remove(is_edit)
except:
pass
return f"temp{os.sep}cx.jpg"

18
genshinstats/__init__.py Normal file
View File

@ -0,0 +1,18 @@
"""Wrapper for the Genshin Impact's api.
This is an unofficial wrapper for the Genshin Impact gameRecord and wish history api.
Majority of the endpoints are implemented, documented and typehinted.
All endpoints require to be logged in with either a cookie or an authkey, read the README.md for more info.
https://github.com/thesadru/genshinstats
"""
from .caching import *
from .daily import *
from .errors import *
from .genshinstats import *
from .hoyolab import *
from .map import *
from .transactions import *
from .utils import *
from .wishes import *

206
genshinstats/caching.py Normal file
View File

@ -0,0 +1,206 @@
"""Install a cache into genshinstats"""
import inspect
import os
import sys
from functools import update_wrapper
from itertools import islice
from typing import Any, Callable, Dict, List, MutableMapping, Tuple, TypeVar
import genshinstats as gs
__all__ = ["permanent_cache", "install_cache", "uninstall_cache"]
C = TypeVar("C", bound=Callable[..., Any])
def permanent_cache(*params: str) -> Callable[[C], C]:
"""Like lru_cache except permanent and only caches based on some parameters"""
cache: Dict[Any, Any] = {}
def wrapper(func):
sig = inspect.signature(func)
def inner(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
# since the amount of arguments is constant we can just save the values
key = tuple(v for k, v in bound.arguments.items() if k in params)
if key in cache:
return cache[key]
r = func(*args, **kwargs)
if r is not None:
cache[key] = r
return r
inner.cache = cache
return update_wrapper(inner, func)
return wrapper # type: ignore
def cache_func(func: C, cache: MutableMapping[Tuple[Any, ...], Any]) -> C:
"""Caches a normal function"""
# prevent possible repeated cachings
if hasattr(func, "__cache__"):
return func
sig = inspect.signature(func)
def wrapper(*args, **kwargs):
# create key (func name, *arguments)
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
key = tuple(v for k, v in bound.arguments.items() if k != "cookie")
key = (func.__name__,) + key
if key in cache:
return cache[key]
r = func(*args, **kwargs)
if r is not None:
cache[key] = r
return r
setattr(wrapper, "__cache__", cache)
setattr(wrapper, "__original__", func)
return update_wrapper(wrapper, func) # type: ignore
def cache_paginator(
func: C, cache: MutableMapping[Tuple[Any, ...], Any], strict: bool = False
) -> C:
"""Caches an id generator such as wish history
Respects size and authkey.
If strict mode is on then the first item of the paginator will no longer be requested every time.
"""
if hasattr(func, "__cache__"):
return func
sig = inspect.signature(func)
def wrapper(*args, **kwargs):
# create key (func name, end id, *arguments)
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
arguments = bound.arguments
# remove arguments that might cause problems
size, authkey, end_id = [arguments.pop(k) for k in ("size", "authkey", "end_id")]
partial_key = tuple(arguments.values())
# special recursive case must be ignored
# otherwise an infinite recursion due to end_id resets will occur
if "banner_type" in arguments and arguments["banner_type"] is None:
return func(*args, **kwargs)
def make_key(end_id: int) -> Tuple[Any, ...]:
return (
func.__name__,
end_id,
) + partial_key
def helper(end_id: int):
while True:
# yield new items from the cache
key = make_key(end_id)
while key in cache:
yield cache[key]
end_id = cache[key]["id"]
key = make_key(end_id)
# look ahead and add new items to the cache
# since the size limit is always 20 we use that to make only a single request
new = list(func(size=20, authkey=authkey, end_id=end_id, **arguments))
if not new:
break
# the head may not want to be cached so it must be handled separately
if end_id != 0 or strict:
cache[make_key(end_id)] = new[0]
if end_id == 0:
yield new[0]
end_id = new[0]["id"]
for p, n in zip(new, new[1:]):
cache[make_key(p["id"])] = n
return islice(helper(end_id), size)
setattr(wrapper, "__cache__", cache)
setattr(wrapper, "__original__", func)
return update_wrapper(wrapper, func) # type: ignore
def install_cache(cache: MutableMapping[Tuple[Any, ...], Any], strict: bool = False) -> None:
"""Installs a cache into every cacheable function in genshinstats
If strict mode is on then the first item of the paginator will no longer be requested every time.
That can however cause a variety of problems and it's therefore recommend to use it only with TTL caches.
Please do note that hundreds of accesses may be made per call so your cache shouldn't be doing heavy computations during accesses.
"""
functions: List[Callable] = [
# genshinstats
gs.get_user_stats,
gs.get_characters,
gs.get_spiral_abyss,
# wishes
gs.get_banner_details,
gs.get_gacha_items,
# hoyolab
gs.search,
gs.get_record_card,
gs.get_recommended_users,
]
paginators: List[Callable] = [
# wishes
gs.get_wish_history,
# transactions
gs.get_artifact_log,
gs.get_crystal_log,
gs.get_primogem_log,
gs.get_resin_log,
gs.get_weapon_log,
]
invalid: List[Callable] = [
# normal generator
gs.get_claimed_rewards,
# cookie dependent
gs.get_daily_reward_info,
gs.get_game_accounts,
]
wrapped = []
for func in functions:
wrapped.append(cache_func(func, cache))
for func in paginators:
wrapped.append(cache_paginator(func, cache, strict=strict))
for func in wrapped:
# ensure we only replace actual functions from the genshinstats directory
for module in sys.modules.values():
if not hasattr(module, func.__name__):
continue
orig_func = getattr(module, func.__name__)
if (
os.path.split(orig_func.__globals__["__file__"])[0]
!= os.path.split(func.__globals__["__file__"])[0] # type: ignore
):
continue
setattr(module, func.__name__, func)
def uninstall_cache() -> None:
"""Uninstalls the cache from all functions"""
modules = sys.modules.copy()
for module in modules.values():
try:
members = inspect.getmembers(module)
except ModuleNotFoundError:
continue
for name, func in members:
if hasattr(func, "__cache__"):
setattr(module, name, getattr(func, "__original__", func))

106
genshinstats/daily.py Normal file
View File

@ -0,0 +1,106 @@
"""Automatic sign-in for hoyolab's daily rewards.
Automatically claims the next daily reward in the daily check-in rewards.
"""
from typing import Any, Dict, Iterator, List, Mapping, NamedTuple, Optional
from urllib.parse import urljoin
from .caching import permanent_cache
from .genshinstats import fetch_endpoint
from .hoyolab import get_game_accounts
from .utils import recognize_server
__all__ = [
"fetch_daily_endpoint",
"get_daily_reward_info",
"get_claimed_rewards",
"get_monthly_rewards",
"claim_daily_reward",
]
OS_URL = "https://hk4e-api-os.mihoyo.com/event/sol/" # overseas
OS_ACT_ID = "e202102251931481"
CN_URL = "https://api-takumi.mihoyo.com/event/bbs_sign_reward/" # chinese
CN_ACT_ID = "e202009291139501"
class DailyRewardInfo(NamedTuple):
signed_in: bool
claimed_rewards: int
def fetch_daily_endpoint(endpoint: str, chinese: bool = False, **kwargs) -> Dict[str, Any]:
"""Fetch an enpoint for daily rewards"""
url, act_id = (CN_URL, CN_ACT_ID) if chinese else (OS_URL, OS_ACT_ID)
kwargs.setdefault("params", {})["act_id"] = act_id
url = urljoin(url, endpoint)
return fetch_endpoint(url, **kwargs)
def get_daily_reward_info(
chinese: bool = False, cookie: Mapping[str, Any] = None
) -> DailyRewardInfo:
"""Fetches daily award info for the currently logged-in user.
Returns a tuple - whether the user is logged in, how many total rewards the user has claimed so far
"""
data = fetch_daily_endpoint("info", chinese, cookie=cookie)
return DailyRewardInfo(data["is_sign"], data["total_sign_day"])
@permanent_cache("chinese", "lang")
def get_monthly_rewards(
chinese: bool = False, lang: str = "en-us", cookie: Mapping[str, Any] = None
) -> List[Dict[str, Any]]:
"""Gets a list of avalible rewards for the current month"""
return fetch_daily_endpoint("home", chinese, cookie=cookie, params=dict(lang=lang))["awards"]
def get_claimed_rewards(
chinese: bool = False, cookie: Mapping[str, Any] = None
) -> Iterator[Dict[str, Any]]:
"""Gets all claimed awards for the currently logged-in user"""
current_page = 1
while True:
data = fetch_daily_endpoint(
"award", chinese, cookie=cookie, params=dict(current_page=current_page)
)["list"]
yield from data
if len(data) < 10:
break
current_page += 1
def claim_daily_reward(
uid: int = None, chinese: bool = False, lang: str = "en-us", cookie: Mapping[str, Any] = None
) -> Optional[Dict[str, Any]]:
"""Signs into hoyolab and claims the daily rewards.
Chinese and overseas servers work a bit differently,
so you must specify whether you want to claim rewards for chinese accounts.
When claiming rewards for other users you may add a cookie argument.
Returns the claimed reward or None if the reward cannot be claimed yet.
"""
signed_in, claimed_rewards = get_daily_reward_info(chinese, cookie)
if signed_in:
return None
params = {}
if chinese:
if uid is None:
accounts = get_game_accounts(chinese=True, cookie=cookie)
params["game_uid"] = accounts[0]["game_uid"]
params["region"] = accounts[0]["region"]
else:
params["game_uid"] = uid
params["region"] = recognize_server(uid)
params["lang"] = lang
fetch_daily_endpoint("sign", chinese, cookie=cookie, method="POST", params=params)
rewards = get_monthly_rewards(chinese, lang, cookie)
return rewards[claimed_rewards]

115
genshinstats/errors.py Normal file
View File

@ -0,0 +1,115 @@
"""Genshinstats errors.
These take in only a single argument: msg.
It's possible to add retcodes and the original api response message with `.set_reponse()`.
"""
class GenshinStatsException(Exception):
"""Base Exception for all genshinstats errors."""
retcode: int = 0
orig_msg: str = ""
def __init__(self, msg: str) -> None:
self.msg = msg
def set_response(self, response: dict) -> None:
"""Adds an optional response object to the error."""
self.retcode = response["retcode"]
self.orig_msg = response["message"]
self.msg = self.msg.format(self.retcode, self.orig_msg)
@property
def msg(self) -> str:
return self.args[0]
@msg.setter
def msg(self, msg) -> None:
self.args = (msg,)
class TooManyRequests(GenshinStatsException):
"""Made too many requests and got ratelimited"""
class NotLoggedIn(GenshinStatsException):
"""Cookies have not been provided."""
class AccountNotFound(GenshinStatsException):
"""Tried to get data with an invalid uid."""
class DataNotPublic(GenshinStatsException):
"""User hasn't set their data to public."""
class CodeRedeemException(GenshinStatsException):
"""Code redemption failed."""
class SignInException(GenshinStatsException):
"""Sign-in failed"""
class AuthkeyError(GenshinStatsException):
"""Base GachaLog Exception."""
class InvalidAuthkey(AuthkeyError):
"""An authkey is invalid."""
class AuthkeyTimeout(AuthkeyError):
"""An authkey has timed out."""
class MissingAuthKey(AuthkeyError):
"""No gacha authkey was found."""
def raise_for_error(response: dict):
"""Raises a custom genshinstats error from a response."""
# every error uses a different response code and message,
# but the codes are not unique so we must check the message at some points too.
error = {
# general
10101: TooManyRequests("Cannnot get data for more than 30 accounts per cookie per day."),
-100: NotLoggedIn("Login cookies have not been provided or are incorrect."),
10001: NotLoggedIn("Login cookies have not been provided or are incorrect."),
10102: DataNotPublic("User's data is not public"),
1009: AccountNotFound("Could not find user; uid may not be valid."),
-1: GenshinStatsException("Internal database error, see original message"),
-10002: AccountNotFound(
"Cannot get rewards info. Account has no game account binded to it."
),
-108: GenshinStatsException("Language is not valid."),
10103: NotLoggedIn("Cookies are correct but do not have a hoyolab account bound to them."),
# code redemption
-2003: CodeRedeemException("Invalid redemption code"),
-2007: CodeRedeemException("You have already used a redemption code of the same kind."),
-2017: CodeRedeemException("Redemption code has been claimed already."),
-2018: CodeRedeemException("This Redemption Code is already in use"),
-2001: CodeRedeemException("Redemption code has expired."),
-2021: CodeRedeemException(
"Cannot claim codes for account with adventure rank lower than 10."
),
-1073: CodeRedeemException("Cannot claim code. Account has no game account bound to it."),
-1071: NotLoggedIn(
"Login cookies from redeem_code() have not been provided or are incorrect. "
"Make sure you use account_id and cookie_token cookies."
),
# sign in
-5003: SignInException("Already claimed daily reward today."),
2001: SignInException("Already checked into hoyolab today."),
# gacha log
-100: InvalidAuthkey("Authkey is not valid.")
if response["message"] == "authkey error"
else NotLoggedIn("Login cookies have not been provided or are incorrect."),
-101: AuthkeyTimeout(
"Authkey has timed-out. Update it by opening the history page in Genshin."
),
}.get(response["retcode"], GenshinStatsException("{} Error ({})"))
error.set_response(response)
raise error

View File

@ -0,0 +1,357 @@
"""Wrapper for the hoyolab.com gameRecord api.
Can fetch data for a user's stats like stats, characters, spiral abyss runs...
"""
import hashlib
import json
import random
import string
import time
from http.cookies import SimpleCookie
from typing import Any, Dict, List, Mapping, MutableMapping, Union
from urllib.parse import urljoin
import requests
from requests.sessions import RequestsCookieJar, Session
from .errors import NotLoggedIn, TooManyRequests, raise_for_error
from .pretty import (
prettify_abyss,
prettify_activities,
prettify_characters,
prettify_notes,
prettify_stats,
)
from .utils import USER_AGENT, is_chinese, recognize_server, retry
__all__ = [
"set_cookie",
"set_cookies",
"get_browser_cookies",
"set_cookies_auto",
"set_cookie_auto",
"fetch_endpoint",
"get_user_stats",
"get_characters",
"get_spiral_abyss",
"get_notes",
"get_activities",
"get_all_user_data",
]
session = Session()
session.headers.update(
{
# required headers
"x-rpc-app_version": "",
"x-rpc-client_type": "",
"x-rpc-language": "en-us",
# authentications headers
"ds": "",
# recommended headers
"user-agent": USER_AGENT,
}
)
cookies: List[RequestsCookieJar] = [] # a list of all avalible cookies
OS_DS_SALT = "6cqshh5dhw73bzxn20oexa9k516chk7s"
CN_DS_SALT = "xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs"
OS_TAKUMI_URL = "https://api-os-takumi.mihoyo.com/" # overseas
CN_TAKUMI_URL = "https://api-takumi.mihoyo.com/" # chinese
OS_GAME_RECORD_URL = "https://bbs-api-os.mihoyo.com/game_record/"
CN_GAME_RECORD_URL = "https://api-takumi.mihoyo.com/game_record/app/"
def set_cookie(cookie: Union[Mapping[str, Any], str] = None, **kwargs: Any) -> None:
"""Logs-in using a cookie.
Usage:
>>> set_cookie(ltuid=..., ltoken=...)
>>> set_cookie(account_id=..., cookie_token=...)
>>> set_cookie({'ltuid': ..., 'ltoken': ...})
>>> set_cookie("ltuid=...; ltoken=...")
"""
if bool(cookie) == bool(kwargs):
raise ValueError("Cannot use both positional and keyword arguments at once")
set_cookies(cookie or kwargs)
def set_cookies(*args: Union[Mapping[str, Any], str], clear: bool = True) -> None:
"""Sets multiple cookies at once to cycle between. Takes same arguments as set_cookie.
Unlike set_cookie, this function allows for multiple cookies to be used at once.
This is so far the only way to circumvent the rate limit.
If clear is set to False the previously set cookies won't be cleared.
"""
if clear:
cookies.clear()
for cookie in args:
if isinstance(cookie, Mapping):
cookie = {k: str(v) for k, v in cookie.items()} # SimpleCookie needs a string
cookie = SimpleCookie(cookie)
jar = RequestsCookieJar()
jar.update(cookie)
cookies.append(jar)
def get_browser_cookies(browser: str = None) -> Dict[str, str]:
"""Gets cookies from your browser for later storing.
If a specific browser is set, gets data from that browser only.
Avalible browsers: chrome, chromium, opera, edge, firefox
"""
try:
import browser_cookie3 # optional library
except ImportError:
raise ImportError(
"functions 'set_cookie_auto' and 'get_browser_cookie` require \"browser-cookie3\". "
'To use these function please install the dependency with "pip install browser-cookie3".'
)
load = getattr(browser_cookie3, browser.lower()) if browser else browser_cookie3.load
# For backwards compatibility we also get account_id and cookie_token
# however we can't just get every cookie because there's sensitive information
allowed_cookies = {"ltuid", "ltoken", "account_id", "cookie_token"}
return {
c.name: c.value
for domain in ("mihoyo", "hoyolab")
for c in load(domain_name=domain)
if c.name in allowed_cookies and c.value is not None
}
def set_cookie_auto(browser: str = None) -> None:
"""Like set_cookie, but gets the cookies by itself from your browser.
Requires the module browser-cookie3
Be aware that this process can take up to 10 seconds.
To speed it up you may select a browser.
If a specific browser is set, gets data from that browser only.
Avalible browsers: chrome, chromium, opera, edge, firefox
"""
set_cookies(get_browser_cookies(browser), clear=True)
set_cookies_auto = set_cookie_auto # alias
def generate_ds(salt: str) -> str:
"""Creates a new ds for authentication."""
t = int(time.time()) # current seconds
r = "".join(random.choices(string.ascii_letters, k=6)) # 6 random chars
h = hashlib.md5(f"salt={salt}&t={t}&r={r}".encode()).hexdigest() # hash and get hex
return f"{t},{r},{h}"
def generate_cn_ds(salt: str, body: Any = None, query: Mapping[str, Any] = None) -> str:
"""Creates a new chinese ds for authentication."""
t = int(time.time())
r = random.randint(100001, 200000)
b = json.dumps(body) if body else ""
q = "&".join(f"{k}={v}" for k, v in sorted(query.items())) if query else ""
h = hashlib.md5(f"salt={salt}&t={t}&r={r}&b={b}&q={q}".encode()).hexdigest()
return f"{t},{r},{h}"
# sometimes a random connection error can just occur, mihoyo being mihoyo
@retry(3, requests.ConnectionError)
def _request(*args: Any, **kwargs: Any) -> Any:
"""Fancy requests.request"""
r = session.request(*args, **kwargs)
r.raise_for_status()
kwargs["cookies"].update(session.cookies)
session.cookies.clear()
data = r.json()
if data["retcode"] == 0:
return data["data"]
raise_for_error(data)
def fetch_endpoint(
endpoint: str, chinese: bool = False, cookie: Mapping[str, Any] = None, **kwargs
) -> Dict[str, Any]:
"""Fetch an enpoint from the API.
Takes in an endpoint url which is joined with the base url.
A request is then sent and returns a parsed response.
Includes error handling and ds token renewal.
Can specifically use the chinese base url and request data for chinese users,
but that requires being logged in as that user.
Supports handling ratelimits if multiple cookies are set with `set_cookies`
"""
# parse the arguments for requests.request
kwargs.setdefault("headers", {})
method = kwargs.pop("method", "get")
if chinese:
kwargs["headers"].update(
{
"ds": generate_cn_ds(CN_DS_SALT, kwargs.get("json"), kwargs.get("params")),
"x-rpc-app_version": "2.11.1",
"x-rpc-client_type": "5",
}
)
url = urljoin(CN_TAKUMI_URL, endpoint)
else:
kwargs["headers"].update(
{
"ds": generate_ds(OS_DS_SALT),
"x-rpc-app_version": "1.5.0",
"x-rpc-client_type": "4",
}
)
url = urljoin(OS_TAKUMI_URL, endpoint)
if cookie is not None:
if not isinstance(cookie, MutableMapping) or not all(
isinstance(v, str) for v in cookie.values()
):
cookie = {k: str(v) for k, v in cookie.items()}
return _request(method, url, cookies=cookie, **kwargs)
elif len(cookies) == 0:
raise NotLoggedIn("Login cookies have not been provided")
for cookie in cookies.copy():
try:
return _request(method, url, cookies=cookie, **kwargs)
except TooManyRequests:
# move the ratelimited cookie to the end to let the ratelimit wear off
cookies.append(cookies.pop(0))
# if we're here it means we used up all our cookies so we must handle that
if len(cookies) == 1:
raise TooManyRequests("Cannnot get data for more than 30 accounts per day.")
else:
raise TooManyRequests("All cookies have hit their request limit of 30 accounts per day.")
def fetch_game_record_endpoint(
endpoint: str, chinese: bool = False, cookie: Mapping[str, Any] = None, **kwargs
):
"""A short-hand for fetching data for the game record"""
base_url = CN_GAME_RECORD_URL if chinese else OS_GAME_RECORD_URL
url = urljoin(base_url, endpoint)
return fetch_endpoint(url, chinese, cookie, **kwargs)
def get_user_stats(
uid: int, equipment: bool = False, lang: str = "en-us", cookie: Mapping[str, Any] = None
) -> Dict[str, Any]:
"""Gets basic user information and stats.
If equipment is True an additional request will be made to get the character equipment
"""
server = recognize_server(uid)
data = fetch_game_record_endpoint(
"genshin/api/index",
chinese=is_chinese(uid),
cookie=cookie,
params=dict(server=server, role_id=uid),
headers={"x-rpc-language": lang},
)
data = prettify_stats(data)
if equipment:
data["characters"] = get_characters(
uid, [i["id"] for i in data["characters"]], lang, cookie
)
return data
def get_characters(
uid: int, character_ids: List[int] = None, lang: str = "en-us", cookie: Mapping[str, Any] = None
) -> List[Dict[str, Any]]:
"""Gets characters of a user.
Characters contain info about their level, constellation, weapon, and artifacts.
Talents are not included.
If character_ids are provided then only characters with those ids are returned.
"""
if character_ids is None:
character_ids = [i["id"] for i in get_user_stats(uid)["characters"]]
server = recognize_server(uid)
data = fetch_game_record_endpoint(
"genshin/api/character",
chinese=is_chinese(uid),
cookie=cookie,
method="POST",
json=dict(
character_ids=character_ids, role_id=uid, server=server
), # POST uses the body instead
headers={"x-rpc-language": lang},
)["avatars"]
return prettify_characters(data)
def get_spiral_abyss(
uid: int, previous: bool = False, cookie: Mapping[str, Any] = None
) -> Dict[str, Any]:
"""Gets spiral abyss runs of a user and details about them.
Every season these stats refresh and you can get the previous stats with `previous`.
"""
server = recognize_server(uid)
schedule_type = 2 if previous else 1
data = fetch_game_record_endpoint(
"genshin/api/spiralAbyss",
chinese=is_chinese(uid),
cookie=cookie,
params=dict(server=server, role_id=uid, schedule_type=schedule_type),
)
return prettify_abyss(data)
def get_activities(
uid: int, lang: str = "en-us", cookie: Mapping[str, Any] = None
) -> Dict[str, Any]:
"""Gets the activities of the user
As of this time only Hyakunin Ikki is availible.
"""
server = recognize_server(uid)
data = fetch_game_record_endpoint(
"genshin/api/activities",
chinese=is_chinese(uid),
cookie=cookie,
params=dict(server=server, role_id=uid),
headers={"x-rpc-language": lang},
)
return prettify_activities(data)
def get_notes(uid: int, lang: str = "en-us", cookie: Mapping[str, Any] = None) -> Dict[str, Any]:
"""Gets the real-time notes of the user
Contains current resin, expeditions, daily commissions and similar.
"""
server = recognize_server(uid)
data = fetch_game_record_endpoint(
"genshin/api/dailyNote",
chinese=is_chinese(uid),
cookie=cookie,
params=dict(server=server, role_id=uid),
headers={"x-rpc-language": lang},
)
return prettify_notes(data)
def get_all_user_data(
uid: int, lang: str = "en-us", cookie: Mapping[str, Any] = None
) -> Dict[str, Any]:
"""Fetches all data a user can has. Very slow.
A helper function that gets all avalible data for a user and returns it as one dict.
However that makes it fairly slow so it's not recommended to use it outside caching.
"""
data = get_user_stats(uid, equipment=True, lang=lang, cookie=cookie)
data["spiral_abyss"] = [get_spiral_abyss(uid, previous, cookie) for previous in [False, True]]
return data

175
genshinstats/hoyolab.py Normal file
View File

@ -0,0 +1,175 @@
"""Wrapper for the hoyolab.com community api.
Can search users, get record cards, redeem codes...
"""
import time
from typing import Any, Dict, List, Mapping, Optional
from .caching import permanent_cache
from .genshinstats import fetch_endpoint, fetch_game_record_endpoint
from .pretty import prettify_game_accounts
from .utils import deprecated, recognize_server
__all__ = [
"get_langs",
"search",
"set_visibility",
"hoyolab_check_in",
"get_game_accounts",
"get_record_card",
"get_uid_from_hoyolab_uid",
"redeem_code",
"get_recommended_users",
"get_hot_posts",
]
@permanent_cache()
def get_langs() -> Dict[str, str]:
"""Gets codes of all languages and their names"""
data = fetch_endpoint("community/misc/wapi/langs", cookie={}, params=dict(gids=2))["langs"]
return {i["value"]: i["name"] for i in data}
def search(keyword: str, size: int = 20, chinese: bool = False) -> List[Dict[str, Any]]:
"""Searches all users.
Can return up to 20 results, based on size.
"""
url = (
"https://bbs-api.mihoyo.com/" if chinese else "https://api-os-takumi.mihoyo.com/community/"
)
return fetch_endpoint(
url + "apihub/wapi/search",
cookie={}, # this endpoint does not require cookies
params=dict(keyword=keyword, size=size, gids=2),
)["users"]
def set_visibility(public: bool, cookie: Mapping[str, Any] = None) -> None:
"""Sets your data to public or private."""
fetch_endpoint(
"game_record/card/wapi/publishGameRecord",
cookie=cookie,
method="POST",
# what's game_id about???
json=dict(is_public=public, game_id=2),
)
@deprecated()
def hoyolab_check_in(chinese: bool = False, cookie: Mapping[str, Any] = None) -> None:
"""Checks in the currently logged-in user to hoyolab.
This function will not claim daily rewards!!!
"""
url = (
"https://bbs-api.mihoyo.com/" if chinese else "https://api-os-takumi.mihoyo.com/community/"
)
fetch_endpoint(
url + "apihub/api/signIn", chinese=chinese, cookie=cookie, method="POST", json=dict(gids=2)
)
def get_game_accounts(
chinese: bool = False, cookie: Mapping[str, Any] = None
) -> List[Dict[str, Any]]:
"""Gets all game accounts of the currently signed in player.
Can get accounts both for overseas and china.
"""
url = "https://api-takumi.mihoyo.com/" if chinese else "https://api-os-takumi.mihoyo.com/"
data = fetch_endpoint(url + "binding/api/getUserGameRolesByCookie", cookie=cookie)["list"]
return prettify_game_accounts(data)
def get_record_card(
hoyolab_uid: int, chinese: bool = False, cookie: Mapping[str, Any] = None
) -> Optional[Dict[str, Any]]:
"""Gets a game record card of a user based on their hoyolab uid.
A record card contains data regarding the stats of a user for their displayed server.
Their uid for a given server is also included.
In case the user hasn't set their data to public or you are ratelimited the function returns None.
You can get a hoyolab id with `search`.
"""
cards = fetch_game_record_endpoint(
"card/wapi/getGameRecordCard",
chinese=chinese,
cookie=cookie,
params=dict(uid=hoyolab_uid, gids=2),
)["list"]
return cards[0] if cards else None
def get_uid_from_hoyolab_uid(
hoyolab_uid: int, chinese: bool = False, cookie: Mapping[str, Any] = None
) -> Optional[int]:
"""Gets a uid with a community uid.
This is so it's possible to search a user and then directly get the uid.
In case the uid is private, returns None.
"""
card = get_record_card(hoyolab_uid, chinese, cookie)
return int(card["game_role_id"]) if card else None
def redeem_code(code: str, uid: int = None, cookie: Mapping[str, Any] = None) -> None:
"""Redeems a gift code for the currently signed in user.
Api endpoint for https://genshin.mihoyo.com/en/gift.
!!! This function requires account_id and cookie_token cookies !!!
The code will be redeemed for every avalible account,
specifying the uid will claim it only for that account.
Returns the amount of users it managed to claim codes for.
You can claim codes only every 5s so you must sleep between claims.
The function sleeps for you when claiming for every account
but you must sleep yourself when passing in a uid or when an error is encountered.
Currently codes can only be claimed for overseas accounts, not chinese.
"""
if uid is not None:
fetch_endpoint(
"https://hk4e-api-os.mihoyo.com/common/apicdkey/api/webExchangeCdkey",
cookie=cookie,
params=dict(
uid=uid, region=recognize_server(uid), cdkey=code, game_biz="hk4e_global", lang="en"
),
)
else:
# cannot claim codes for accounts with ar lower than 10
accounts = [
account for account in get_game_accounts(cookie=cookie) if account["level"] >= 10
]
for i, account in enumerate(accounts):
if i:
time.sleep(5) # there's a ratelimit of 1 request every 5 seconds
redeem_code(code, account["uid"], cookie)
def get_recommended_users(page_size: int = None) -> List[Dict[str, Any]]:
"""Gets a list of recommended active users"""
return fetch_endpoint(
"community/user/wapi/recommendActive",
cookie={},
params=dict(page_size=page_size or 0x10000, offset=0, gids=2),
)["list"]
def get_hot_posts(forum_id: int = 1, size: int = 100, lang: str = "en-us") -> List[Dict[str, Any]]:
"""Fetches hot posts from the front page of hoyolabs
Posts are split into different forums set by ids 1-5.
There may be less posts returned than size.
"""
# the api is physically unable to return more than 2 ^ 24 bytes
# that's around 2 ^ 15 posts so we limit the amount to 2 ^ 14
# the user shouldn't be getting that many posts in the first place
return fetch_endpoint(
"community/post/api/forumHotPostFullList",
cookie={},
params=dict(forum_id=forum_id, page_size=min(size, 0x4000), lang=lang),
)["posts"]

84
genshinstats/map.py Normal file
View File

@ -0,0 +1,84 @@
"""The official genshin map
Gets data from the official genshin map such as categories, points and similar.
"""
import json
from typing import Any, Dict, List
from urllib.parse import urljoin
from .caching import permanent_cache
from .genshinstats import fetch_endpoint
OS_MAP_URL = "https://api-os-takumi-static.mihoyo.com/common/map_user/ys_obc/v1/map/"
__all__ = [
"fetch_map_endpoint",
"get_map_image",
"get_map_icons",
"get_map_labels",
"get_map_locations",
"get_map_points",
"get_map_tile",
]
def fetch_map_endpoint(endpoint: str, **kwargs) -> Dict[str, Any]:
"""Fetch an enpoint from mihoyo's webstatic map api.
Only currently liyue is supported.
Takes in an endpoint url which is joined with the base url.
A request is then sent and returns a parsed response.
"""
kwargs.setdefault("params", {}).update({"map_id": 2, "app_sn": "ys_obc", "lang": "en-us"})
url = urljoin(OS_MAP_URL, endpoint)
return fetch_endpoint(url, cookie={}, **kwargs)
@permanent_cache()
def get_map_image() -> str:
"""Get the url to the entire map image"""
data = fetch_map_endpoint("info")["info"]["detail"]
return json.loads(data)["slices"][0][0]["url"]
@permanent_cache()
def get_map_icons() -> Dict[int, str]:
"""Get all icons for the map"""
data = fetch_map_endpoint("spot_kind/get_icon_list")["icons"]
return {i["id"]: i["url"] for i in data}
@permanent_cache()
def get_map_labels() -> List[Dict[str, Any]]:
"""Get labels and label categories"""
return fetch_map_endpoint("label/tree")["tree"]
def get_map_locations() -> List[Dict[str, Any]]:
"""Get all locations on the map"""
return fetch_map_endpoint("map_anchor/list")["list"]
def get_map_points() -> List[Dict[str, Any]]:
"""Get points on the map"""
return fetch_map_endpoint("point/list")["point_list"]
def get_map_tile(
x: int, y: int, width: int, height: int, resolution: int = 1, image: str = None
) -> str:
"""Gets a map tile at a position
You may set an x, y, width and height of the resulting image
however you shoudl prefer to use multiples of 256 because they are cached
on the mihoyo servers.
Resolution dictates the resolution of the image as a percentage. 100 is highest and 0 is lowest.
You should pick values from 100, 50, 25 and 12.5
"""
image = image or get_map_image()
return (
image
+ f"?x-oss-process=image/resize,p_{round(resolution)}/crop,x_{x},y_{y},w_{width},h_{height}"
)

462
genshinstats/pretty.py Normal file
View File

@ -0,0 +1,462 @@
"""Prettifiers for genshinstats api returns.
Fixes the huge problem of outdated field names in the api,
that were leftover from during development
"""
import re
from datetime import datetime
character_icons = {
"PlayerGirl": "Traveler",
"PlayerBoy": "Traveler",
"Ambor": "Amber",
"Qin": "Jean",
"Hutao": "Hu Tao",
"Feiyan": "Yanfei",
"Kazuha": "Kadehara Kazuha",
"Sara": "Kujou Sara",
"Shougun": "Raiden Shogun",
"Tohma": "Thoma",
}
def _recognize_character_icon(url: str) -> str:
"""Recognizes a character's icon url and returns its name."""
exp = r"game_record/genshin/character_.*_(\w+)(?:@\dx)?.png"
match = re.search(exp, url)
if match is None:
raise ValueError(f"{url!r} is not a character icon or image url")
character = match.group(1)
return character_icons.get(character) or character
def prettify_stats(data):
s = data["stats"]
h = data["homes"][0] if data["homes"] else None
return {
"stats": {
"achievements": s["achievement_number"],
"active_days": s["active_day_number"],
"characters": s["avatar_number"],
"spiral_abyss": s["spiral_abyss"],
"anemoculi": s["anemoculus_number"],
"geoculi": s["geoculus_number"],
"electroculi": s["electroculus_number"],
"common_chests": s["common_chest_number"],
"exquisite_chests": s["exquisite_chest_number"],
"precious_chests": s["precious_chest_number"],
"luxurious_chests": s["luxurious_chest_number"],
"unlocked_waypoints": s["way_point_number"],
"unlocked_domains": s["domain_number"],
},
"characters": [
{
"name": i["name"],
"rarity": i["rarity"]
if i["rarity"] < 100
else i["rarity"] - 100, # aloy has 105 stars
"element": i["element"],
"level": i["level"],
"friendship": i["fetter"],
"icon": i["image"],
"id": i["id"],
}
for i in data["avatars"]
],
"teapot": {
# only unique data between realms are names and icons
"realms": [{"name": s["name"], "icon": s["icon"]} for s in data["homes"]],
"level": h["level"],
"comfort": h["comfort_num"],
"comfort_name": h["comfort_level_name"],
"comfort_icon": h["comfort_level_icon"],
"items": h["item_num"],
"visitors": h["visit_num"], # currently not in use
}
if h
else None,
"explorations": [
{
"name": i["name"],
"explored": round(i["exploration_percentage"] / 10, 1),
"type": i["type"],
"level": i["level"],
"icon": i["icon"],
"offerings": i["offerings"],
}
for i in data["world_explorations"]
],
}
def prettify_characters(data):
return [
{
"name": i["name"],
"rarity": i["rarity"] if i["rarity"] < 100 else i["rarity"] - 100, # aloy has 105 stars
"element": i["element"],
"level": i["level"],
"friendship": i["fetter"],
"constellation": sum(c["is_actived"] for c in i["constellations"]),
"icon": i["icon"],
"image": i["image"],
"id": i["id"],
"collab": i["rarity"] >= 100,
**(
{"traveler_name": "Aether" if "Boy" in i["icon"] else "Lumine"}
if "Player" in i["icon"]
else {}
),
"weapon": {
"name": i["weapon"]["name"],
"rarity": i["weapon"]["rarity"],
"type": i["weapon"]["type_name"],
"level": i["weapon"]["level"],
"ascension": i["weapon"]["promote_level"],
"refinement": i["weapon"]["affix_level"],
"description": i["weapon"]["desc"],
"icon": i["weapon"]["icon"],
"id": i["weapon"]["id"],
},
"artifacts": [
{
"name": a["name"],
"pos_name": {
1: "flower",
2: "feather",
3: "hourglass",
4: "goblet",
5: "crown",
}[a["pos"]],
"full_pos_name": a["pos_name"],
"pos": a["pos"],
"rarity": a["rarity"],
"level": a["level"],
"set": {
"name": a["set"]["name"],
"effect_type": ["none", "single", "classic"][len(a["set"]["affixes"])],
"effects": [
{
"pieces": e["activation_number"],
"effect": e["effect"],
}
for e in a["set"]["affixes"]
],
"set_id": int(re.search(r"UI_RelicIcon_(\d+)_\d+", a["icon"]).group(1)), # type: ignore
"id": a["set"]["id"],
},
"icon": a["icon"],
"id": a["id"],
}
for a in i["reliquaries"]
],
"constellations": [
{
"name": c["name"],
"effect": c["effect"],
"is_activated": c["is_actived"],
"index": c["pos"],
"icon": c["icon"],
"id": c["id"],
}
for c in i["constellations"]
],
"outfits": [
{"name": c["name"], "icon": c["icon"], "id": c["id"]} for c in i["costumes"]
],
}
for i in data
]
def prettify_abyss(data):
fchars = lambda d: [
{
"value": a["value"],
"name": _recognize_character_icon(a["avatar_icon"]),
"rarity": a["rarity"] if a["rarity"] < 100 else a["rarity"] - 100, # aloy has 105 stars
"icon": a["avatar_icon"],
"id": a["avatar_id"],
}
for a in d
]
todate = lambda x: datetime.fromtimestamp(int(x)).strftime("%Y-%m-%d")
totime = lambda x: datetime.fromtimestamp(int(x)).isoformat(" ")
return {
"season": data["schedule_id"],
"season_start_time": todate(data["start_time"]),
"season_end_time": todate(data["end_time"]),
"stats": {
"total_battles": data["total_battle_times"],
"total_wins": data["total_win_times"],
"max_floor": data["max_floor"],
"total_stars": data["total_star"],
},
"character_ranks": {
"most_played": fchars(data["reveal_rank"]),
"most_kills": fchars(data["defeat_rank"]),
"strongest_strike": fchars(data["damage_rank"]),
"most_damage_taken": fchars(data["take_damage_rank"]),
"most_bursts_used": fchars(data["normal_skill_rank"]),
"most_skills_used": fchars(data["energy_skill_rank"]),
},
"floors": [
{
"floor": f["index"],
"stars": f["star"],
"max_stars": f["max_star"],
"icon": f["icon"],
"chambers": [
{
"chamber": l["index"],
"stars": l["star"],
"max_stars": l["max_star"],
"has_halves": len(l["battles"]) == 2,
"battles": [
{
"half": b["index"],
"timestamp": totime(b["timestamp"]),
"characters": [
{
"name": _recognize_character_icon(c["icon"]),
"rarity": c["rarity"]
if c["rarity"] < 100
else c["rarity"] - 100, # aloy has 105 stars
"level": c["level"],
"icon": c["icon"],
"id": c["id"],
}
for c in b["avatars"]
],
}
for b in l["battles"]
],
}
for l in f["levels"]
],
}
for f in data["floors"]
],
}
def prettify_activities(data):
activities = {
k: v if v.get("exists_data") else {"records": []}
for activity in data["activities"]
for k, v in activity.items()
}
return {
"hyakunin": [
{
"id": r["challenge_id"],
"name": r["challenge_name"],
"difficulty": r["difficulty"],
"medal_icon": r["heraldry_icon"],
"score": r["max_score"],
"multiplier": r["score_multiple"],
"lineups": [
{
"characters": [
{
"name": _recognize_character_icon(c["icon"]),
"rarity": c["rarity"]
if c["rarity"] < 100
else c["rarity"] - 100, # aloy has 105 stars
"level": c["level"],
"icon": c["icon"],
"id": c["id"],
"trial": c["is_trail_avatar"],
}
for c in l["avatars"]
],
"skills": [
{"name": s["name"], "desc": s["desc"], "icon": s["icon"], "id": s["id"]}
for s in l["skills"]
],
}
for l in r["lineups"]
],
}
for r in activities["sumo"]["records"]
],
"labyrinth": None,
}
def prettify_notes(data):
return {
"resin": data["current_resin"],
"until_resin_limit": data["resin_recovery_time"],
"max_resin": data["max_resin"],
"total_commissions": data["total_task_num"],
"completed_commissions": data["finished_task_num"],
"claimed_commission_reward": data["is_extra_task_reward_received"],
"max_boss_discounts": data["resin_discount_num_limit"],
"remaining_boss_discounts": data["remain_resin_discount_num"],
"expeditions": [
{
"icon": exp["avatar_side_icon"],
"remaining_time": exp["remained_time"],
"status": exp["status"],
}
for exp in data["expeditions"]
],
"max_expeditions": data["max_expedition_num"],
"realm_currency": data["current_home_coin"],
"max_realm_currency": data["max_home_coin"],
"until_realm_currency_limit": data["home_coin_recovery_time"],
}
def prettify_game_accounts(data):
return [
{
"uid": int(a["game_uid"]),
"server": a["region_name"],
"level": a["level"],
"nickname": a["nickname"],
# idk what these are for:
"biz": a["game_biz"],
"is_chosen": a["is_chosen"],
"is_official": a["is_official"],
}
for a in data
]
def prettify_wish_history(data, banner_name=None):
return [
{
"type": i["item_type"],
"name": i["name"],
"rarity": int(i["rank_type"]),
"time": i["time"],
"id": int(i["id"]),
"banner": banner_name,
"banner_type": int(i["gacha_type"]),
"uid": int(i["uid"]),
}
for i in data
]
def prettify_gacha_items(data):
return [
{
"name": i["name"],
"type": i["item_type"],
"rarity": int(i["rank_type"]),
"id": 10000000 + int(i["item_id"]) - 1000
if len(i["item_id"]) == 4
else int(i["item_id"]),
}
for i in data
]
def prettify_banner_details(data):
per = lambda p: None if p == "0%" else float(p[:-1].replace(",", "."))
fprobs = (
lambda l: [
{
"type": i["item_type"],
"name": i["item_name"],
"rarity": int(i["rank"]),
"is_up": bool(i["is_up"]),
"order_value": i["order_value"],
}
for i in l
]
if l
else []
)
fitems = (
lambda l: [
{
"type": i["item_type"],
"name": i["item_name"],
"element": {
"": "Anemo",
"": "Pyro",
"": "Hydro",
"": "Electro",
"": "Cryo",
"": "Geo",
"": "Dendro",
"": None,
}[i["item_attr"]],
"icon": i["item_img"],
}
for i in l
]
if l
else []
)
return {
"banner_type_name": {
100: "Novice Wishes",
200: "Permanent Wish",
301: "Character Event Wish",
302: "Weapon Event Wish",
}[int(data["gacha_type"])],
"banner_type": int(data["gacha_type"]),
"banner": re.sub(r"<.*?>", "", data["title"]).strip(),
"title": data["title"],
"content": data["content"],
"date_range": data["date_range"],
"r5_up_prob": per(data["r5_up_prob"]), # probability for rate-up 5*
"r4_up_prob": per(data["r4_up_prob"]), # probability for rate-up 4*
"r5_prob": per(data["r5_prob"]), # probability for 5*
"r4_prob": per(data["r4_prob"]), # probability for 4*
"r3_prob": per(data["r3_prob"]), # probability for 3*
"r5_guarantee_prob": per(data["r5_baodi_prob"]), # probability for 5* incl. guarantee
"r4_guarantee_prob": per(data["r4_baodi_prob"]), # probability for 4* incl. guarantee
"r3_guarantee_prob": per(data["r3_baodi_prob"]), # probability for 3* incl. guarantee
"r5_up_items": fitems(
data["r5_up_items"]
), # list of 5* rate-up items that you can get from banner
"r4_up_items": fitems(
data["r4_up_items"]
), # list of 4* rate-up items that you can get from banner
"r5_items": fprobs(data["r5_prob_list"]), # list 5* of items that you can get from banner
"r4_items": fprobs(data["r4_prob_list"]), # list 4* of items that you can get from banner
"r3_items": fprobs(data["r3_prob_list"]), # list 3* of items that you can get from banner
"items": fprobs(
sorted(
data["r5_prob_list"] + data["r4_prob_list"] + data["r3_prob_list"],
key=lambda x: x["order_value"],
)
),
}
def prettify_trans(data, reasons={}):
if data and "name" in data[0]:
# transaction item
return [
{
"time": i["time"],
"name": i["name"],
"rarity": int(i["rank"]),
"amount": int(i["add_num"]),
"reason": reasons.get(int(i["reason"]), ""),
"reason_id": int(i["reason"]),
"uid": int(i["uid"]),
"id": int(i["id"]),
}
for i in data
]
else:
# transaction
return [
{
"time": i["time"],
"amount": int(i["add_num"]),
"reason": reasons.get(int(i["reason"]), ""),
"reason_id": int(i["reason"]),
"uid": int(i["uid"]),
"id": int(i["id"]),
}
for i in data
]

View File

@ -0,0 +1,194 @@
"""Logs for currency "transactions".
Logs for artifact, weapon, resin, genesis crystol and primogem "transactions".
You may view a history of everything you have gained in the last 3 months.
"""
import math
import sys
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urljoin
from .caching import permanent_cache
from .pretty import prettify_trans
from .wishes import fetch_gacha_endpoint, static_session
__all__ = [
"fetch_transaction_endpoint",
"get_primogem_log",
"get_resin_log",
"get_crystal_log",
"get_artifact_log",
"get_weapon_log",
"current_resin",
"approximate_current_resin",
]
YSULOG_URL = "https://hk4e-api-os.mihoyo.com/ysulog/api/"
def fetch_transaction_endpoint(
endpoint: str, authkey: Optional[str] = None, **kwargs
) -> Dict[str, Any]:
"""Fetch an enpoint from mihoyo's transaction logs api.
Takes in an endpoint url which is joined with the base url.
If an authkey is provided, it uses that authkey specifically.
A request is then sent and returns a parsed response.
"""
url = urljoin(YSULOG_URL, endpoint)
return fetch_gacha_endpoint(url, authkey, **kwargs)
@permanent_cache("lang")
def _get_reasons(lang: str = "en-us") -> Dict[int, str]:
r = static_session.get(
f"https://mi18n-os.mihoyo.com/webstatic/admin/mi18n/hk4e_global/m02251421001311/m02251421001311-{lang}.json"
)
r.raise_for_status()
data = r.json()
return {
int(k.split("_")[-1]): v
for k, v in data.items()
if k.startswith("selfinquiry_general_reason_")
}
def _get_transactions(
endpoint: str, size: int = None, authkey: str = None, lang: str = "en-us", end_id: int = 0
) -> Iterator[Dict[str, Any]]:
"""A paginator that uses mihoyo's id paginator algorithm to yield pages"""
if size is not None and size <= 0:
return
page_size = 20
size = size or sys.maxsize
while True:
data = fetch_transaction_endpoint(
endpoint, authkey=authkey, params=dict(size=min(page_size, size), end_id=end_id)
)["list"]
data = prettify_trans(data, _get_reasons(lang))
yield from data
size -= page_size
if len(data) < page_size or size <= 0:
break
end_id = data[-1]["id"]
def get_primogem_log(
size: int = None, authkey: str = None, lang: str = "en-us", end_id: int = 0
) -> Iterator[Dict[str, Any]]:
"""Gets all transactions of primogems
This means stuff like getting primogems from rewards and explorations or making wishes.
Records go only 3 months back.
"""
return _get_transactions("getPrimogemLog", size, authkey, lang, end_id)
def get_crystal_log(
size: int = None, authkey: str = None, lang: str = "en-us", end_id: int = 0
) -> Iterator[Dict[str, Any]]:
"""Get all transactions of genesis crystals
Records go only 3 months back.
"""
return _get_transactions("getCrystalLog", size, authkey, lang, end_id)
def get_resin_log(
size: int = None, authkey: str = None, lang: str = "en-us", end_id: int = 0
) -> Iterator[Dict[str, Any]]:
"""Gets all usage of resin
This means using them in ley lines, domains, crafting and weekly bosses.
Records go only 3 months back.
"""
return _get_transactions("getResinLog", size, authkey, lang, end_id)
def get_artifact_log(
size: int = None, authkey: str = None, lang: str = "en-us", end_id: int = 0
) -> Iterator[Dict[str, Any]]:
"""Get the log of all artifacts gotten or destroyed in the last 3 months"""
return _get_transactions("getArtifactLog", size, authkey, lang, end_id)
def get_weapon_log(
size: int = None, authkey: str = None, lang: str = "en-us", end_id: int = 0
) -> Iterator[Dict[str, Any]]:
"""Get the log of all weapons gotten or destroyed in the last 3 months"""
return _get_transactions("getWeaponLog", size, authkey, lang, end_id)
def current_resin(
last_resin_time: datetime,
last_resin_amount: float,
current_time: datetime = None,
authkey: str = None,
):
"""Gets the current resin based off an amount of resin you've had at any time before
Works by getting all usages after the last resin time and emulating how the resin would be generated.
Keep in mind that this approach works only if the user hasn't played in the last hour.
"""
current_time = current_time or datetime.utcnow()
resin_usage: List[Dict[str, Any]] = [{"time": str(current_time), "amount": 0}]
for usage in get_resin_log(authkey=authkey):
if datetime.fromisoformat(usage["time"]) < last_resin_time:
break
resin_usage.append(usage)
resin_usage.reverse()
resin = last_resin_amount
for usage in resin_usage:
usage_time = datetime.fromisoformat(usage["time"])
recovered_resin = (usage_time - last_resin_time).total_seconds() / (8 * 60)
resin = min(resin + recovered_resin, 160) + usage["amount"]
last_resin_time = usage_time
# better raise an error than to leave users confused
if resin < 0:
raise ValueError("Last resin time is wrong or amount is too low")
return resin
def approximate_current_resin(time: datetime = None, authkey: str = None):
"""Roughly approximates how much resin using a minmax calculation
The result can have an offset of around 5 resin in some cases.
"""
# if any algorithm peeps can help with this one I'd appreciate it
recovery_rate = 8 * 60
current_max = shadow_max = 160.0
current_min = shadow_min = 0.0
time = time or datetime.utcnow()
last_amount = 0
for usage in get_resin_log(authkey=authkey):
usage_time = datetime.fromisoformat(usage["time"])
if time < usage_time:
continue
amount_recovered = (time - usage_time).total_seconds() / recovery_rate
cur_amount: int = usage["amount"]
shadow_max += cur_amount + amount_recovered
shadow_min += last_amount + amount_recovered
current_max = max(current_min, min(current_max, shadow_max))
current_min = max(current_min, min(current_max, shadow_min))
time = usage_time
last_amount = usage["amount"]
if math.isclose(current_max, current_min):
break
resin = (current_max + current_min) / 2
return resin

120
genshinstats/utils.py Normal file
View File

@ -0,0 +1,120 @@
"""Various utility functions for genshinstats."""
import inspect
import os.path
import re
import warnings
from functools import wraps
from typing import Callable, Iterable, Optional, Type, TypeVar, Union
from .errors import AccountNotFound
__all__ = [
"USER_AGENT",
"recognize_server",
"recognize_id",
"is_game_uid",
"is_chinese",
"get_logfile",
]
T = TypeVar("T")
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
def recognize_server(uid: int) -> str:
"""Recognizes which server a UID is from."""
server = {
"1": "cn_gf01",
"2": "cn_gf01",
"5": "cn_qd01",
"6": "os_usa",
"7": "os_euro",
"8": "os_asia",
"9": "os_cht",
}.get(str(uid)[0])
if server:
return server
else:
raise AccountNotFound(f"UID {uid} isn't associated with any server")
def recognize_id(id: int) -> Optional[str]:
"""Attempts to recognize what item type an id is"""
if 10000000 < id < 20000000:
return "character"
elif 1000000 < id < 10000000:
return "artifact_set"
elif 100000 < id < 1000000:
return "outfit"
elif 50000 < id < 100000:
return "artifact"
elif 10000 < id < 50000:
return "weapon"
elif 100 < id < 1000:
return "constellation"
elif 10 ** 17 < id < 10 ** 19:
return "transaction"
# not sure about these ones:
elif 1 <= id <= 4:
return "exploration"
else:
return None
def is_game_uid(uid: int) -> bool:
"""Recognizes whether the uid is a game uid."""
return bool(re.fullmatch(r"[6789]\d{8}", str(uid)))
def is_chinese(x: Union[int, str]) -> bool:
"""Recognizes whether the server/uid is chinese."""
return str(x).startswith(("cn", "1", "5"))
def get_logfile() -> Optional[str]:
"""Find and return the Genshin Impact logfile. None if not found."""
mihoyo_dir = os.path.expanduser("~/AppData/LocalLow/miHoYo/")
for name in ["Genshin Impact", "原神", "YuanShen"]:
output_log = os.path.join(mihoyo_dir, name, "output_log.txt")
if os.path.isfile(output_log):
return output_log
return None # no genshin installation
def retry(
tries: int = 3,
exceptions: Union[Type[BaseException], Iterable[Type[BaseException]]] = Exception,
) -> Callable[[T], T]:
"""A classic retry() decorator"""
def wrapper(func):
@wraps(func)
def inner(*args, **kwargs):
for _ in range(tries):
try:
return func(*args, **kwargs)
except exceptions as e:
exc = e
else:
raise Exception(f"Maximum tries ({tries}) exceeded: {exc}") from exc # type: ignore
return inner
return wrapper # type: ignore
def deprecated(
message: str = "{} is deprecated and will be removed in future versions",
) -> Callable[[T], T]:
"""Shows a warning when a function is attempted to be used"""
def wrapper(func):
@wraps(func)
def inner(*args, **kwargs):
warnings.warn(message.format(func.__name__), PendingDeprecationWarning)
return func(*args, **kwargs)
return inner
return wrapper # type: ignore

277
genshinstats/wishes.py Normal file
View File

@ -0,0 +1,277 @@
"""Genshin Impact wish history.
Gets wish history from the current banners in a clean api.
Requires an authkey that is fetched automatically from a logfile.
"""
import base64
import heapq
import os
import re
import sys
from itertools import chain, islice
from tempfile import gettempdir
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import unquote, urljoin
from requests import Session
from .errors import AuthkeyError, MissingAuthKey, raise_for_error
from .pretty import *
from .utils import USER_AGENT, get_logfile
from .caching import permanent_cache
__all__ = [
"extract_authkey",
"get_authkey",
"set_authkey",
"get_banner_ids",
"fetch_gacha_endpoint",
"get_banner_types",
"get_wish_history",
"get_gacha_items",
"get_banner_details",
"get_uid_from_authkey",
"validate_authkey",
]
GENSHIN_LOG = get_logfile()
GACHA_INFO_URL = "https://hk4e-api-os.mihoyo.com/event/gacha_info/api/"
AUTHKEY_FILE = os.path.join(gettempdir(), "genshinstats_authkey.txt")
session = Session()
session.headers.update(
{
# recommended header
"user-agent": USER_AGENT
}
)
session.params = {
# required params
"authkey_ver": "1",
"lang": "en",
# authentications params
"authkey": "",
# transaction params
"sign_type": "2",
}
static_session = Session() # extra session for static resources
def _get_short_lang_code(lang: str) -> str:
"""Returns an alternative short lang code"""
return lang if "zh" in lang else lang.split("-")[0]
def _read_logfile(logfile: str = None) -> str:
"""Returns the contents of a logfile"""
if GENSHIN_LOG is None:
raise FileNotFoundError("No Genshin Installation was found, could not get gacha data.")
with open(logfile or GENSHIN_LOG) as file:
return file.read()
def extract_authkey(string: str) -> Optional[str]:
"""Extracts an authkey from the provided string. Returns None if not found."""
match = re.search(r"https://.+?authkey=([^&#]+)", string, re.MULTILINE)
if match is not None:
return unquote(match.group(1))
return None
def get_authkey(logfile: str = None) -> str:
"""Gets the query for log requests.
This will either be done from the logs or from a tempfile.
"""
# first try the log
authkey = extract_authkey(_read_logfile(logfile))
if authkey is not None:
with open(AUTHKEY_FILE, "w") as file:
file.write(authkey)
return authkey
# otherwise try the tempfile (may be expired!)
if os.path.isfile(AUTHKEY_FILE):
with open(AUTHKEY_FILE) as file:
return file.read()
raise MissingAuthKey(
"No authkey could be found in the logs or in a tempfile. "
"Open the history in-game first before attempting to request it."
)
def set_authkey(authkey: str = None) -> None:
"""Sets an authkey for log requests.
You may pass in an authkey, a url with an authkey
or a path to a logfile with the authkey.
"""
if authkey is None or os.path.isfile(authkey):
authkey = get_authkey(authkey)
else:
authkey = extract_authkey(authkey) or authkey
session.params["authkey"] = authkey # type: ignore
def get_banner_ids(logfile: str = None) -> List[str]:
"""Gets all banner ids from a log file.
You need to open the details of all banners for this to work.
"""
log = _read_logfile(logfile)
ids = re.findall(r"OnGetWebViewPageFinish:https://.+?gacha_id=([^&#]+)", log)
return list(set(ids))
def fetch_gacha_endpoint(endpoint: str, authkey: str = None, **kwargs) -> Dict[str, Any]:
"""Fetch an enpoint from mihoyo's gacha info.
Takes in an endpoint url which is joined with the base url.
If an authkey is provided, it uses that authkey specifically.
A request is then sent and returns a parsed response.
Includes error handling and getting the authkey.
"""
if authkey is None:
session.params["authkey"] = session.params["authkey"] or get_authkey() # type: ignore
else:
kwargs.setdefault("params", {})["authkey"] = authkey
method = kwargs.pop("method", "get")
url = urljoin(GACHA_INFO_URL, endpoint)
r = session.request(method, url, **kwargs)
r.raise_for_status()
data = r.json()
if data["retcode"] == 0:
return data["data"]
raise_for_error(data)
@permanent_cache("lang")
def get_banner_types(authkey: str = None, lang: str = "en") -> Dict[int, str]:
"""Gets ids for all banners and their names"""
banners = fetch_gacha_endpoint(
"getConfigList", authkey=authkey, params=dict(lang=_get_short_lang_code(lang))
)["gacha_type_list"]
return {int(i["key"]): i["name"] for i in banners}
def get_wish_history(
banner_type: int = None,
size: int = None,
authkey: str = None,
end_id: int = 0,
lang: str = "en",
) -> Iterator[Dict[str, Any]]:
"""Gets wish history.
Note that pulls are yielded and not returned to account for pagination.
When a banner_type is set, only data from that banner type is retuned.
You can get banner types and their names from get_banner_types.
If a size is set the total returned amount of pulls will be equal to or lower than the size.
To be able to get history starting from somewhere other than the last pull
you may pass in the id of the pull right chronologically after the one you want to start from as end_id.
"""
if size is not None and size <= 0:
return
if banner_type is None:
# we get data from all banners by getting data from every individual banner
# and then sorting it by pull date with heapq.merge
gens = [
get_wish_history(banner_type, None, authkey, end_id, lang)
for banner_type in get_banner_types(authkey)
]
yield from islice(heapq.merge(*gens, key=lambda x: x["time"], reverse=True), size)
return
# we create banner_name outside prettify so we don't make extra requests
banner_name = get_banner_types(authkey, lang)[banner_type]
lang = _get_short_lang_code(lang)
page_size = 20
size = size or sys.maxsize
while True:
data = fetch_gacha_endpoint(
"getGachaLog",
authkey=authkey,
params=dict(
gacha_type=banner_type, size=min(page_size, size), end_id=end_id, lang=lang
),
)["list"]
data = prettify_wish_history(data, banner_name)
yield from data
size -= page_size
if len(data) < page_size or size <= 0:
break
end_id = data[-1]["id"]
def get_gacha_items(lang: str = "en-us") -> List[Dict[str, Any]]:
"""Gets the list of characters and weapons that can be gotten from the gacha."""
r = static_session.get(
f"https://webstatic-sea.mihoyo.com/hk4e/gacha_info/os_asia/items/{lang}.json"
)
r.raise_for_status()
return prettify_gacha_items(r.json())
def get_banner_details(banner_id: str, lang: str = "en-us") -> Dict[str, Any]:
"""Gets details of a specific banner.
This requires the banner's id.
These keep rotating so you need to get them with get_banner_ids().
example standard wish: "a37a19624270b092e7250edfabce541a3435c2"
The newbie gacha has no json resource tied to it so you can't get info about it.
"""
r = static_session.get(
f"https://webstatic-sea.mihoyo.com/hk4e/gacha_info/os_asia/{banner_id}/{lang}.json"
)
r.raise_for_status()
return prettify_banner_details(r.json())
def get_uid_from_authkey(authkey: str = None) -> int:
"""Gets a uid from an authkey.
If an authkey is not passed in the function uses the currently set authkey.
"""
# for safety we use all banners, probably overkill
# they are sorted from most to least pulled on for speed
histories = [get_wish_history(i, 1, authkey) for i in (301, 200, 302, 100)]
pull = next(chain.from_iterable(histories), None)
if pull is None: # very rare but possible
raise Exception("User has never made a wish")
return pull["uid"]
def validate_authkey(authkey: Any, previous_authkey: str = None) -> bool:
"""Checks whether an authkey is valid by sending a request
If a previous authkey is provided the function also checks if the
authkey belongs to the same person as the previous one.
"""
if not isinstance(authkey, str) or len(authkey) != 1024:
return False # invalid format
try:
base64.b64decode(authkey)
except:
return False # invalid base64 format
if previous_authkey and authkey[:682] != previous_authkey[:682]:
return False
try:
fetch_gacha_endpoint("getConfigList", authkey=authkey)
except AuthkeyError:
return False
return True

269
plugins/mihoyo.py Normal file
View File

@ -0,0 +1,269 @@
import asyncio
import random
import re
import sqlite3
import traceback
from pyrogram import Client
from pyrogram.types import Message
from defs.db2 import deal_ck, selectDB, OpenPush, CheckDB, connectDB, deletecache
from defs.mihoyo import sign, daily, draw_pic, draw_wordcloud
from ci import scheduler, app, admin_id
from defs.redis_load import redis
SUPERUSERS = [admin_id]
async def mihoyo_msg(client: Client, message: Message):
text = message.text.replace("mihoyo", "")
userid = message.from_user.id
if '添加' in text:
try:
mes = text.replace('添加', '').strip()
if not mes:
return await message.reply_text("获取 Cookie 请参考:[link](https://github.com/thesadru/"
"genshinstats/tree/master#how-can-i-get-my-cookies)", quote=True)
await deal_ck(mes, userid)
await message.reply(f'添加Cookies成功\n'
f'Cookies属于个人重要信息如果你是在不知情的情况下添加'
f'请马上修改mihoyo账户密码保护个人隐私\n'
f'<code>=============</code>\n'
f'如果需要【开启自动签到】和【开启推送】还需要使用命令 '
f'<code>mihoyo绑定uid</code>绑定你的uid。\n'
f'例如:<code>mihoyo绑定uid123456789</code>。')
except Exception as e:
traceback.print_exc()
await message.reply(f'校验失败请输入正确的Cookies获取 Cookie 请参考:'
f'[link](https://github.com/thesadru/'
f'genshinstats/tree/master#how-can-i-get-my-cookies)', quote=True)
elif '推送' in text:
try:
uid = await selectDB(userid, mode="uid")
if '开启' in text:
im = await OpenPush(int(uid[0]), userid, "on", "StatusA")
await message.reply(im, quote=True)
else:
im = await OpenPush(int(uid[0]), userid, "off", "StatusA")
await message.reply(im, quote=True)
except Exception as e:
traceback.print_exc()
await message.reply("未找到uid绑定记录。", quote=True)
elif '自动签到' in text:
try:
uid = await selectDB(userid, mode="uid")
if '开启' in text:
im = await OpenPush(int(uid[0]), userid, "on", "StatusB")
await message.reply(im, quote=True)
else:
im = await OpenPush(int(uid[0]), userid, "off", "StatusA")
await message.reply(im, quote=True)
except Exception as e:
traceback.print_exc()
await message.reply("未找到uid绑定记录。", quote=True)
async def mihoyo_qun_msg(client: Client, message: Message):
text = message.text.replace("mihoyo", "")
qid = message.from_user.id
at = message.reply_to_message
if "自动签到" in text:
try:
if at and qid in SUPERUSERS:
qid = at.from_user.id
elif at and qid not in SUPERUSERS:
return await message.reply("你没有权限。")
gid = message.chat.id
uid = await selectDB(qid, mode="uid")
if "开启" in text:
im = await OpenPush(int(uid[0]), message.from_user.id, str(gid), "StatusB")
await message.reply(im, quote=True)
elif "关闭" in text:
im = await OpenPush(int(uid[0]), message.from_user.id, "off", "StatusB")
await message.reply(im)
except Exception as e:
traceback.print_exc()
await message.reply("未绑定uid信息")
elif "推送" in text:
try:
if at and qid in SUPERUSERS:
qid = at.from_user.id
elif at and qid not in SUPERUSERS:
return await message.reply("你没有权限。")
gid = message.chat.id
uid = await selectDB(qid, mode="uid")
if "开启" in text:
im = await OpenPush(int(uid[0]), message.from_user.id, str(gid), "StatusA")
await message.reply(im, quote=True)
elif "关闭" in text:
im = await OpenPush(int(uid[0]), message.from_user.id, "off", "StatusA")
await message.reply(im)
except Exception as e:
traceback.print_exc()
await message.reply("未绑定uid信息")
elif "每月统计" in text:
# need auth_key 不支持
await message.reply('暂不支持!')
elif "签到" in text:
try:
uid = await selectDB(message.from_user.id, mode="uid")
uid = uid[0]
im = await sign(uid)
await message.reply(im)
except Exception as e:
traceback.print_exc()
await message.reply('未找到绑定信息')
elif "效验全部" in text:
im = await CheckDB()
await message.reply(im)
elif "当前状态" in text:
try:
uid = await selectDB(message.from_user.id, mode="uid")
uid = uid[0]
mes = await daily("ask", uid)
im = mes[0]['message']
except Exception as e:
traceback.print_exc()
im = "没有找到绑定信息。"
await message.reply(im)
elif "绑定uid" in text:
uid = text.replace("绑定uid", "") # str
await connectDB(message.from_user.id, uid)
await message.reply('绑定uid成功')
elif "绑定mys" in text:
mys = text.replace("绑定mys", "") # str
await connectDB(message.from_user.id, None, mys)
await message.reply('绑定米游社id成功')
elif "查询" in text:
try:
at = message.reply_to_message
if at:
qid = at.from_user.id
nickname = at.from_user.first_name
uid = await selectDB(qid)
else:
nickname = message.from_user.first_name
uid = await selectDB(message.from_user.id)
nickname = nickname if len(nickname) < 10 else (nickname[:10] + "...")
if uid:
if "词云" in text:
try:
im = await draw_wordcloud(uid[0], message, uid[1])
if im.find(".jpg") != -1:
await message.reply_photo(im)
else:
await message.reply(im)
except Exception as e:
await message.reply("获取失败,有可能是数据状态有问题,\n{}\n请检查后台输出。".format(e))
traceback.print_exc()
else:
try:
bg = await draw_pic(uid[0], message, nickname=nickname, mode=uid[1])
if bg.find(".") != -1:
await message.reply_photo(bg)
else:
await message.reply(bg)
except Exception as e:
await message.reply("获取失败,有可能是数据状态有问题,\n{}\n请检查后台输出。".format(e))
traceback.print_exc()
else:
await message.reply('未找到绑定记录!')
except Exception as e:
traceback.print_exc()
await message.reply("发生错误 {},请检查后台输出。".format(e))
elif "mys" in text:
try:
try:
uid = re.findall(r"\d+", text)[0] # str
except IndexError:
return await message.reply("米游社 id 格式错误!")
nickname = message.from_user.first_name
nickname = nickname if len(nickname) < 10 else (nickname[:10] + "...")
try:
im = await draw_pic(uid, message, nickname=nickname, mode=3)
if im.find(".") != -1:
await message.reply_photo(im)
else:
await message.reply(im)
except Exception as e:
await message.reply("获取失败,有可能是数据状态有问题,\n{}\n请检查后台输出。".format(e))
traceback.print_exc()
except Exception as e:
traceback.print_exc()
await message.reply("发生错误 {},请检查后台输出。".format(e))
elif "全部重签" in text and message.from_user.id in SUPERUSERS:
try:
await message.reply("已开始执行")
await daily_sign_2()
except Exception as e:
traceback.print_exc()
await message.reply("发生错误 {},请检查后台输出。".format(e))
# 每隔一小时检测树脂是否超过设定值
@scheduler.scheduled_job('interval', hours=1)
async def push_2():
daily_data = await daily()
if daily_data is not None:
for i in daily_data:
# 过滤重复推送
data = i['message'].split('==============')
if len(data) > 2:
text = "".join(data[1:-1])
data = redis.get("daily_" + str(i['qid']))
if data:
if text == data.decode():
continue
redis.set("daily_" + str(i['qid']), text)
if i['gid'] == "on":
await app.send_message(int(i['qid']), i['message'])
else:
await app.send_message(int(i['gid']), f"[NOTICE {i['qid']}](tg://user?id={i['qid']})" + "\n" +
i['message'])
else:
pass
# 每日零点半进行 mihoyo 签到
@scheduler.scheduled_job('cron', hour='0', minute="30")
async def daily_sign_2():
conn = sqlite3.connect('ID_DATA_OR.db')
c = conn.cursor()
cursor = c.execute(
"SELECT * FROM NewCookiesTable WHERE StatusB != ?", ("off",))
c_data = cursor.fetchall()
temp_list = []
for row in c_data:
if row[4] == "on":
try:
im = await sign(str(row[0]))
await app.send_message(int(row[2]), im)
except Exception as e:
traceback.print_exc()
else:
im = await sign(str(row[0]))
message = f"[NOTICE {row[2]}](tg://user?id={row[2]})\n\n{im}"
for i in temp_list:
if row[4] == i["push_group"]:
i["push_message"] = i["push_message"] + "\n" + message
break
else:
temp_list.append({"push_group": row[4], "push_message": message})
await asyncio.sleep(6 + random.randint(0, 2))
for i in temp_list:
try:
await app.send_message(int(i["push_group"]), i["push_message"])
except Exception as e:
traceback.print_exc()
await asyncio.sleep(3 + random.randint(0, 2))
# 每日零点清空cookies使用缓存
@scheduler.scheduled_job('cron', hour='0')
async def delete_2():
deletecache()

View File

@ -3,6 +3,7 @@ import random
import re
import sqlite3
import traceback
from typing import Union
from pyrogram import Client
from pyrogram.types import Message
@ -10,6 +11,7 @@ from pyrogram.types import Message
from defs.db import deal_ck, selectDB, OpenPush, CheckDB, connectDB, deletecache
from defs.event import generate_event
from defs.mys2 import award, sign, daily, draw_pic, draw_wordcloud
from defs.mihoyo import draw_pic as draw_pic_2
from ci import scheduler, app, admin_id
from defs.redis_load import redis
@ -17,6 +19,11 @@ from defs.redis_load import redis
SUPERUSERS = [admin_id]
def is_chinese(x: Union[int, str]) -> bool:
"""Recognizes whether the server/uid is chinese."""
return str(x).startswith(("1", "2", "5"))
async def mys2_msg(client: Client, message: Message):
text = message.text.replace("米游社", "")
userid = message.from_user.id
@ -137,8 +144,11 @@ async def mys2_qun_msg(client: Client, message: Message):
await message.reply(im)
elif "绑定uid" in text:
uid = text.replace("绑定uid", "") # str
await connectDB(message.from_user.id, uid)
await message.reply('绑定uid成功')
if is_chinese(uid):
await connectDB(message.from_user.id, uid)
await message.reply('绑定uid成功')
else:
await message.reply("非国区uid")
elif "绑定mys" in text:
mys = text.replace("绑定mys", "") # str
await connectDB(message.from_user.id, None, mys)
@ -152,7 +162,10 @@ async def mys2_qun_msg(client: Client, message: Message):
try:
nickname = message.from_user.first_name
nickname = nickname if len(nickname) < 10 else (nickname[:10] + "...")
im = await draw_pic(uid, message, nickname=nickname, mode=2)
if is_chinese(uid):
im = await draw_pic(uid, message, nickname=nickname, mode=2)
else:
im = await draw_pic_2(uid, message, nickname=nickname, mode=2)
if im.find(".") != -1:
await message.reply_photo(im)
else:

View File

@ -9,6 +9,7 @@ from pyrogram import filters as Filters
from ci import me
from plugins.enemies import enemies_msg
from plugins.mys2 import mys2_msg, mys2_qun_msg
from plugins.mihoyo import mihoyo_msg, mihoyo_qun_msg
from plugins.start import welcome_command, ping_command, help_command, leave_command, help_callback
from plugins.almanac import almanac_msg
from plugins.challenge import tf_msg, wq_msg, zb_msg
@ -109,6 +110,8 @@ async def process_private_msg(client: Client, message: Message):
await log(client, message, '查询资源列表')
if '米游社' in message.text:
await mys2_msg(client, message)
if 'mihoyo' in message.text:
await mihoyo_msg(client, message)
# 账号信息cookie 过期过快 不推荐启用)
# if '账号信息' in message.text or '用户信息' in message.text:
# await mys_msg(client, message)
@ -187,6 +190,8 @@ async def process_group_msg(client: Client, message: Message):
# 米游社功能
if text.startswith('米游社'):
await mys2_qun_msg(client, message)
if text.startswith('mihoyo'):
await mihoyo_qun_msg(client, message)
@Client.on_message(Filters.photo)
@ -200,6 +205,9 @@ async def process_photo(client: Client, message: Message):
if text.startswith('米游社'):
if message.chat.type == "supergroup":
await mys2_qun_msg(client, message)
if text.startswith('mihoyo'):
if message.chat.type == "supergroup":
await mihoyo_qun_msg(client, message)
@Client.on_message(Filters.document & Filters.group & ~Filters.edited)
@ -211,6 +219,10 @@ async def process_document(client: Client, message: Message):
print(message.document.mime_type)
if message.document.mime_type in ["image/jpeg"]:
await mys2_qun_msg(client, message)
if text.startswith('mihoyo'):
print(message.document.mime_type)
if message.document.mime_type in ["image/jpeg"]:
await mihoyo_qun_msg(client, message)
@Client.on_message(Filters.voice & Filters.private & ~Filters.edited)

View File

@ -2,7 +2,7 @@ from ci import admin_id
from pyrogram import Client
from pyrogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
HELP_MSG_PRE = '<a href="https://git.io/JcbTD">PaimonBot</a> 0.3.0beta By Xtao-Labs\n\n' \
HELP_MSG_PRE = '<a href="https://git.io/JcbTD">PaimonBot</a> 0.3.1beta By Xtao-Labs\n\n' \
'🔅 以下是小派蒙我学会了的功能(部分):\n'
HELP_MSG = """① [武器/今日武器] 查看今日武器材料和武器
[天赋/今日天赋] 查看今日天赋材料和角色
@ -28,7 +28,7 @@ HELP_MSG = """① [武器/今日武器] 查看今日武器材料和武器
(13) [圣遗物评分] 我也想拥有这种分数的圣遗物(切实)
(14) [哪里有 (资源名)] 查看资源的位置
(15) [资源列表] 查看原神所有资源私聊
(16) [米游社] 米游社相关功能
(16) [米游社/mihoyo] 米游社/mihoyo相关功能替换
💠 <code>米游社添加私聊</code>
💠 <code>米游社推送开启/关闭</code>
💠 <code>米游社自动签到开启/关闭</code>