PamGram/modules/apihelper/abyss_team.py
Karako 4c702515a0
Enhance AssetsService
Co-authored-by: xtaodada <xtao@xtaolink.cn>
2022-10-07 13:02:49 +08:00

69 lines
2.5 KiB
Python

import time
from typing import List, Optional
import httpx
from pydantic import BaseModel, parse_obj_as, validator
class Member(BaseModel):
    """One character slot inside a team formation."""

    # Presumably the character's star rating (rarity) -- confirm against the API payload.
    star: int
    # Presumably the character's attribute/element identifier -- confirm against the API payload.
    attr: str
    # Character name; team ranking matches it against the caller's owned character names.
    name: str
class TeamRate(BaseModel):
    """Usage rate of a single team formation."""

    # Stored as a fraction; percentage strings are converted by ``str2float``.
    rate: float
    # The characters that make up this team.
    formation: List[Member]
    # Number of formation members the user owns; assigned by ``TeamRateResult.sort``.
    ownerNum: Optional[int]

    @validator('rate', pre=True)
    def str2float(cls, v):  # pylint: disable=R0201
        """Turn a percentage string such as ``"12.5%"`` into the fraction ``0.125``.

        Runs before pydantic's own float coercion (``pre=True``). Values that
        are not strings are passed through untouched.
        """
        if not isinstance(v, str):
            return v
        percent_text = v.replace('%', '')
        return float(percent_text) / 100.0
class TeamRateResult(BaseModel):
    """Team usage rates for two rate lists plus the reported user count."""

    rateListUp: List[TeamRate]
    rateListDown: List[TeamRate]
    userCount: int

    def sort(self, characters: List[str]):
        """Rank both rate lists in place by how well each team fits the roster.

        For every team, ``ownerNum`` is set to the number of formation members
        whose name appears in ``characters``. Each list is then sorted in
        descending order of ``ownerNum / 4 * rate``.

        Args:
            characters: Names of the characters the user owns.
        """
        # Build the lookup set once instead of once per team; this also keeps a
        # one-shot iterable from being exhausted after the first team.
        owned = set(characters)
        for rate_list in (self.rateListUp, self.rateListDown):
            for team in rate_list:
                team.ownerNum = len(owned & {member.name for member in team.formation})
            # The divisor 4 is presumably the fixed team size -- confirm.
            rate_list.sort(key=lambda team: team.ownerNum / 4 * team.rate, reverse=True)
class AbyssTeamData:
    """Fetches team formation rates from the remote API and caches them in memory."""

    TEAM_RATE_API = "https://www.youchuang.fun/gamerole/formationRate"
    HEADERS = {
        'Host': 'www.youchuang.fun',
        'Referer': 'https://servicewechat.com/wxce4dbe0cb0f764b3/91/page-frame.html',
        'User-Agent': 'Mozilla/5.0 (iPad; CPU OS 15_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) '
                      'Mobile/15E148 MicroMessenger/8.0.20(0x1800142f) NetType/WIFI Language/zh_CN',
        'content-type': 'application/json'
    }
    VERSION = "3.1"

    def __init__(self):
        """Create the shared HTTP client and start with an empty cache."""
        self.client = httpx.AsyncClient(headers=self.HEADERS)
        # Cached result and the ``time.time()`` value at which it was stored.
        self.data = None
        self.time = 0
        # Cache lifetime in seconds (ten minutes).
        self.ttl = 10 * 60

    async def _fetch_layer(self, layer):
        """POST a rate request for ``layer`` and return the ``"result"`` object of the reply."""
        response = await self.client.post(self.TEAM_RATE_API, json={"version": self.VERSION, "layer": layer})
        return response.json()["result"]

    async def get_data(self) -> TeamRateResult:
        """Return team rates, re-fetching both layers when the cache is empty or expired.

        The cached object is never handed out directly: callers get a deep copy,
        so in-place changes such as sorting do not touch the cache.
        """
        cache_is_fresh = self.data is not None and self.time + self.ttl >= time.time()
        if not cache_is_fresh:
            # Layer 1 is requested and decoded before layer 2 is requested.
            up_result = await self._fetch_layer(1)
            down_result = await self._fetch_layer(2)
            self.data = TeamRateResult(
                rateListUp=parse_obj_as(List[TeamRate], up_result["rateList"]),
                rateListDown=parse_obj_as(List[TeamRate], down_result["rateList"]),
                userCount=up_result["userCount"],
            )
            self.time = time.time()
        return self.data.copy(deep=True)

    async def close(self):
        """Close the underlying HTTP client."""
        await self.client.aclose()