sycgram/tools/speedtests.py

250 lines
8.1 KiB
Python
Raw Permalink Normal View History

2022-04-06 14:39:27 +00:00
import json
import os
import platform
from datetime import datetime
from os import path
from time import time
from typing import Any, Dict, Optional, Tuple
from loguru import logger
2022-04-09 05:47:04 +00:00
from tools.constants import INSTALL_SPEEDTEST, SPEEDTEST_PATH_FILE, SYCGRAM_ERROR
2022-04-06 14:39:27 +00:00
from .helpers import basher
class Speedtester:
def __init__(self) -> None:
pass
async def __aenter__(self):
self._timer = time()
logger.info("Speedtest start ...")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
logger.info(
f"Speedtest over and takes {time()-self._timer:.5f} seconds.")
async def running(self, cmd: str) -> Tuple[str]:
"""开始执行speedtest
Args:
cmd (str, optional): speedtest的完整指令需要返回json格式.
Returns:
Tuple[str]: 第一个值是文本/错误第二个是图片link
"""
2022-04-09 05:47:04 +00:00
await self.install_speedtest_cli()
2022-04-06 14:39:27 +00:00
# 超时报错
res = await basher(cmd, timeout=60)
logger.info(f"Speedtest Execution | {res}")
try:
# output result
self.__output: Dict[str, Any] = json.loads(res.get('output'))
self.__server: Dict[str, Any] = self.__output.get('server')
2022-04-09 05:47:04 +00:00
except Exception as e:
logger.error(e)
return f"⚠️ Speedtest Error\n```{res.get('error')}```", ''
2022-04-06 14:39:27 +00:00
else:
text = "**Speedtest**\n" \
f"Server: {self.get_server()}\n" \
f"Sponsor: {self.get_sponsor()}\n" \
f"Upload: {self.get_speed('upload')}\n" \
f"Download: {self.get_speed('download')}\n" \
f"jitter: {self.get_ping('jitter')}\n" \
f"Latency: {self.get_ping('latency')}\n" \
f"Time: {self.get_time()}"
return text, f"{self.__output.get('result').get('url')}.png"
async def list_servers_ids(self, cmd: str) -> str:
2022-04-09 05:47:04 +00:00
await self.install_speedtest_cli()
2022-04-06 14:39:27 +00:00
res = await basher(cmd, timeout=10)
logger.info(f"Speedtest Execution | {res}")
if not res.get('error'):
try:
self.__output: Dict[str, Any] = json.loads(res.get('output'))
2022-04-09 05:47:04 +00:00
except Exception:
return "**{SYCGRAM_ERROR}**\n> # `⚠️ Unable to get ids of servers`"
2022-04-06 14:39:27 +00:00
else:
tmp = '\n'.join(
f"`{k.get('id')}` **|** {k.get('name')} **|** {k.get('location')} {k.get('country')}"
for k in self.__output.get('servers')
)
return f"**Speedtest节点列表**\n{tmp}"
2022-04-09 05:47:04 +00:00
return f"**{SYCGRAM_ERROR}**\n```{res.get('error')}```"
2022-04-06 14:39:27 +00:00
def get_server(self) -> str:
location = self.__server.get('location')
country = self.__server.get('country')
return f"`{location} {country}`"
def get_sponsor(self) -> str:
return f"`{self.__server.get('name')}`"
def get_speed(self, opt: str) -> str:
"""
Args:
opt (str): upload or download
Returns:
str: Convert to bits
"""
def convert(bits) -> str:
"""Unit conversion"""
power = 1000
n = 0
units = {
0: 'bps',
1: 'Kbps',
2: 'Mbps',
3: 'Gbps',
4: 'Tbps'
}
while bits > power:
bits = bits / power
n = n + 1
return f"{bits:.3f} {units.get(n)}"
return f"`{convert(self.__output.get(opt).get('bandwidth')*8)}`"
def get_ping(self, opt: str) -> str:
return f"`{self.__output.get('ping').get(opt):.3f}`"
def get_time(self) -> str:
return f"`{datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')}`"
2022-04-09 05:47:04 +00:00
async def install_speedtest_cli(self, opt: str = 'install') -> Optional[str]:
2022-04-07 02:55:52 +00:00
def exists_file() -> bool:
return path.exists(SPEEDTEST_PATH_FILE)
2022-04-06 14:39:27 +00:00
2022-04-07 03:02:54 +00:00
if platform.uname().system != "Linux" or platform.uname().machine not in [
'i386', 'x86_64', 'armel', 'armhf', 'aarch64',
]:
2022-04-09 05:47:04 +00:00
text = f"Unsupported System >>> {platform.uname().system} {platform.uname().machine}"
2022-04-06 14:39:27 +00:00
logger.warning(text)
return text
elif opt == 'install':
2022-04-07 02:55:52 +00:00
if not exists_file():
await self.__download_file()
2022-04-06 14:39:27 +00:00
logger.success("First install speedtest")
return
elif opt == 'update':
2022-04-07 02:55:52 +00:00
if exists_file():
os.remove(SPEEDTEST_PATH_FILE)
await self.__download_file()
if exists_file():
2022-04-09 05:47:04 +00:00
text = "Update speedtest successfully."
2022-04-06 14:39:27 +00:00
logger.success(text)
return text
2022-04-09 05:47:04 +00:00
return "Failed to update speedtest"
2022-04-06 14:39:27 +00:00
else:
raise ValueError(f'Wrong speedtest option {opt}')
2022-04-07 02:55:52 +00:00
async def __download_file(self) -> None:
await basher(INSTALL_SPEEDTEST, timeout=30)
2022-04-06 14:39:27 +00:00
# import asyncio
# import json
# from datetime import datetime
# from time import time
# from typing import Any, Dict, Tuple
# from loguru import logger
# from .helpers import execute
# class Speedtester:
# def __init__(self) -> None:
# pass
# async def __aenter__(self):
# self._timer = time()
# logger.info(f"Speedtest start in {self._timer}")
# return self
# async def __aexit__(self, exc_type, exc_val, exc_tb):
# logger.info(
# f"Speedtest over and takes {time()-self._timer:.5f} seconds.")
# async def running(self, cmd: str) -> Tuple[str]:
# """开始执行speedtest
# Args:
# cmd (str, optional): speedtest的完整指令需要返回json格式.
# Defaults to 'speedtest-cli --share --json'.
# Returns:
# Tuple[str]: 第一个值是文本/错误第二个是图片link
# """
# # 超时报错
# res = await asyncio.wait_for(execute(cmd), timeout=60)
# logger.info(f"Speedtest Execution | {res}")
# await logger.complete()
# if not res.get('error'):
# if 'list' in cmd:
# return res.get('output'), ''
# try:
# # output result
# self.__output: Dict[str, Any] = json.loads(res.get('output'))
# self.__server: Dict[str, Any] = self.__output.get('server')
# except (TypeError, AttributeError, json.decoder.JSONDecodeError):
# return "⚠️ Unable to get detailed data.", ''
# else:
# text = "**Speedtest**\n" \
# f"Server: {self.get_server()}\n" \
# f"Sponsor: {self.get_sponsor()}\n" \
# f"Upload: {self.get_speed('upload')}\n" \
# f"Download: {self.get_speed('download')}\n" \
# f"Latency: {self.get_latency()}\n" \
# f"Time: {self.get_time()}"
# return text, self.__output.get('share')
# return f"⚠️ Error | {res.get('error')}", ''
# def get_server(self) -> str:
# country = self.__server.get('country')
# cc = self.__server.get('cc')
# return f"`{country} - {cc}`"
# def get_sponsor(self) -> str:
# return f"`{self.__server.get('sponsor')}`"
# def get_speed(self, option: str) -> str:
# """
# Args:
# option (str): upload or download
# Returns:
# str: Convert to bits
# """
# return f"`{self.convert(self.__output.get(option))}`"
# def get_latency(self) -> str:
# return f"`{self.__server.get('latency')}`"
# def get_time(self) -> str:
# return f"`{datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')}`"
# @staticmethod
# def convert(bits) -> str:
# """Unit conversion"""
# power = 1024
# n = 0
# units = {
# 0: 'bps',
# 1: 'Kbps',
# 2: 'Mbps',
# 3: 'Gbps',
# 4: 'Tbps'
# }
# while bits > power:
# bits = bits / power
# n = n + 1
# return f"{bits:.3f} {units.get(n)}"