2022-04-06 14:39:27 +00:00
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import platform
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
from os import path
|
|
|
|
|
from time import time
|
|
|
|
|
from typing import Any, Dict, Optional, Tuple
|
|
|
|
|
|
|
|
|
|
from loguru import logger
|
|
|
|
|
|
|
|
|
|
from tools.constants import INSTALL_SPEEDTEST, SPEEDTEST_PATH_FILE
|
|
|
|
|
|
|
|
|
|
from .helpers import basher
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Speedtester:
    """Async helper that runs a speedtest command and formats its JSON output.

    Intended to be used as an async context manager; entering and leaving the
    context only logs how long the enclosed work took.
    """

    # Decimal (SI) unit labels used when rendering a bit rate.
    _UNITS = ('bps', 'Kbps', 'Mbps', 'Gbps', 'Tbps')

    def __init__(self) -> None:
        # Start time recorded by ``__aenter__``.
        self._timer: float = 0.0
        # Parsed JSON of the most recent successful ``running`` call.
        self.__output: Dict[str, Any] = {}
        # The ``server`` sub-object of ``__output``.
        self.__server: Dict[str, Any] = {}

    async def __aenter__(self):
        """Record the start time and log that the speedtest is starting."""
        self._timer = time()
        logger.info("Speedtest start ...")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Log the elapsed time; exceptions are not suppressed."""
        logger.info(
            f"Speedtest over and takes {time()-self._timer:.5f} seconds.")

    async def running(self, cmd: str) -> Tuple[str, str]:
        """Run the speedtest command and build a human-readable report.

        Args:
            cmd (str): The complete speedtest command line; it must make the
                tool print its result as JSON.

        Returns:
            Tuple[str, str]: The first item is the report text (or an error
            message), the second is the link to the result image (empty on
            error).
        """
        await self.init_for_speedtest()
        # ``basher`` is given a 60 second timeout.
        # NOTE(review): presumably it returns a dict with 'output'/'error'
        # keys — confirm against helpers.basher.
        res = await basher(cmd, timeout=60)
        logger.info(f"Speedtest Execution | {res}")

        try:
            self.__output = json.loads(res.get('output'))
            self.__server = self.__output.get('server')
            # Formatting is done inside the ``try`` so that a JSON payload
            # lacking the expected sections is reported as an error instead
            # of escaping as AttributeError/TypeError.
            text = "**Speedtest**\n" \
                f"Server: {self.get_server()}\n" \
                f"Sponsor: {self.get_sponsor()}\n" \
                f"Upload: {self.get_speed('upload')}\n" \
                f"Download: {self.get_speed('download')}\n" \
                f"jitter: {self.get_ping('jitter')}\n" \
                f"Latency: {self.get_ping('latency')}\n" \
                f"Time: {self.get_time()}"
            link = f"{self.__output.get('result').get('url')}.png"
        except (TypeError, AttributeError, ValueError):
            # ValueError covers json.JSONDecodeError.
            return f"⚠️ Error\n```{res.get('error')}```", ''
        return text, link

    async def list_servers_ids(self, cmd: str) -> str:
        """Run the server-listing command and format the servers it reports.

        Args:
            cmd (str): The complete command line; it must print JSON that
                contains a ``servers`` list.

        Returns:
            str: One line per server, or an error message.
        """
        await self.init_for_speedtest()
        res = await basher(cmd, timeout=10)
        logger.info(f"Speedtest Execution | {res}")

        if res.get('error'):
            return f"⚠️ Error\n```{res.get('error')}```"

        try:
            # Kept in a local so the last measurement stored by ``running``
            # is not overwritten by the server list.
            servers = json.loads(res.get('output')).get('servers')
            tmp = '\n'.join(
                f"`{k.get('id')}` **|** {k.get('name')} **|** {k.get('location')} {k.get('country')}"
                for k in servers
            )
        except (TypeError, AttributeError, ValueError):
            # Covers invalid JSON as well as a missing/odd ``servers`` entry.
            return "⚠️ Unable to get ids of servers"
        return f"**Speedtest节点列表:**\n{tmp}"

    def get_server(self) -> str:
        """Return the server's location and country in backticks."""
        location = self.__server.get('location')
        country = self.__server.get('country')
        return f"`{location} {country}`"

    def get_sponsor(self) -> str:
        """Return the server's ``name`` field in backticks."""
        return f"`{self.__server.get('name')}`"

    def get_speed(self, opt: str) -> str:
        """Return the measured bandwidth for one direction as a bit rate.

        Args:
            opt (str): ``upload`` or ``download``.

        Returns:
            str: The rate with a decimal unit, wrapped in backticks.
        """
        def convert(bits) -> str:
            """Scale ``bits`` by powers of 1000 and attach the unit label."""
            power = 1000
            n = 0
            last = len(self._UNITS) - 1
            # ``>=`` so an exact multiple moves up a unit (1000 bps becomes
            # 1 Kbps); the index bound keeps huge values in the largest unit
            # instead of running past the table.
            while bits >= power and n < last:
                bits = bits / power
                n = n + 1
            return f"{bits:.3f} {self._UNITS[n]}"

        # The reported bandwidth is multiplied by 8 before conversion
        # (presumably bytes per second to bits per second — confirm against
        # the speedtest tool's JSON format).
        return f"`{convert(self.__output.get(opt).get('bandwidth')*8)}`"

    def get_ping(self, opt: str) -> str:
        """Return one field of the ``ping`` section with three decimals.

        Args:
            opt (str): Key inside ``ping``; ``running`` uses ``jitter`` and
                ``latency``.
        """
        return f"`{self.__output.get('ping').get(opt):.3f}`"

    def get_time(self) -> str:
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return f"`{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}`"

    async def init_for_speedtest(self, opt: str = 'install') -> Optional[str]:
        """Make sure the speedtest binary is present, or refresh it.

        Args:
            opt (str): ``install`` downloads the binary only when it is
                missing; ``update`` removes any existing file and downloads
                it again.

        Returns:
            Optional[str]: A message for an unsupported system or for the
            outcome of an update; ``None`` for ``install``.

        Raises:
            ValueError: If ``opt`` is neither ``install`` nor ``update``
                (only reached on a supported system).
        """
        def exists_file() -> bool:
            return path.exists(SPEEDTEST_PATH_FILE)

        # Query the platform once instead of on every comparison.
        uname = platform.uname()
        supported_machines = ('i386', 'x86_64', 'armel', 'armhf', 'aarch64')
        if uname.system != "Linux" or uname.machine not in supported_machines:
            text = f"Unsupported System >>> {uname.system} - {uname.machine}"
            logger.warning(text)
            return text

        if opt == 'install':
            if not exists_file():
                await self.__download_file()
                logger.success("First install speedtest")
            return None

        if opt == 'update':
            if exists_file():
                os.remove(SPEEDTEST_PATH_FILE)
            await self.__download_file()
            if exists_file():
                text = "✅ Update speedtest successfully."
                logger.success(text)
                return text
            return "❌ Failed to update speedtest!"

        raise ValueError(f'Wrong speedtest option {opt}!')

    async def __download_file(self) -> None:
        """Run the install command through ``basher`` with a 30 s timeout."""
        await basher(INSTALL_SPEEDTEST, timeout=30)
|
2022-04-06 14:39:27 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# import asyncio
|
|
|
|
|
# import json
|
|
|
|
|
# from datetime import datetime
|
|
|
|
|
# from time import time
|
|
|
|
|
# from typing import Any, Dict, Tuple
|
|
|
|
|
|
|
|
|
|
# from loguru import logger
|
|
|
|
|
|
|
|
|
|
# from .helpers import execute
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# class Speedtester:
|
|
|
|
|
# def __init__(self) -> None:
|
|
|
|
|
# pass
|
|
|
|
|
|
|
|
|
|
# async def __aenter__(self):
|
|
|
|
|
# self._timer = time()
|
|
|
|
|
# logger.info(f"Speedtest start in {self._timer}")
|
|
|
|
|
# return self
|
|
|
|
|
|
|
|
|
|
# async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
|
|
|
# logger.info(
|
|
|
|
|
# f"Speedtest over and takes {time()-self._timer:.5f} seconds.")
|
|
|
|
|
|
|
|
|
|
# async def running(self, cmd: str) -> Tuple[str]:
|
|
|
|
|
# """开始执行speedtest
|
|
|
|
|
|
|
|
|
|
# Args:
|
|
|
|
|
# cmd (str, optional): speedtest的完整指令,需要返回json格式.
|
|
|
|
|
# Defaults to 'speedtest-cli --share --json'.
|
|
|
|
|
|
|
|
|
|
# Returns:
|
|
|
|
|
# Tuple[str]: 第一个值是文本/错误,第二个是图片link
|
|
|
|
|
# """
|
|
|
|
|
# # 超时报错
|
|
|
|
|
# res = await asyncio.wait_for(execute(cmd), timeout=60)
|
|
|
|
|
# logger.info(f"Speedtest Execution | {res}")
|
|
|
|
|
# await logger.complete()
|
|
|
|
|
|
|
|
|
|
# if not res.get('error'):
|
|
|
|
|
# if 'list' in cmd:
|
|
|
|
|
# return res.get('output'), ''
|
|
|
|
|
|
|
|
|
|
# try:
|
|
|
|
|
# # output result
|
|
|
|
|
# self.__output: Dict[str, Any] = json.loads(res.get('output'))
|
|
|
|
|
# self.__server: Dict[str, Any] = self.__output.get('server')
|
|
|
|
|
# except (TypeError, AttributeError, json.decoder.JSONDecodeError):
|
|
|
|
|
# return "⚠️ Unable to get detailed data.", ''
|
|
|
|
|
|
|
|
|
|
# else:
|
|
|
|
|
# text = "**Speedtest**\n" \
|
|
|
|
|
# f"Server: {self.get_server()}\n" \
|
|
|
|
|
# f"Sponsor: {self.get_sponsor()}\n" \
|
|
|
|
|
# f"Upload: {self.get_speed('upload')}\n" \
|
|
|
|
|
# f"Download: {self.get_speed('download')}\n" \
|
|
|
|
|
# f"Latency: {self.get_latency()}\n" \
|
|
|
|
|
# f"Time: {self.get_time()}"
|
|
|
|
|
# return text, self.__output.get('share')
|
|
|
|
|
|
|
|
|
|
# return f"⚠️ Error | {res.get('error')}", ''
|
|
|
|
|
|
|
|
|
|
# def get_server(self) -> str:
|
|
|
|
|
# country = self.__server.get('country')
|
|
|
|
|
# cc = self.__server.get('cc')
|
|
|
|
|
# return f"`{country} - {cc}`"
|
|
|
|
|
|
|
|
|
|
# def get_sponsor(self) -> str:
|
|
|
|
|
# return f"`{self.__server.get('sponsor')}`"
|
|
|
|
|
|
|
|
|
|
# def get_speed(self, option: str) -> str:
|
|
|
|
|
# """
|
|
|
|
|
# Args:
|
|
|
|
|
# option (str): upload or download
|
|
|
|
|
|
|
|
|
|
# Returns:
|
|
|
|
|
# str: Convert to bits
|
|
|
|
|
# """
|
|
|
|
|
# return f"`{self.convert(self.__output.get(option))}`"
|
|
|
|
|
|
|
|
|
|
# def get_latency(self) -> str:
|
|
|
|
|
# return f"`{self.__server.get('latency')}`"
|
|
|
|
|
|
|
|
|
|
# def get_time(self) -> str:
|
|
|
|
|
# return f"`{datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')}`"
|
|
|
|
|
|
|
|
|
|
# @staticmethod
|
|
|
|
|
# def convert(bits) -> str:
|
|
|
|
|
# """Unit conversion"""
|
|
|
|
|
# power = 1024
|
|
|
|
|
# n = 0
|
|
|
|
|
# units = {
|
|
|
|
|
# 0: 'bps',
|
|
|
|
|
# 1: 'Kbps',
|
|
|
|
|
# 2: 'Mbps',
|
|
|
|
|
# 3: 'Gbps',
|
|
|
|
|
# 4: 'Tbps'
|
|
|
|
|
# }
|
|
|
|
|
# while bits > power:
|
|
|
|
|
# bits = bits / power
|
|
|
|
|
# n = n + 1
|
|
|
|
|
# return f"{bits:.3f} {units.get(n)}"
|