3
0

Add as submodule

This commit is contained in:
xtaodada 2022-02-03 13:52:20 +08:00
parent 5c56c644f5
commit ac760bcee9
No known key found for this signature in database
GPG Key ID: EE4DC37B55E24736
15 changed files with 6 additions and 2116 deletions

3
.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "genshinstats"]
path = genshinstats
url = https://github.com/thesadru/genshinstats.git

View File

@ -3,7 +3,7 @@ import re
import traceback
from datetime import datetime
from shutil import copyfile
import genshinstats as gs
import genshinstats.genshinstats as gs
async def cookiesDB(uid, Cookies, qid):

View File

@ -13,7 +13,7 @@ from pyrogram.types import Message
from defs.db2 import MysSign, GetDaily, cacheDB, GetMysInfo, errorDB, GetInfo, GetSpiralAbyssInfo, GetAward
from defs.event import ys_font
from genshinstats.daily import DailyRewardInfo
from genshinstats.genshinstats.daily import DailyRewardInfo
WEAPON_PATH = os.path.join("assets", 'weapon')
BG_PATH = os.path.join("assets", "bg")

1
genshinstats Submodule

@ -0,0 +1 @@
Subproject commit 5760a38384e6dfdf95ec4ebdf410c3bee0537003

View File

@ -1,18 +0,0 @@
"""Wrapper for the Genshin Impact's api.
This is an unofficial wrapper for the Genshin Impact gameRecord and wish history api.
Majority of the endpoints are implemented, documented and typehinted.
All endpoints require to be logged in with either a cookie or an authkey, read the README.md for more info.
https://github.com/thesadru/genshinstats
"""
from .caching import *
from .daily import *
from .errors import *
from .genshinstats import *
from .hoyolab import *
from .map import *
from .transactions import *
from .utils import *
from .wishes import *

View File

@ -1,206 +0,0 @@
"""Install a cache into genshinstats"""
import inspect
import os
import sys
from functools import update_wrapper
from itertools import islice
from typing import Any, Callable, Dict, List, MutableMapping, Tuple, TypeVar
import genshinstats as gs
__all__ = ["permanent_cache", "install_cache", "uninstall_cache"]
C = TypeVar("C", bound=Callable[..., Any])
def permanent_cache(*params: str) -> Callable[[C], C]:
"""Like lru_cache except permanent and only caches based on some parameters"""
cache: Dict[Any, Any] = {}
def wrapper(func):
sig = inspect.signature(func)
def inner(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
# since the amount of arguments is constant we can just save the values
key = tuple(v for k, v in bound.arguments.items() if k in params)
if key in cache:
return cache[key]
r = func(*args, **kwargs)
if r is not None:
cache[key] = r
return r
inner.cache = cache
return update_wrapper(inner, func)
return wrapper # type: ignore
def cache_func(func: C, cache: MutableMapping[Tuple[Any, ...], Any]) -> C:
"""Caches a normal function"""
# prevent possible repeated cachings
if hasattr(func, "__cache__"):
return func
sig = inspect.signature(func)
def wrapper(*args, **kwargs):
# create key (func name, *arguments)
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
key = tuple(v for k, v in bound.arguments.items() if k != "cookie")
key = (func.__name__,) + key
if key in cache:
return cache[key]
r = func(*args, **kwargs)
if r is not None:
cache[key] = r
return r
setattr(wrapper, "__cache__", cache)
setattr(wrapper, "__original__", func)
return update_wrapper(wrapper, func) # type: ignore
def cache_paginator(
func: C, cache: MutableMapping[Tuple[Any, ...], Any], strict: bool = False
) -> C:
"""Caches an id generator such as wish history
Respects size and authkey.
If strict mode is on then the first item of the paginator will no longer be requested every time.
"""
if hasattr(func, "__cache__"):
return func
sig = inspect.signature(func)
def wrapper(*args, **kwargs):
# create key (func name, end id, *arguments)
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
arguments = bound.arguments
# remove arguments that might cause problems
size, authkey, end_id = [arguments.pop(k) for k in ("size", "authkey", "end_id")]
partial_key = tuple(arguments.values())
# special recursive case must be ignored
# otherwise an infinite recursion due to end_id resets will occur
if "banner_type" in arguments and arguments["banner_type"] is None:
return func(*args, **kwargs)
def make_key(end_id: int) -> Tuple[Any, ...]:
return (
func.__name__,
end_id,
) + partial_key
def helper(end_id: int):
while True:
# yield new items from the cache
key = make_key(end_id)
while key in cache:
yield cache[key]
end_id = cache[key]["id"]
key = make_key(end_id)
# look ahead and add new items to the cache
# since the size limit is always 20 we use that to make only a single request
new = list(func(size=20, authkey=authkey, end_id=end_id, **arguments))
if not new:
break
# the head may not want to be cached so it must be handled separately
if end_id != 0 or strict:
cache[make_key(end_id)] = new[0]
if end_id == 0:
yield new[0]
end_id = new[0]["id"]
for p, n in zip(new, new[1:]):
cache[make_key(p["id"])] = n
return islice(helper(end_id), size)
setattr(wrapper, "__cache__", cache)
setattr(wrapper, "__original__", func)
return update_wrapper(wrapper, func) # type: ignore
def install_cache(cache: MutableMapping[Tuple[Any, ...], Any], strict: bool = False) -> None:
"""Installs a cache into every cacheable function in genshinstats
If strict mode is on then the first item of the paginator will no longer be requested every time.
That can however cause a variety of problems and it's therefore recommend to use it only with TTL caches.
Please do note that hundreds of accesses may be made per call so your cache shouldn't be doing heavy computations during accesses.
"""
functions: List[Callable] = [
# genshinstats
gs.get_user_stats,
gs.get_characters,
gs.get_spiral_abyss,
# wishes
gs.get_banner_details,
gs.get_gacha_items,
# hoyolab
gs.search,
gs.get_record_card,
gs.get_recommended_users,
]
paginators: List[Callable] = [
# wishes
gs.get_wish_history,
# transactions
gs.get_artifact_log,
gs.get_crystal_log,
gs.get_primogem_log,
gs.get_resin_log,
gs.get_weapon_log,
]
invalid: List[Callable] = [
# normal generator
gs.get_claimed_rewards,
# cookie dependent
gs.get_daily_reward_info,
gs.get_game_accounts,
]
wrapped = []
for func in functions:
wrapped.append(cache_func(func, cache))
for func in paginators:
wrapped.append(cache_paginator(func, cache, strict=strict))
for func in wrapped:
# ensure we only replace actual functions from the genshinstats directory
for module in sys.modules.values():
if not hasattr(module, func.__name__):
continue
orig_func = getattr(module, func.__name__)
if (
os.path.split(orig_func.__globals__["__file__"])[0]
!= os.path.split(func.__globals__["__file__"])[0] # type: ignore
):
continue
setattr(module, func.__name__, func)
def uninstall_cache() -> None:
"""Uninstalls the cache from all functions"""
modules = sys.modules.copy()
for module in modules.values():
try:
members = inspect.getmembers(module)
except ModuleNotFoundError:
continue
for name, func in members:
if hasattr(func, "__cache__"):
setattr(module, name, getattr(func, "__original__", func))

View File

@ -1,106 +0,0 @@
"""Automatic sign-in for hoyolab's daily rewards.
Automatically claims the next daily reward in the daily check-in rewards.
"""
from typing import Any, Dict, Iterator, List, Mapping, NamedTuple, Optional
from urllib.parse import urljoin
from .caching import permanent_cache
from .genshinstats import fetch_endpoint
from .hoyolab import get_game_accounts
from .utils import recognize_server
__all__ = [
"fetch_daily_endpoint",
"get_daily_reward_info",
"get_claimed_rewards",
"get_monthly_rewards",
"claim_daily_reward",
]
OS_URL = "https://hk4e-api-os.mihoyo.com/event/sol/" # overseas
OS_ACT_ID = "e202102251931481"
CN_URL = "https://api-takumi.mihoyo.com/event/bbs_sign_reward/" # chinese
CN_ACT_ID = "e202009291139501"
class DailyRewardInfo(NamedTuple):
signed_in: bool
claimed_rewards: int
def fetch_daily_endpoint(endpoint: str, chinese: bool = False, **kwargs) -> Dict[str, Any]:
"""Fetch an enpoint for daily rewards"""
url, act_id = (CN_URL, CN_ACT_ID) if chinese else (OS_URL, OS_ACT_ID)
kwargs.setdefault("params", {})["act_id"] = act_id
url = urljoin(url, endpoint)
return fetch_endpoint(url, **kwargs)
def get_daily_reward_info(
chinese: bool = False, cookie: Mapping[str, Any] = None
) -> DailyRewardInfo:
"""Fetches daily award info for the currently logged-in user.
Returns a tuple - whether the user is logged in, how many total rewards the user has claimed so far
"""
data = fetch_daily_endpoint("info", chinese, cookie=cookie)
return DailyRewardInfo(data["is_sign"], data["total_sign_day"])
@permanent_cache("chinese", "lang")
def get_monthly_rewards(
chinese: bool = False, lang: str = "en-us", cookie: Mapping[str, Any] = None
) -> List[Dict[str, Any]]:
"""Gets a list of avalible rewards for the current month"""
return fetch_daily_endpoint("home", chinese, cookie=cookie, params=dict(lang=lang))["awards"]
def get_claimed_rewards(
chinese: bool = False, cookie: Mapping[str, Any] = None
) -> Iterator[Dict[str, Any]]:
"""Gets all claimed awards for the currently logged-in user"""
current_page = 1
while True:
data = fetch_daily_endpoint(
"award", chinese, cookie=cookie, params=dict(current_page=current_page)
)["list"]
yield from data
if len(data) < 10:
break
current_page += 1
def claim_daily_reward(
uid: int = None, chinese: bool = False, lang: str = "en-us", cookie: Mapping[str, Any] = None
) -> Optional[Dict[str, Any]]:
"""Signs into hoyolab and claims the daily rewards.
Chinese and overseas servers work a bit differently,
so you must specify whether you want to claim rewards for chinese accounts.
When claiming rewards for other users you may add a cookie argument.
Returns the claimed reward or None if the reward cannot be claimed yet.
"""
signed_in, claimed_rewards = get_daily_reward_info(chinese, cookie)
if signed_in:
return None
params = {}
if chinese:
if uid is None:
accounts = get_game_accounts(chinese=True, cookie=cookie)
params["game_uid"] = accounts[0]["game_uid"]
params["region"] = accounts[0]["region"]
else:
params["game_uid"] = uid
params["region"] = recognize_server(uid)
params["lang"] = lang
fetch_daily_endpoint("sign", chinese, cookie=cookie, method="POST", params=params)
rewards = get_monthly_rewards(chinese, lang, cookie)
return rewards[claimed_rewards]

View File

@ -1,115 +0,0 @@
"""Genshinstats errors.
These take in only a single argument: msg.
It's possible to add retcodes and the original api response message with `.set_reponse()`.
"""
class GenshinStatsException(Exception):
"""Base Exception for all genshinstats errors."""
retcode: int = 0
orig_msg: str = ""
def __init__(self, msg: str) -> None:
self.msg = msg
def set_response(self, response: dict) -> None:
"""Adds an optional response object to the error."""
self.retcode = response["retcode"]
self.orig_msg = response["message"]
self.msg = self.msg.format(self.retcode, self.orig_msg)
@property
def msg(self) -> str:
return self.args[0]
@msg.setter
def msg(self, msg) -> None:
self.args = (msg,)
class TooManyRequests(GenshinStatsException):
"""Made too many requests and got ratelimited"""
class NotLoggedIn(GenshinStatsException):
"""Cookies have not been provided."""
class AccountNotFound(GenshinStatsException):
"""Tried to get data with an invalid uid."""
class DataNotPublic(GenshinStatsException):
"""User hasn't set their data to public."""
class CodeRedeemException(GenshinStatsException):
"""Code redemption failed."""
class SignInException(GenshinStatsException):
"""Sign-in failed"""
class AuthkeyError(GenshinStatsException):
"""Base GachaLog Exception."""
class InvalidAuthkey(AuthkeyError):
"""An authkey is invalid."""
class AuthkeyTimeout(AuthkeyError):
"""An authkey has timed out."""
class MissingAuthKey(AuthkeyError):
"""No gacha authkey was found."""
def raise_for_error(response: dict):
"""Raises a custom genshinstats error from a response."""
# every error uses a different response code and message,
# but the codes are not unique so we must check the message at some points too.
error = {
# general
10101: TooManyRequests("Cannnot get data for more than 30 accounts per cookie per day."),
-100: NotLoggedIn("Login cookies have not been provided or are incorrect."),
10001: NotLoggedIn("Login cookies have not been provided or are incorrect."),
10102: DataNotPublic("User's data is not public"),
1009: AccountNotFound("Could not find user; uid may not be valid."),
-1: GenshinStatsException("Internal database error, see original message"),
-10002: AccountNotFound(
"Cannot get rewards info. Account has no game account binded to it."
),
-108: GenshinStatsException("Language is not valid."),
10103: NotLoggedIn("Cookies are correct but do not have a hoyolab account bound to them."),
# code redemption
-2003: CodeRedeemException("Invalid redemption code"),
-2007: CodeRedeemException("You have already used a redemption code of the same kind."),
-2017: CodeRedeemException("Redemption code has been claimed already."),
-2018: CodeRedeemException("This Redemption Code is already in use"),
-2001: CodeRedeemException("Redemption code has expired."),
-2021: CodeRedeemException(
"Cannot claim codes for account with adventure rank lower than 10."
),
-1073: CodeRedeemException("Cannot claim code. Account has no game account bound to it."),
-1071: NotLoggedIn(
"Login cookies from redeem_code() have not been provided or are incorrect. "
"Make sure you use account_id and cookie_token cookies."
),
# sign in
-5003: SignInException("Already claimed daily reward today."),
2001: SignInException("Already checked into hoyolab today."),
# gacha log
-100: InvalidAuthkey("Authkey is not valid.")
if response["message"] == "authkey error"
else NotLoggedIn("Login cookies have not been provided or are incorrect."),
-101: AuthkeyTimeout(
"Authkey has timed-out. Update it by opening the history page in Genshin."
),
}.get(response["retcode"], GenshinStatsException("{} Error ({})"))
error.set_response(response)
raise error

View File

@ -1,357 +0,0 @@
"""Wrapper for the hoyolab.com gameRecord api.
Can fetch data for a user's stats like stats, characters, spiral abyss runs...
"""
import hashlib
import json
import random
import string
import time
from http.cookies import SimpleCookie
from typing import Any, Dict, List, Mapping, MutableMapping, Union
from urllib.parse import urljoin
import requests
from requests.sessions import RequestsCookieJar, Session
from .errors import NotLoggedIn, TooManyRequests, raise_for_error
from .pretty import (
prettify_abyss,
prettify_activities,
prettify_characters,
prettify_notes,
prettify_stats,
)
from .utils import USER_AGENT, is_chinese, recognize_server, retry
__all__ = [
"set_cookie",
"set_cookies",
"get_browser_cookies",
"set_cookies_auto",
"set_cookie_auto",
"fetch_endpoint",
"get_user_stats",
"get_characters",
"get_spiral_abyss",
"get_notes",
"get_activities",
"get_all_user_data",
]
session = Session()
session.headers.update(
{
# required headers
"x-rpc-app_version": "",
"x-rpc-client_type": "",
"x-rpc-language": "en-us",
# authentications headers
"ds": "",
# recommended headers
"user-agent": USER_AGENT,
}
)
cookies: List[RequestsCookieJar] = [] # a list of all avalible cookies
OS_DS_SALT = "6cqshh5dhw73bzxn20oexa9k516chk7s"
CN_DS_SALT = "xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs"
OS_TAKUMI_URL = "https://api-os-takumi.mihoyo.com/" # overseas
CN_TAKUMI_URL = "https://api-takumi.mihoyo.com/" # chinese
OS_GAME_RECORD_URL = "https://bbs-api-os.mihoyo.com/game_record/"
CN_GAME_RECORD_URL = "https://api-takumi.mihoyo.com/game_record/app/"
def set_cookie(cookie: Union[Mapping[str, Any], str] = None, **kwargs: Any) -> None:
"""Logs-in using a cookie.
Usage:
>>> set_cookie(ltuid=..., ltoken=...)
>>> set_cookie(account_id=..., cookie_token=...)
>>> set_cookie({'ltuid': ..., 'ltoken': ...})
>>> set_cookie("ltuid=...; ltoken=...")
"""
if bool(cookie) == bool(kwargs):
raise ValueError("Cannot use both positional and keyword arguments at once")
set_cookies(cookie or kwargs)
def set_cookies(*args: Union[Mapping[str, Any], str], clear: bool = True) -> None:
"""Sets multiple cookies at once to cycle between. Takes same arguments as set_cookie.
Unlike set_cookie, this function allows for multiple cookies to be used at once.
This is so far the only way to circumvent the rate limit.
If clear is set to False the previously set cookies won't be cleared.
"""
if clear:
cookies.clear()
for cookie in args:
if isinstance(cookie, Mapping):
cookie = {k: str(v) for k, v in cookie.items()} # SimpleCookie needs a string
cookie = SimpleCookie(cookie)
jar = RequestsCookieJar()
jar.update(cookie)
cookies.append(jar)
def get_browser_cookies(browser: str = None) -> Dict[str, str]:
"""Gets cookies from your browser for later storing.
If a specific browser is set, gets data from that browser only.
Avalible browsers: chrome, chromium, opera, edge, firefox
"""
try:
import browser_cookie3 # optional library
except ImportError:
raise ImportError(
"functions 'set_cookie_auto' and 'get_browser_cookie` require \"browser-cookie3\". "
'To use these function please install the dependency with "pip install browser-cookie3".'
)
load = getattr(browser_cookie3, browser.lower()) if browser else browser_cookie3.load
# For backwards compatibility we also get account_id and cookie_token
# however we can't just get every cookie because there's sensitive information
allowed_cookies = {"ltuid", "ltoken", "account_id", "cookie_token"}
return {
c.name: c.value
for domain in ("mihoyo", "hoyolab")
for c in load(domain_name=domain)
if c.name in allowed_cookies and c.value is not None
}
def set_cookie_auto(browser: str = None) -> None:
"""Like set_cookie, but gets the cookies by itself from your browser.
Requires the module browser-cookie3
Be aware that this process can take up to 10 seconds.
To speed it up you may select a browser.
If a specific browser is set, gets data from that browser only.
Avalible browsers: chrome, chromium, opera, edge, firefox
"""
set_cookies(get_browser_cookies(browser), clear=True)
set_cookies_auto = set_cookie_auto # alias
def generate_ds(salt: str) -> str:
"""Creates a new ds for authentication."""
t = int(time.time()) # current seconds
r = "".join(random.choices(string.ascii_letters, k=6)) # 6 random chars
h = hashlib.md5(f"salt={salt}&t={t}&r={r}".encode()).hexdigest() # hash and get hex
return f"{t},{r},{h}"
def generate_cn_ds(salt: str, body: Any = None, query: Mapping[str, Any] = None) -> str:
"""Creates a new chinese ds for authentication."""
t = int(time.time())
r = random.randint(100001, 200000)
b = json.dumps(body) if body else ""
q = "&".join(f"{k}={v}" for k, v in sorted(query.items())) if query else ""
h = hashlib.md5(f"salt={salt}&t={t}&r={r}&b={b}&q={q}".encode()).hexdigest()
return f"{t},{r},{h}"
# sometimes a random connection error can just occur, mihoyo being mihoyo
@retry(3, requests.ConnectionError)
def _request(*args: Any, **kwargs: Any) -> Any:
"""Fancy requests.request"""
r = session.request(*args, **kwargs)
r.raise_for_status()
kwargs["cookies"].update(session.cookies)
session.cookies.clear()
data = r.json()
if data["retcode"] == 0:
return data["data"]
raise_for_error(data)
def fetch_endpoint(
endpoint: str, chinese: bool = False, cookie: Mapping[str, Any] = None, **kwargs
) -> Dict[str, Any]:
"""Fetch an enpoint from the API.
Takes in an endpoint url which is joined with the base url.
A request is then sent and returns a parsed response.
Includes error handling and ds token renewal.
Can specifically use the chinese base url and request data for chinese users,
but that requires being logged in as that user.
Supports handling ratelimits if multiple cookies are set with `set_cookies`
"""
# parse the arguments for requests.request
kwargs.setdefault("headers", {})
method = kwargs.pop("method", "get")
if chinese:
kwargs["headers"].update(
{
"ds": generate_cn_ds(CN_DS_SALT, kwargs.get("json"), kwargs.get("params")),
"x-rpc-app_version": "2.11.1",
"x-rpc-client_type": "5",
}
)
url = urljoin(CN_TAKUMI_URL, endpoint)
else:
kwargs["headers"].update(
{
"ds": generate_ds(OS_DS_SALT),
"x-rpc-app_version": "1.5.0",
"x-rpc-client_type": "4",
}
)
url = urljoin(OS_TAKUMI_URL, endpoint)
if cookie is not None:
if not isinstance(cookie, MutableMapping) or not all(
isinstance(v, str) for v in cookie.values()
):
cookie = {k: str(v) for k, v in cookie.items()}
return _request(method, url, cookies=cookie, **kwargs)
elif len(cookies) == 0:
raise NotLoggedIn("Login cookies have not been provided")
for cookie in cookies.copy():
try:
return _request(method, url, cookies=cookie, **kwargs)
except TooManyRequests:
# move the ratelimited cookie to the end to let the ratelimit wear off
cookies.append(cookies.pop(0))
# if we're here it means we used up all our cookies so we must handle that
if len(cookies) == 1:
raise TooManyRequests("Cannnot get data for more than 30 accounts per day.")
else:
raise TooManyRequests("All cookies have hit their request limit of 30 accounts per day.")
def fetch_game_record_endpoint(
endpoint: str, chinese: bool = False, cookie: Mapping[str, Any] = None, **kwargs
):
"""A short-hand for fetching data for the game record"""
base_url = CN_GAME_RECORD_URL if chinese else OS_GAME_RECORD_URL
url = urljoin(base_url, endpoint)
return fetch_endpoint(url, chinese, cookie, **kwargs)
def get_user_stats(
uid: int, equipment: bool = False, lang: str = "en-us", cookie: Mapping[str, Any] = None
) -> Dict[str, Any]:
"""Gets basic user information and stats.
If equipment is True an additional request will be made to get the character equipment
"""
server = recognize_server(uid)
data = fetch_game_record_endpoint(
"genshin/api/index",
chinese=is_chinese(uid),
cookie=cookie,
params=dict(server=server, role_id=uid),
headers={"x-rpc-language": lang},
)
data = prettify_stats(data)
if equipment:
data["characters"] = get_characters(
uid, [i["id"] for i in data["characters"]], lang, cookie
)
return data
def get_characters(
uid: int, character_ids: List[int] = None, lang: str = "en-us", cookie: Mapping[str, Any] = None
) -> List[Dict[str, Any]]:
"""Gets characters of a user.
Characters contain info about their level, constellation, weapon, and artifacts.
Talents are not included.
If character_ids are provided then only characters with those ids are returned.
"""
if character_ids is None:
character_ids = [i["id"] for i in get_user_stats(uid)["characters"]]
server = recognize_server(uid)
data = fetch_game_record_endpoint(
"genshin/api/character",
chinese=is_chinese(uid),
cookie=cookie,
method="POST",
json=dict(
character_ids=character_ids, role_id=uid, server=server
), # POST uses the body instead
headers={"x-rpc-language": lang},
)["avatars"]
return prettify_characters(data)
def get_spiral_abyss(
uid: int, previous: bool = False, cookie: Mapping[str, Any] = None
) -> Dict[str, Any]:
"""Gets spiral abyss runs of a user and details about them.
Every season these stats refresh and you can get the previous stats with `previous`.
"""
server = recognize_server(uid)
schedule_type = 2 if previous else 1
data = fetch_game_record_endpoint(
"genshin/api/spiralAbyss",
chinese=is_chinese(uid),
cookie=cookie,
params=dict(server=server, role_id=uid, schedule_type=schedule_type),
)
return prettify_abyss(data)
def get_activities(
uid: int, lang: str = "en-us", cookie: Mapping[str, Any] = None
) -> Dict[str, Any]:
"""Gets the activities of the user
As of this time only Hyakunin Ikki is availible.
"""
server = recognize_server(uid)
data = fetch_game_record_endpoint(
"genshin/api/activities",
chinese=is_chinese(uid),
cookie=cookie,
params=dict(server=server, role_id=uid),
headers={"x-rpc-language": lang},
)
return prettify_activities(data)
def get_notes(uid: int, lang: str = "en-us", cookie: Mapping[str, Any] = None) -> Dict[str, Any]:
"""Gets the real-time notes of the user
Contains current resin, expeditions, daily commissions and similar.
"""
server = recognize_server(uid)
data = fetch_game_record_endpoint(
"genshin/api/dailyNote",
chinese=is_chinese(uid),
cookie=cookie,
params=dict(server=server, role_id=uid),
headers={"x-rpc-language": lang},
)
return prettify_notes(data)
def get_all_user_data(
uid: int, lang: str = "en-us", cookie: Mapping[str, Any] = None
) -> Dict[str, Any]:
"""Fetches all data a user can has. Very slow.
A helper function that gets all avalible data for a user and returns it as one dict.
However that makes it fairly slow so it's not recommended to use it outside caching.
"""
data = get_user_stats(uid, equipment=True, lang=lang, cookie=cookie)
data["spiral_abyss"] = [get_spiral_abyss(uid, previous, cookie) for previous in [False, True]]
return data

View File

@ -1,175 +0,0 @@
"""Wrapper for the hoyolab.com community api.
Can search users, get record cards, redeem codes...
"""
import time
from typing import Any, Dict, List, Mapping, Optional
from .caching import permanent_cache
from .genshinstats import fetch_endpoint, fetch_game_record_endpoint
from .pretty import prettify_game_accounts
from .utils import deprecated, recognize_server
__all__ = [
"get_langs",
"search",
"set_visibility",
"hoyolab_check_in",
"get_game_accounts",
"get_record_card",
"get_uid_from_hoyolab_uid",
"redeem_code",
"get_recommended_users",
"get_hot_posts",
]
@permanent_cache()
def get_langs() -> Dict[str, str]:
"""Gets codes of all languages and their names"""
data = fetch_endpoint("community/misc/wapi/langs", cookie={}, params=dict(gids=2))["langs"]
return {i["value"]: i["name"] for i in data}
def search(keyword: str, size: int = 20, chinese: bool = False) -> List[Dict[str, Any]]:
"""Searches all users.
Can return up to 20 results, based on size.
"""
url = (
"https://bbs-api.mihoyo.com/" if chinese else "https://api-os-takumi.mihoyo.com/community/"
)
return fetch_endpoint(
url + "apihub/wapi/search",
cookie={}, # this endpoint does not require cookies
params=dict(keyword=keyword, size=size, gids=2),
)["users"]
def set_visibility(public: bool, cookie: Mapping[str, Any] = None) -> None:
"""Sets your data to public or private."""
fetch_endpoint(
"game_record/card/wapi/publishGameRecord",
cookie=cookie,
method="POST",
# what's game_id about???
json=dict(is_public=public, game_id=2),
)
@deprecated()
def hoyolab_check_in(chinese: bool = False, cookie: Mapping[str, Any] = None) -> None:
"""Checks in the currently logged-in user to hoyolab.
This function will not claim daily rewards!!!
"""
url = (
"https://bbs-api.mihoyo.com/" if chinese else "https://api-os-takumi.mihoyo.com/community/"
)
fetch_endpoint(
url + "apihub/api/signIn", chinese=chinese, cookie=cookie, method="POST", json=dict(gids=2)
)
def get_game_accounts(
chinese: bool = False, cookie: Mapping[str, Any] = None
) -> List[Dict[str, Any]]:
"""Gets all game accounts of the currently signed in player.
Can get accounts both for overseas and china.
"""
url = "https://api-takumi.mihoyo.com/" if chinese else "https://api-os-takumi.mihoyo.com/"
data = fetch_endpoint(url + "binding/api/getUserGameRolesByCookie", cookie=cookie)["list"]
return prettify_game_accounts(data)
def get_record_card(
hoyolab_uid: int, chinese: bool = False, cookie: Mapping[str, Any] = None
) -> Optional[Dict[str, Any]]:
"""Gets a game record card of a user based on their hoyolab uid.
A record card contains data regarding the stats of a user for their displayed server.
Their uid for a given server is also included.
In case the user hasn't set their data to public or you are ratelimited the function returns None.
You can get a hoyolab id with `search`.
"""
cards = fetch_game_record_endpoint(
"card/wapi/getGameRecordCard",
chinese=chinese,
cookie=cookie,
params=dict(uid=hoyolab_uid, gids=2),
)["list"]
return cards[0] if cards else None
def get_uid_from_hoyolab_uid(
hoyolab_uid: int, chinese: bool = False, cookie: Mapping[str, Any] = None
) -> Optional[int]:
"""Gets a uid with a community uid.
This is so it's possible to search a user and then directly get the uid.
In case the uid is private, returns None.
"""
card = get_record_card(hoyolab_uid, chinese, cookie)
return int(card["game_role_id"]) if card else None
def redeem_code(code: str, uid: int = None, cookie: Mapping[str, Any] = None) -> None:
"""Redeems a gift code for the currently signed in user.
Api endpoint for https://genshin.mihoyo.com/en/gift.
!!! This function requires account_id and cookie_token cookies !!!
The code will be redeemed for every avalible account,
specifying the uid will claim it only for that account.
Returns the amount of users it managed to claim codes for.
You can claim codes only every 5s so you must sleep between claims.
The function sleeps for you when claiming for every account
but you must sleep yourself when passing in a uid or when an error is encountered.
Currently codes can only be claimed for overseas accounts, not chinese.
"""
if uid is not None:
fetch_endpoint(
"https://hk4e-api-os.mihoyo.com/common/apicdkey/api/webExchangeCdkey",
cookie=cookie,
params=dict(
uid=uid, region=recognize_server(uid), cdkey=code, game_biz="hk4e_global", lang="en"
),
)
else:
# cannot claim codes for accounts with ar lower than 10
accounts = [
account for account in get_game_accounts(cookie=cookie) if account["level"] >= 10
]
for i, account in enumerate(accounts):
if i:
time.sleep(5) # there's a ratelimit of 1 request every 5 seconds
redeem_code(code, account["uid"], cookie)
def get_recommended_users(page_size: int = None) -> List[Dict[str, Any]]:
"""Gets a list of recommended active users"""
return fetch_endpoint(
"community/user/wapi/recommendActive",
cookie={},
params=dict(page_size=page_size or 0x10000, offset=0, gids=2),
)["list"]
def get_hot_posts(forum_id: int = 1, size: int = 100, lang: str = "en-us") -> List[Dict[str, Any]]:
"""Fetches hot posts from the front page of hoyolabs
Posts are split into different forums set by ids 1-5.
There may be less posts returned than size.
"""
# the api is physically unable to return more than 2 ^ 24 bytes
# that's around 2 ^ 15 posts so we limit the amount to 2 ^ 14
# the user shouldn't be getting that many posts in the first place
return fetch_endpoint(
"community/post/api/forumHotPostFullList",
cookie={},
params=dict(forum_id=forum_id, page_size=min(size, 0x4000), lang=lang),
)["posts"]

View File

@ -1,84 +0,0 @@
"""The official genshin map
Gets data from the official genshin map such as categories, points and similar.
"""
import json
from typing import Any, Dict, List
from urllib.parse import urljoin
from .caching import permanent_cache
from .genshinstats import fetch_endpoint
OS_MAP_URL = "https://api-os-takumi-static.mihoyo.com/common/map_user/ys_obc/v1/map/"
__all__ = [
"fetch_map_endpoint",
"get_map_image",
"get_map_icons",
"get_map_labels",
"get_map_locations",
"get_map_points",
"get_map_tile",
]
def fetch_map_endpoint(endpoint: str, **kwargs) -> Dict[str, Any]:
"""Fetch an enpoint from mihoyo's webstatic map api.
Only currently liyue is supported.
Takes in an endpoint url which is joined with the base url.
A request is then sent and returns a parsed response.
"""
kwargs.setdefault("params", {}).update({"map_id": 2, "app_sn": "ys_obc", "lang": "en-us"})
url = urljoin(OS_MAP_URL, endpoint)
return fetch_endpoint(url, cookie={}, **kwargs)
@permanent_cache()
def get_map_image() -> str:
"""Get the url to the entire map image"""
data = fetch_map_endpoint("info")["info"]["detail"]
return json.loads(data)["slices"][0][0]["url"]
@permanent_cache()
def get_map_icons() -> Dict[int, str]:
"""Get all icons for the map"""
data = fetch_map_endpoint("spot_kind/get_icon_list")["icons"]
return {i["id"]: i["url"] for i in data}
@permanent_cache()
def get_map_labels() -> List[Dict[str, Any]]:
"""Get labels and label categories"""
return fetch_map_endpoint("label/tree")["tree"]
def get_map_locations() -> List[Dict[str, Any]]:
"""Get all locations on the map"""
return fetch_map_endpoint("map_anchor/list")["list"]
def get_map_points() -> List[Dict[str, Any]]:
"""Get points on the map"""
return fetch_map_endpoint("point/list")["point_list"]
def get_map_tile(
x: int, y: int, width: int, height: int, resolution: int = 1, image: str = None
) -> str:
"""Gets a map tile at a position
You may set an x, y, width and height of the resulting image
however you shoudl prefer to use multiples of 256 because they are cached
on the mihoyo servers.
Resolution dictates the resolution of the image as a percentage. 100 is highest and 0 is lowest.
You should pick values from 100, 50, 25 and 12.5
"""
image = image or get_map_image()
return (
image
+ f"?x-oss-process=image/resize,p_{round(resolution)}/crop,x_{x},y_{y},w_{width},h_{height}"
)

View File

@ -1,462 +0,0 @@
"""Prettifiers for genshinstats api returns.
Fixes the huge problem of outdated field names in the api,
that were leftover from during development
"""
import re
from datetime import datetime
character_icons = {
"PlayerGirl": "Traveler",
"PlayerBoy": "Traveler",
"Ambor": "Amber",
"Qin": "Jean",
"Hutao": "Hu Tao",
"Feiyan": "Yanfei",
"Kazuha": "Kadehara Kazuha",
"Sara": "Kujou Sara",
"Shougun": "Raiden Shogun",
"Tohma": "Thoma",
}
def _recognize_character_icon(url: str) -> str:
"""Recognizes a character's icon url and returns its name."""
exp = r"game_record/genshin/character_.*_(\w+)(?:@\dx)?.png"
match = re.search(exp, url)
if match is None:
raise ValueError(f"{url!r} is not a character icon or image url")
character = match.group(1)
return character_icons.get(character) or character
def prettify_stats(data):
s = data["stats"]
h = data["homes"][0] if data["homes"] else None
return {
"stats": {
"achievements": s["achievement_number"],
"active_days": s["active_day_number"],
"characters": s["avatar_number"],
"spiral_abyss": s["spiral_abyss"],
"anemoculi": s["anemoculus_number"],
"geoculi": s["geoculus_number"],
"electroculi": s["electroculus_number"],
"common_chests": s["common_chest_number"],
"exquisite_chests": s["exquisite_chest_number"],
"precious_chests": s["precious_chest_number"],
"luxurious_chests": s["luxurious_chest_number"],
"unlocked_waypoints": s["way_point_number"],
"unlocked_domains": s["domain_number"],
},
"characters": [
{
"name": i["name"],
"rarity": i["rarity"]
if i["rarity"] < 100
else i["rarity"] - 100, # aloy has 105 stars
"element": i["element"],
"level": i["level"],
"friendship": i["fetter"],
"icon": i["image"],
"id": i["id"],
}
for i in data["avatars"]
],
"teapot": {
# only unique data between realms are names and icons
"realms": [{"name": s["name"], "icon": s["icon"]} for s in data["homes"]],
"level": h["level"],
"comfort": h["comfort_num"],
"comfort_name": h["comfort_level_name"],
"comfort_icon": h["comfort_level_icon"],
"items": h["item_num"],
"visitors": h["visit_num"], # currently not in use
}
if h
else None,
"explorations": [
{
"name": i["name"],
"explored": round(i["exploration_percentage"] / 10, 1),
"type": i["type"],
"level": i["level"],
"icon": i["icon"],
"offerings": i["offerings"],
}
for i in data["world_explorations"]
],
}
def prettify_characters(data):
return [
{
"name": i["name"],
"rarity": i["rarity"] if i["rarity"] < 100 else i["rarity"] - 100, # aloy has 105 stars
"element": i["element"],
"level": i["level"],
"friendship": i["fetter"],
"constellation": sum(c["is_actived"] for c in i["constellations"]),
"icon": i["icon"],
"image": i["image"],
"id": i["id"],
"collab": i["rarity"] >= 100,
**(
{"traveler_name": "Aether" if "Boy" in i["icon"] else "Lumine"}
if "Player" in i["icon"]
else {}
),
"weapon": {
"name": i["weapon"]["name"],
"rarity": i["weapon"]["rarity"],
"type": i["weapon"]["type_name"],
"level": i["weapon"]["level"],
"ascension": i["weapon"]["promote_level"],
"refinement": i["weapon"]["affix_level"],
"description": i["weapon"]["desc"],
"icon": i["weapon"]["icon"],
"id": i["weapon"]["id"],
},
"artifacts": [
{
"name": a["name"],
"pos_name": {
1: "flower",
2: "feather",
3: "hourglass",
4: "goblet",
5: "crown",
}[a["pos"]],
"full_pos_name": a["pos_name"],
"pos": a["pos"],
"rarity": a["rarity"],
"level": a["level"],
"set": {
"name": a["set"]["name"],
"effect_type": ["none", "single", "classic"][len(a["set"]["affixes"])],
"effects": [
{
"pieces": e["activation_number"],
"effect": e["effect"],
}
for e in a["set"]["affixes"]
],
"set_id": int(re.search(r"UI_RelicIcon_(\d+)_\d+", a["icon"]).group(1)), # type: ignore
"id": a["set"]["id"],
},
"icon": a["icon"],
"id": a["id"],
}
for a in i["reliquaries"]
],
"constellations": [
{
"name": c["name"],
"effect": c["effect"],
"is_activated": c["is_actived"],
"index": c["pos"],
"icon": c["icon"],
"id": c["id"],
}
for c in i["constellations"]
],
"outfits": [
{"name": c["name"], "icon": c["icon"], "id": c["id"]} for c in i["costumes"]
],
}
for i in data
]
def prettify_abyss(data):
fchars = lambda d: [
{
"value": a["value"],
"name": _recognize_character_icon(a["avatar_icon"]),
"rarity": a["rarity"] if a["rarity"] < 100 else a["rarity"] - 100, # aloy has 105 stars
"icon": a["avatar_icon"],
"id": a["avatar_id"],
}
for a in d
]
todate = lambda x: datetime.fromtimestamp(int(x)).strftime("%Y-%m-%d")
totime = lambda x: datetime.fromtimestamp(int(x)).isoformat(" ")
return {
"season": data["schedule_id"],
"season_start_time": todate(data["start_time"]),
"season_end_time": todate(data["end_time"]),
"stats": {
"total_battles": data["total_battle_times"],
"total_wins": data["total_win_times"],
"max_floor": data["max_floor"],
"total_stars": data["total_star"],
},
"character_ranks": {
"most_played": fchars(data["reveal_rank"]),
"most_kills": fchars(data["defeat_rank"]),
"strongest_strike": fchars(data["damage_rank"]),
"most_damage_taken": fchars(data["take_damage_rank"]),
"most_bursts_used": fchars(data["normal_skill_rank"]),
"most_skills_used": fchars(data["energy_skill_rank"]),
},
"floors": [
{
"floor": f["index"],
"stars": f["star"],
"max_stars": f["max_star"],
"icon": f["icon"],
"chambers": [
{
"chamber": l["index"],
"stars": l["star"],
"max_stars": l["max_star"],
"has_halves": len(l["battles"]) == 2,
"battles": [
{
"half": b["index"],
"timestamp": totime(b["timestamp"]),
"characters": [
{
"name": _recognize_character_icon(c["icon"]),
"rarity": c["rarity"]
if c["rarity"] < 100
else c["rarity"] - 100, # aloy has 105 stars
"level": c["level"],
"icon": c["icon"],
"id": c["id"],
}
for c in b["avatars"]
],
}
for b in l["battles"]
],
}
for l in f["levels"]
],
}
for f in data["floors"]
],
}
def prettify_activities(data):
activities = {
k: v if v.get("exists_data") else {"records": []}
for activity in data["activities"]
for k, v in activity.items()
}
return {
"hyakunin": [
{
"id": r["challenge_id"],
"name": r["challenge_name"],
"difficulty": r["difficulty"],
"medal_icon": r["heraldry_icon"],
"score": r["max_score"],
"multiplier": r["score_multiple"],
"lineups": [
{
"characters": [
{
"name": _recognize_character_icon(c["icon"]),
"rarity": c["rarity"]
if c["rarity"] < 100
else c["rarity"] - 100, # aloy has 105 stars
"level": c["level"],
"icon": c["icon"],
"id": c["id"],
"trial": c["is_trail_avatar"],
}
for c in l["avatars"]
],
"skills": [
{"name": s["name"], "desc": s["desc"], "icon": s["icon"], "id": s["id"]}
for s in l["skills"]
],
}
for l in r["lineups"]
],
}
for r in activities["sumo"]["records"]
],
"labyrinth": None,
}
def prettify_notes(data):
return {
"resin": data["current_resin"],
"until_resin_limit": data["resin_recovery_time"],
"max_resin": data["max_resin"],
"total_commissions": data["total_task_num"],
"completed_commissions": data["finished_task_num"],
"claimed_commission_reward": data["is_extra_task_reward_received"],
"max_boss_discounts": data["resin_discount_num_limit"],
"remaining_boss_discounts": data["remain_resin_discount_num"],
"expeditions": [
{
"icon": exp["avatar_side_icon"],
"remaining_time": exp["remained_time"],
"status": exp["status"],
}
for exp in data["expeditions"]
],
"max_expeditions": data["max_expedition_num"],
"realm_currency": data["current_home_coin"],
"max_realm_currency": data["max_home_coin"],
"until_realm_currency_limit": data["home_coin_recovery_time"],
}
def prettify_game_accounts(data):
return [
{
"uid": int(a["game_uid"]),
"server": a["region_name"],
"level": a["level"],
"nickname": a["nickname"],
# idk what these are for:
"biz": a["game_biz"],
"is_chosen": a["is_chosen"],
"is_official": a["is_official"],
}
for a in data
]
def prettify_wish_history(data, banner_name=None):
return [
{
"type": i["item_type"],
"name": i["name"],
"rarity": int(i["rank_type"]),
"time": i["time"],
"id": int(i["id"]),
"banner": banner_name,
"banner_type": int(i["gacha_type"]),
"uid": int(i["uid"]),
}
for i in data
]
def prettify_gacha_items(data):
return [
{
"name": i["name"],
"type": i["item_type"],
"rarity": int(i["rank_type"]),
"id": 10000000 + int(i["item_id"]) - 1000
if len(i["item_id"]) == 4
else int(i["item_id"]),
}
for i in data
]
def prettify_banner_details(data):
per = lambda p: None if p == "0%" else float(p[:-1].replace(",", "."))
fprobs = (
lambda l: [
{
"type": i["item_type"],
"name": i["item_name"],
"rarity": int(i["rank"]),
"is_up": bool(i["is_up"]),
"order_value": i["order_value"],
}
for i in l
]
if l
else []
)
fitems = (
lambda l: [
{
"type": i["item_type"],
"name": i["item_name"],
"element": {
"": "Anemo",
"": "Pyro",
"": "Hydro",
"": "Electro",
"": "Cryo",
"": "Geo",
"": "Dendro",
"": None,
}[i["item_attr"]],
"icon": i["item_img"],
}
for i in l
]
if l
else []
)
return {
"banner_type_name": {
100: "Novice Wishes",
200: "Permanent Wish",
301: "Character Event Wish",
302: "Weapon Event Wish",
}[int(data["gacha_type"])],
"banner_type": int(data["gacha_type"]),
"banner": re.sub(r"<.*?>", "", data["title"]).strip(),
"title": data["title"],
"content": data["content"],
"date_range": data["date_range"],
"r5_up_prob": per(data["r5_up_prob"]), # probability for rate-up 5*
"r4_up_prob": per(data["r4_up_prob"]), # probability for rate-up 4*
"r5_prob": per(data["r5_prob"]), # probability for 5*
"r4_prob": per(data["r4_prob"]), # probability for 4*
"r3_prob": per(data["r3_prob"]), # probability for 3*
"r5_guarantee_prob": per(data["r5_baodi_prob"]), # probability for 5* incl. guarantee
"r4_guarantee_prob": per(data["r4_baodi_prob"]), # probability for 4* incl. guarantee
"r3_guarantee_prob": per(data["r3_baodi_prob"]), # probability for 3* incl. guarantee
"r5_up_items": fitems(
data["r5_up_items"]
), # list of 5* rate-up items that you can get from banner
"r4_up_items": fitems(
data["r4_up_items"]
), # list of 4* rate-up items that you can get from banner
"r5_items": fprobs(data["r5_prob_list"]), # list 5* of items that you can get from banner
"r4_items": fprobs(data["r4_prob_list"]), # list 4* of items that you can get from banner
"r3_items": fprobs(data["r3_prob_list"]), # list 3* of items that you can get from banner
"items": fprobs(
sorted(
data["r5_prob_list"] + data["r4_prob_list"] + data["r3_prob_list"],
key=lambda x: x["order_value"],
)
),
}
def prettify_trans(data, reasons={}):
if data and "name" in data[0]:
# transaction item
return [
{
"time": i["time"],
"name": i["name"],
"rarity": int(i["rank"]),
"amount": int(i["add_num"]),
"reason": reasons.get(int(i["reason"]), ""),
"reason_id": int(i["reason"]),
"uid": int(i["uid"]),
"id": int(i["id"]),
}
for i in data
]
else:
# transaction
return [
{
"time": i["time"],
"amount": int(i["add_num"]),
"reason": reasons.get(int(i["reason"]), ""),
"reason_id": int(i["reason"]),
"uid": int(i["uid"]),
"id": int(i["id"]),
}
for i in data
]

View File

@ -1,194 +0,0 @@
"""Logs for currency "transactions".
Logs for artifact, weapon, resin, genesis crystol and primogem "transactions".
You may view a history of everything you have gained in the last 3 months.
"""
import math
import sys
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urljoin
from .caching import permanent_cache
from .pretty import prettify_trans
from .wishes import fetch_gacha_endpoint, static_session
__all__ = [
"fetch_transaction_endpoint",
"get_primogem_log",
"get_resin_log",
"get_crystal_log",
"get_artifact_log",
"get_weapon_log",
"current_resin",
"approximate_current_resin",
]
YSULOG_URL = "https://hk4e-api-os.mihoyo.com/ysulog/api/"
def fetch_transaction_endpoint(
endpoint: str, authkey: Optional[str] = None, **kwargs
) -> Dict[str, Any]:
"""Fetch an enpoint from mihoyo's transaction logs api.
Takes in an endpoint url which is joined with the base url.
If an authkey is provided, it uses that authkey specifically.
A request is then sent and returns a parsed response.
"""
url = urljoin(YSULOG_URL, endpoint)
return fetch_gacha_endpoint(url, authkey, **kwargs)
@permanent_cache("lang")
def _get_reasons(lang: str = "en-us") -> Dict[int, str]:
r = static_session.get(
f"https://mi18n-os.mihoyo.com/webstatic/admin/mi18n/hk4e_global/m02251421001311/m02251421001311-{lang}.json"
)
r.raise_for_status()
data = r.json()
return {
int(k.split("_")[-1]): v
for k, v in data.items()
if k.startswith("selfinquiry_general_reason_")
}
def _get_transactions(
endpoint: str, size: int = None, authkey: str = None, lang: str = "en-us", end_id: int = 0
) -> Iterator[Dict[str, Any]]:
"""A paginator that uses mihoyo's id paginator algorithm to yield pages"""
if size is not None and size <= 0:
return
page_size = 20
size = size or sys.maxsize
while True:
data = fetch_transaction_endpoint(
endpoint, authkey=authkey, params=dict(size=min(page_size, size), end_id=end_id)
)["list"]
data = prettify_trans(data, _get_reasons(lang))
yield from data
size -= page_size
if len(data) < page_size or size <= 0:
break
end_id = data[-1]["id"]
def get_primogem_log(
size: int = None, authkey: str = None, lang: str = "en-us", end_id: int = 0
) -> Iterator[Dict[str, Any]]:
"""Gets all transactions of primogems
This means stuff like getting primogems from rewards and explorations or making wishes.
Records go only 3 months back.
"""
return _get_transactions("getPrimogemLog", size, authkey, lang, end_id)
def get_crystal_log(
size: int = None, authkey: str = None, lang: str = "en-us", end_id: int = 0
) -> Iterator[Dict[str, Any]]:
"""Get all transactions of genesis crystals
Records go only 3 months back.
"""
return _get_transactions("getCrystalLog", size, authkey, lang, end_id)
def get_resin_log(
size: int = None, authkey: str = None, lang: str = "en-us", end_id: int = 0
) -> Iterator[Dict[str, Any]]:
"""Gets all usage of resin
This means using them in ley lines, domains, crafting and weekly bosses.
Records go only 3 months back.
"""
return _get_transactions("getResinLog", size, authkey, lang, end_id)
def get_artifact_log(
size: int = None, authkey: str = None, lang: str = "en-us", end_id: int = 0
) -> Iterator[Dict[str, Any]]:
"""Get the log of all artifacts gotten or destroyed in the last 3 months"""
return _get_transactions("getArtifactLog", size, authkey, lang, end_id)
def get_weapon_log(
size: int = None, authkey: str = None, lang: str = "en-us", end_id: int = 0
) -> Iterator[Dict[str, Any]]:
"""Get the log of all weapons gotten or destroyed in the last 3 months"""
return _get_transactions("getWeaponLog", size, authkey, lang, end_id)
def current_resin(
last_resin_time: datetime,
last_resin_amount: float,
current_time: datetime = None,
authkey: str = None,
):
"""Gets the current resin based off an amount of resin you've had at any time before
Works by getting all usages after the last resin time and emulating how the resin would be generated.
Keep in mind that this approach works only if the user hasn't played in the last hour.
"""
current_time = current_time or datetime.utcnow()
resin_usage: List[Dict[str, Any]] = [{"time": str(current_time), "amount": 0}]
for usage in get_resin_log(authkey=authkey):
if datetime.fromisoformat(usage["time"]) < last_resin_time:
break
resin_usage.append(usage)
resin_usage.reverse()
resin = last_resin_amount
for usage in resin_usage:
usage_time = datetime.fromisoformat(usage["time"])
recovered_resin = (usage_time - last_resin_time).total_seconds() / (8 * 60)
resin = min(resin + recovered_resin, 160) + usage["amount"]
last_resin_time = usage_time
# better raise an error than to leave users confused
if resin < 0:
raise ValueError("Last resin time is wrong or amount is too low")
return resin
def approximate_current_resin(time: datetime = None, authkey: str = None):
"""Roughly approximates how much resin using a minmax calculation
The result can have an offset of around 5 resin in some cases.
"""
# if any algorithm peeps can help with this one I'd appreciate it
recovery_rate = 8 * 60
current_max = shadow_max = 160.0
current_min = shadow_min = 0.0
time = time or datetime.utcnow()
last_amount = 0
for usage in get_resin_log(authkey=authkey):
usage_time = datetime.fromisoformat(usage["time"])
if time < usage_time:
continue
amount_recovered = (time - usage_time).total_seconds() / recovery_rate
cur_amount: int = usage["amount"]
shadow_max += cur_amount + amount_recovered
shadow_min += last_amount + amount_recovered
current_max = max(current_min, min(current_max, shadow_max))
current_min = max(current_min, min(current_max, shadow_min))
time = usage_time
last_amount = usage["amount"]
if math.isclose(current_max, current_min):
break
resin = (current_max + current_min) / 2
return resin

View File

@ -1,120 +0,0 @@
"""Various utility functions for genshinstats."""
import inspect
import os.path
import re
import warnings
from functools import wraps
from typing import Callable, Iterable, Optional, Type, TypeVar, Union
from .errors import AccountNotFound
__all__ = [
"USER_AGENT",
"recognize_server",
"recognize_id",
"is_game_uid",
"is_chinese",
"get_logfile",
]
T = TypeVar("T")
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
def recognize_server(uid: int) -> str:
"""Recognizes which server a UID is from."""
server = {
"1": "cn_gf01",
"2": "cn_gf01",
"5": "cn_qd01",
"6": "os_usa",
"7": "os_euro",
"8": "os_asia",
"9": "os_cht",
}.get(str(uid)[0])
if server:
return server
else:
raise AccountNotFound(f"UID {uid} isn't associated with any server")
def recognize_id(id: int) -> Optional[str]:
"""Attempts to recognize what item type an id is"""
if 10000000 < id < 20000000:
return "character"
elif 1000000 < id < 10000000:
return "artifact_set"
elif 100000 < id < 1000000:
return "outfit"
elif 50000 < id < 100000:
return "artifact"
elif 10000 < id < 50000:
return "weapon"
elif 100 < id < 1000:
return "constellation"
elif 10 ** 17 < id < 10 ** 19:
return "transaction"
# not sure about these ones:
elif 1 <= id <= 4:
return "exploration"
else:
return None
def is_game_uid(uid: int) -> bool:
"""Recognizes whether the uid is a game uid."""
return bool(re.fullmatch(r"[6789]\d{8}", str(uid)))
def is_chinese(x: Union[int, str]) -> bool:
"""Recognizes whether the server/uid is chinese."""
return str(x).startswith(("cn", "1", "5"))
def get_logfile() -> Optional[str]:
"""Find and return the Genshin Impact logfile. None if not found."""
mihoyo_dir = os.path.expanduser("~/AppData/LocalLow/miHoYo/")
for name in ["Genshin Impact", "原神", "YuanShen"]:
output_log = os.path.join(mihoyo_dir, name, "output_log.txt")
if os.path.isfile(output_log):
return output_log
return None # no genshin installation
def retry(
tries: int = 3,
exceptions: Union[Type[BaseException], Iterable[Type[BaseException]]] = Exception,
) -> Callable[[T], T]:
"""A classic retry() decorator"""
def wrapper(func):
@wraps(func)
def inner(*args, **kwargs):
for _ in range(tries):
try:
return func(*args, **kwargs)
except exceptions as e:
exc = e
else:
raise Exception(f"Maximum tries ({tries}) exceeded: {exc}") from exc # type: ignore
return inner
return wrapper # type: ignore
def deprecated(
message: str = "{} is deprecated and will be removed in future versions",
) -> Callable[[T], T]:
"""Shows a warning when a function is attempted to be used"""
def wrapper(func):
@wraps(func)
def inner(*args, **kwargs):
warnings.warn(message.format(func.__name__), PendingDeprecationWarning)
return func(*args, **kwargs)
return inner
return wrapper # type: ignore

View File

@ -1,277 +0,0 @@
"""Genshin Impact wish history.
Gets wish history from the current banners in a clean api.
Requires an authkey that is fetched automatically from a logfile.
"""
import base64
import heapq
import os
import re
import sys
from itertools import chain, islice
from tempfile import gettempdir
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import unquote, urljoin
from requests import Session
from .errors import AuthkeyError, MissingAuthKey, raise_for_error
from .pretty import *
from .utils import USER_AGENT, get_logfile
from .caching import permanent_cache
__all__ = [
"extract_authkey",
"get_authkey",
"set_authkey",
"get_banner_ids",
"fetch_gacha_endpoint",
"get_banner_types",
"get_wish_history",
"get_gacha_items",
"get_banner_details",
"get_uid_from_authkey",
"validate_authkey",
]
GENSHIN_LOG = get_logfile()
GACHA_INFO_URL = "https://hk4e-api-os.mihoyo.com/event/gacha_info/api/"
AUTHKEY_FILE = os.path.join(gettempdir(), "genshinstats_authkey.txt")
session = Session()
session.headers.update(
{
# recommended header
"user-agent": USER_AGENT
}
)
session.params = {
# required params
"authkey_ver": "1",
"lang": "en",
# authentications params
"authkey": "",
# transaction params
"sign_type": "2",
}
static_session = Session() # extra session for static resources
def _get_short_lang_code(lang: str) -> str:
"""Returns an alternative short lang code"""
return lang if "zh" in lang else lang.split("-")[0]
def _read_logfile(logfile: str = None) -> str:
"""Returns the contents of a logfile"""
if GENSHIN_LOG is None:
raise FileNotFoundError("No Genshin Installation was found, could not get gacha data.")
with open(logfile or GENSHIN_LOG) as file:
return file.read()
def extract_authkey(string: str) -> Optional[str]:
"""Extracts an authkey from the provided string. Returns None if not found."""
match = re.search(r"https://.+?authkey=([^&#]+)", string, re.MULTILINE)
if match is not None:
return unquote(match.group(1))
return None
def get_authkey(logfile: str = None) -> str:
"""Gets the query for log requests.
This will either be done from the logs or from a tempfile.
"""
# first try the log
authkey = extract_authkey(_read_logfile(logfile))
if authkey is not None:
with open(AUTHKEY_FILE, "w") as file:
file.write(authkey)
return authkey
# otherwise try the tempfile (may be expired!)
if os.path.isfile(AUTHKEY_FILE):
with open(AUTHKEY_FILE) as file:
return file.read()
raise MissingAuthKey(
"No authkey could be found in the logs or in a tempfile. "
"Open the history in-game first before attempting to request it."
)
def set_authkey(authkey: str = None) -> None:
"""Sets an authkey for log requests.
You may pass in an authkey, a url with an authkey
or a path to a logfile with the authkey.
"""
if authkey is None or os.path.isfile(authkey):
authkey = get_authkey(authkey)
else:
authkey = extract_authkey(authkey) or authkey
session.params["authkey"] = authkey # type: ignore
def get_banner_ids(logfile: str = None) -> List[str]:
"""Gets all banner ids from a log file.
You need to open the details of all banners for this to work.
"""
log = _read_logfile(logfile)
ids = re.findall(r"OnGetWebViewPageFinish:https://.+?gacha_id=([^&#]+)", log)
return list(set(ids))
def fetch_gacha_endpoint(endpoint: str, authkey: str = None, **kwargs) -> Dict[str, Any]:
"""Fetch an enpoint from mihoyo's gacha info.
Takes in an endpoint url which is joined with the base url.
If an authkey is provided, it uses that authkey specifically.
A request is then sent and returns a parsed response.
Includes error handling and getting the authkey.
"""
if authkey is None:
session.params["authkey"] = session.params["authkey"] or get_authkey() # type: ignore
else:
kwargs.setdefault("params", {})["authkey"] = authkey
method = kwargs.pop("method", "get")
url = urljoin(GACHA_INFO_URL, endpoint)
r = session.request(method, url, **kwargs)
r.raise_for_status()
data = r.json()
if data["retcode"] == 0:
return data["data"]
raise_for_error(data)
@permanent_cache("lang")
def get_banner_types(authkey: str = None, lang: str = "en") -> Dict[int, str]:
"""Gets ids for all banners and their names"""
banners = fetch_gacha_endpoint(
"getConfigList", authkey=authkey, params=dict(lang=_get_short_lang_code(lang))
)["gacha_type_list"]
return {int(i["key"]): i["name"] for i in banners}
def get_wish_history(
banner_type: int = None,
size: int = None,
authkey: str = None,
end_id: int = 0,
lang: str = "en",
) -> Iterator[Dict[str, Any]]:
"""Gets wish history.
Note that pulls are yielded and not returned to account for pagination.
When a banner_type is set, only data from that banner type is retuned.
You can get banner types and their names from get_banner_types.
If a size is set the total returned amount of pulls will be equal to or lower than the size.
To be able to get history starting from somewhere other than the last pull
you may pass in the id of the pull right chronologically after the one you want to start from as end_id.
"""
if size is not None and size <= 0:
return
if banner_type is None:
# we get data from all banners by getting data from every individual banner
# and then sorting it by pull date with heapq.merge
gens = [
get_wish_history(banner_type, None, authkey, end_id, lang)
for banner_type in get_banner_types(authkey)
]
yield from islice(heapq.merge(*gens, key=lambda x: x["time"], reverse=True), size)
return
# we create banner_name outside prettify so we don't make extra requests
banner_name = get_banner_types(authkey, lang)[banner_type]
lang = _get_short_lang_code(lang)
page_size = 20
size = size or sys.maxsize
while True:
data = fetch_gacha_endpoint(
"getGachaLog",
authkey=authkey,
params=dict(
gacha_type=banner_type, size=min(page_size, size), end_id=end_id, lang=lang
),
)["list"]
data = prettify_wish_history(data, banner_name)
yield from data
size -= page_size
if len(data) < page_size or size <= 0:
break
end_id = data[-1]["id"]
def get_gacha_items(lang: str = "en-us") -> List[Dict[str, Any]]:
"""Gets the list of characters and weapons that can be gotten from the gacha."""
r = static_session.get(
f"https://webstatic-sea.mihoyo.com/hk4e/gacha_info/os_asia/items/{lang}.json"
)
r.raise_for_status()
return prettify_gacha_items(r.json())
def get_banner_details(banner_id: str, lang: str = "en-us") -> Dict[str, Any]:
"""Gets details of a specific banner.
This requires the banner's id.
These keep rotating so you need to get them with get_banner_ids().
example standard wish: "a37a19624270b092e7250edfabce541a3435c2"
The newbie gacha has no json resource tied to it so you can't get info about it.
"""
r = static_session.get(
f"https://webstatic-sea.mihoyo.com/hk4e/gacha_info/os_asia/{banner_id}/{lang}.json"
)
r.raise_for_status()
return prettify_banner_details(r.json())
def get_uid_from_authkey(authkey: str = None) -> int:
"""Gets a uid from an authkey.
If an authkey is not passed in the function uses the currently set authkey.
"""
# for safety we use all banners, probably overkill
# they are sorted from most to least pulled on for speed
histories = [get_wish_history(i, 1, authkey) for i in (301, 200, 302, 100)]
pull = next(chain.from_iterable(histories), None)
if pull is None: # very rare but possible
raise Exception("User has never made a wish")
return pull["uid"]
def validate_authkey(authkey: Any, previous_authkey: str = None) -> bool:
"""Checks whether an authkey is valid by sending a request
If a previous authkey is provided the function also checks if the
authkey belongs to the same person as the previous one.
"""
if not isinstance(authkey, str) or len(authkey) != 1024:
return False # invalid format
try:
base64.b64decode(authkey)
except:
return False # invalid base64 format
if previous_authkey and authkey[:682] != previous_authkey[:682]:
return False
try:
fetch_gacha_endpoint("getConfigList", authkey=authkey)
except AuthkeyError:
return False
return True