sycgram/tools/speedtests.py

250 lines
8.1 KiB
Python
Raw Normal View History

2022-04-06 14:39:27 +00:00
import json
import os
import platform
from datetime import datetime
from os import path
from time import time
from typing import Any, Dict, Optional, Tuple
from loguru import logger
from tools.constants import INSTALL_SPEEDTEST, SPEEDTEST_PATH_FILE
from .helpers import basher
class Speedtester:
    """Async context manager that runs a speedtest command and formats its JSON.

    Usage pattern visible in this file::

        async with Speedtester() as tester:
            text, image_link = await tester.running(cmd)

    Entering the context starts a timer; leaving it logs the elapsed time.
    The parsed JSON of the last command is kept on the instance so the
    ``get_*`` helpers can format individual fields.
    """

    # Unit labels, indexed by how many times the value was divided by 1000.
    _SPEED_UNITS = ('bps', 'Kbps', 'Mbps', 'Gbps', 'Tbps')
    # Machine types for which the speedtest binary is installed.
    _SUPPORTED_MACHINES = ('i386', 'x86_64', 'armel', 'armhf', 'aarch64')

    def __init__(self) -> None:
        # Populated by running() / list_servers_ids(); empty until then.
        self.__output: Dict[str, Any] = {}
        self.__server: Dict[str, Any] = {}
        # Set for real in __aenter__.
        self._timer: float = 0.0

    async def __aenter__(self):
        self._timer = time()
        logger.info("Speedtest start ...")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        logger.info(
            f"Speedtest over and takes {time()-self._timer:.5f} seconds.")

    async def running(self, cmd: str) -> Tuple[str, str]:
        """Run a speedtest command and build a human-readable report.

        Args:
            cmd (str): Full speedtest command line; it must make the tool
                print its result as JSON.

        Returns:
            Tuple[str, str]: The first value is the report text (or an error
            message), the second is the link to the result image (empty
            string on error).
        """
        await self.init_for_speedtest()
        # basher is given a 60 second timeout for the whole measurement.
        res = await basher(cmd, timeout=60)
        logger.info(f"Speedtest Execution | {res}")
        try:
            self.__output = json.loads(res.get('output'))
            self.__server = self.__output.get('server')
            # Formatting stays inside the try: JSON that parses but lacks the
            # expected fields must yield the error tuple, not an exception.
            text = "**Speedtest**\n" \
                f"Server: {self.get_server()}\n" \
                f"Sponsor: {self.get_sponsor()}\n" \
                f"Upload: {self.get_speed('upload')}\n" \
                f"Download: {self.get_speed('download')}\n" \
                f"jitter: {self.get_ping('jitter')}\n" \
                f"Latency: {self.get_ping('latency')}\n" \
                f"Time: {self.get_time()}"
            link = f"{self.__output.get('result').get('url')}.png"
        except (TypeError, ValueError, AttributeError):
            # ValueError covers json.JSONDecodeError.
            return f"⚠️ Error\n```{res.get('error')}```", ''
        return text, link

    async def list_servers_ids(self, cmd: str) -> str:
        """Run a server-listing command and format the servers as text.

        Args:
            cmd (str): Full speedtest command line that prints a JSON object
                containing a ``servers`` list.

        Returns:
            str: One line per server (id, name, location, country), or an
            error message when the command fails or its output is unusable.
        """
        await self.init_for_speedtest()
        res = await basher(cmd, timeout=10)
        logger.info(f"Speedtest Execution | {res}")
        if res.get('error'):
            return f"⚠️ Error\n```{res.get('error')}```"
        try:
            self.__output = json.loads(res.get('output'))
            # Built inside the try so a missing or malformed ``servers``
            # entry is reported instead of raising.
            tmp = '\n'.join(
                f"`{k.get('id')}` **|** {k.get('name')} **|** {k.get('location')} {k.get('country')}"
                for k in self.__output.get('servers')
            )
        except (TypeError, AttributeError, json.decoder.JSONDecodeError):
            return "⚠️ Unable to get ids of servers"
        return f"**Speedtest节点列表**\n{tmp}"

    def get_server(self) -> str:
        """Return the server's location and country as inline code."""
        location = self.__server.get('location')
        country = self.__server.get('country')
        return f"`{location} {country}`"

    def get_sponsor(self) -> str:
        """Return the server's ``name`` field as inline code."""
        return f"`{self.__server.get('name')}`"

    def get_speed(self, opt: str) -> str:
        """Format the measured bandwidth of one direction.

        Args:
            opt (str): ``upload`` or ``download``.

        Returns:
            str: The ``bandwidth`` value multiplied by 8 (presumably bytes/s
            converted to bits/s; confirm against the tool's JSON schema),
            scaled to the largest fitting unit and wrapped in backticks.
        """
        def convert(bits) -> str:
            """Scale by powers of 1000 and attach the matching unit."""
            step = 1000
            idx = 0
            last = len(self._SPEED_UNITS) - 1
            # ``>=`` so that exactly 1000 bps becomes 1 Kbps; the index is
            # capped so very large values keep the largest known unit.
            while bits >= step and idx < last:
                bits = bits / step
                idx += 1
            return f"{bits:.3f} {self._SPEED_UNITS[idx]}"

        return f"`{convert(self.__output.get(opt).get('bandwidth') * 8)}`"

    def get_ping(self, opt: str) -> str:
        """Return one field of the ``ping`` section with three decimals.

        Args:
            opt (str): Key inside ``ping``; this file uses ``jitter`` and
                ``latency``.
        """
        return f"`{self.__output.get('ping').get(opt):.3f}`"

    def get_time(self) -> str:
        """Return the current local time as ``YYYY-mm-dd HH:MM:SS``."""
        return f"`{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}`"

    async def init_for_speedtest(self, opt: str = 'install') -> Optional[str]:
        """Install or update the speedtest binary when the platform allows.

        Args:
            opt (str): ``install`` downloads the binary only if it is
                missing; ``update`` removes any existing binary and
                downloads it again.

        Returns:
            Optional[str]: A message for unsupported systems and for
            ``update``; ``None`` for ``install``.

        Raises:
            ValueError: If ``opt`` is neither ``install`` nor ``update``
                (only checked on supported systems).
        """
        def exists_file() -> bool:
            return path.exists(SPEEDTEST_PATH_FILE)

        uname = platform.uname()
        if uname.system != "Linux" or uname.machine not in self._SUPPORTED_MACHINES:
            text = f"Unsupported System >>> {uname.system} - {uname.machine}"
            logger.warning(text)
            return text

        if opt == 'install':
            if not exists_file():
                await self.__download_file()
                # Only report success when the download produced the file,
                # mirroring the check done for ``update``.
                if exists_file():
                    logger.success("First install speedtest")
                else:
                    logger.warning("Failed to install speedtest")
            return None

        if opt == 'update':
            if exists_file():
                os.remove(SPEEDTEST_PATH_FILE)
            await self.__download_file()
            if exists_file():
                text = "✅ Update speedtest successfully."
                logger.success(text)
                return text
            return "❌ Failed to update speedtest"

        raise ValueError(f'Wrong speedtest option {opt}')

    async def __download_file(self) -> None:
        """Run the install command with a 30 second timeout."""
        await basher(INSTALL_SPEEDTEST, timeout=30)
2022-04-06 14:39:27 +00:00
# import asyncio
# import json
# from datetime import datetime
# from time import time
# from typing import Any, Dict, Tuple
# from loguru import logger
# from .helpers import execute
# class Speedtester:
# def __init__(self) -> None:
# pass
# async def __aenter__(self):
# self._timer = time()
# logger.info(f"Speedtest start in {self._timer}")
# return self
# async def __aexit__(self, exc_type, exc_val, exc_tb):
# logger.info(
# f"Speedtest over and takes {time()-self._timer:.5f} seconds.")
# async def running(self, cmd: str) -> Tuple[str]:
# """开始执行speedtest
# Args:
# cmd (str, optional): speedtest的完整指令需要返回json格式.
# Defaults to 'speedtest-cli --share --json'.
# Returns:
# Tuple[str]: 第一个值是文本/错误第二个是图片link
# """
# # 超时报错
# res = await asyncio.wait_for(execute(cmd), timeout=60)
# logger.info(f"Speedtest Execution | {res}")
# await logger.complete()
# if not res.get('error'):
# if 'list' in cmd:
# return res.get('output'), ''
# try:
# # output result
# self.__output: Dict[str, Any] = json.loads(res.get('output'))
# self.__server: Dict[str, Any] = self.__output.get('server')
# except (TypeError, AttributeError, json.decoder.JSONDecodeError):
# return "⚠️ Unable to get detailed data.", ''
# else:
# text = "**Speedtest**\n" \
# f"Server: {self.get_server()}\n" \
# f"Sponsor: {self.get_sponsor()}\n" \
# f"Upload: {self.get_speed('upload')}\n" \
# f"Download: {self.get_speed('download')}\n" \
# f"Latency: {self.get_latency()}\n" \
# f"Time: {self.get_time()}"
# return text, self.__output.get('share')
# return f"⚠️ Error | {res.get('error')}", ''
# def get_server(self) -> str:
# country = self.__server.get('country')
# cc = self.__server.get('cc')
# return f"`{country} - {cc}`"
# def get_sponsor(self) -> str:
# return f"`{self.__server.get('sponsor')}`"
# def get_speed(self, option: str) -> str:
# """
# Args:
# option (str): upload or download
# Returns:
# str: Convert to bits
# """
# return f"`{self.convert(self.__output.get(option))}`"
# def get_latency(self) -> str:
# return f"`{self.__server.get('latency')}`"
# def get_time(self) -> str:
# return f"`{datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')}`"
# @staticmethod
# def convert(bits) -> str:
# """Unit conversion"""
# power = 1024
# n = 0
# units = {
# 0: 'bps',
# 1: 'Kbps',
# 2: 'Mbps',
# 3: 'Gbps',
# 4: 'Tbps'
# }
# while bits > power:
# bits = bits / power
# n = n + 1
# return f"{bits:.3f} {units.get(n)}"