PagerMaid_Plugins_Pyro/speedtest_go/main.py
xtaodada 7fc4fb65f0
All checks were successful
Github commit to telegram / build (push) Successful in 10s
speedtest_go 测速
2024-09-28 22:44:37 +08:00

148 lines
4.4 KiB
Python

import platform
import tarfile
import json
from pathlib import Path
from pagermaid.enums import Message
from pagermaid.enums.command import CommandHandler
from pagermaid.listener import listener
from pagermaid.services import client
from pagermaid.utils import lang, execute, safe_remove
VERSION = "1.7.9"
DATA_PATH = Path("data") / "speedtest_go"
DATA_PATH.mkdir(parents=True, exist_ok=True)
FILE_PATH = DATA_PATH / (f"{VERSION}.exe" if platform.system() == "Windows" else VERSION)
FILE_TAR_PATH = DATA_PATH / f"{VERSION}.tar.gz"
def unit_convert(byte):
"""Converts byte into readable formats."""
power = 1000
zero = 0
units = {0: "", 1: "Kb/s", 2: "Mb/s", 3: "Gb/s", 4: "Tb/s"}
while byte > power:
byte /= power
zero += 1
return f"{round(byte, 2)} {units[zero]}"
class NotSupportSystem(BaseException):
"""不支持的系统"""
def get_download_url() -> str:
system = platform.system()
system_str = {"Windows": "Windows", "Darwin": "Darwin", "Linux": "Linux"}.get(system)
if not system_str:
raise NotSupportSystem(f"不支持的系统: {system}")
bits, _ = platform.architecture()
processor = platform.processor()
arm = "arm" in processor.lower()
software_type = ""
if bits == "64bit":
if arm:
software_type = "arm64"
else:
software_type = "x86_64"
else:
software_type = "i386"
return f"https://github.com/showwin/speedtest-go/releases/download/v{VERSION}/speedtest-go_{VERSION}_{system_str}_{software_type}.tar.gz"
def unzip_file():
tar = tarfile.open(FILE_TAR_PATH)
file_names = tar.getnames()
for file_name in file_names:
if not file_name.startswith("speedtest-go"):
continue
tar.extract(file_name, DATA_PATH)
(DATA_PATH / file_name).rename(FILE_PATH)
tar.close()
safe_remove(str(FILE_TAR_PATH))
async def download_go() -> None:
if FILE_PATH.exists():
return
if FILE_TAR_PATH.exists():
FILE_TAR_PATH.unlink()
url = get_download_url()
head = await client.head(url, follow_redirects=True)
try:
head.raise_for_status()
except Exception as e:
raise NotSupportSystem(f"下载链接生成失败,请联系管理员:{e}")
file_data = await client.get(url, follow_redirects=True)
with open(FILE_TAR_PATH, "wb") as f:
f.write(file_data.content)
try:
unzip_file()
except Exception as e:
raise NotSupportSystem(f"解压失败,请联系管理员:{e}")
async def run_command(command: str) -> str:
await download_go()
args = [str(FILE_PATH)] + command.split()
result = await execute(" ".join(args))
return result
async def get_all_ids():
data = await run_command("-l --json")
return ("附近的测速点有:\n\n" + data) if data else "获取失败,请联系管理员"
async def run_speedtest(server: int) -> str:
args = f"--server={server} --json" if server else "--json"
_data = await run_command(args)
try:
result = json.loads(_data)
result["server"] = result["servers"][0]
except json.JSONDecodeError:
return "解析输出失败,请联系管理员"
latency = round(result['server']['latency'] / 1000 / 1000, 2)
return (
f"**Speedtest** \n"
f"Server: `{result['server']['name']} - "
f"{result['server']['country']}` \n"
f"Sponsor: `{result['server']['sponsor']}` \n"
f"Upload: `{unit_convert(result['server']['ul_speed'])}` \n"
f"Download: `{unit_convert(result['server']['dl_speed'])}` \n"
f"Latency: `{latency}ms` \n"
f"Timestamp: `{result['timestamp']}`"
)
@listener(
command="speedtest_go",
need_admin=True,
description=lang('speedtest_des'),
parameters="(list/server id)",
)
async def speedtest_go(message: Message):
""" Tests internet speed using speedtest. """
try:
server = int(message.arguments)
except ValueError:
server = 0
msg: Message = await message.edit(lang("speedtest_processing"))
des = await run_speedtest(server)
return await msg.edit(des)
speedtest_go: "CommandHandler"
@speedtest_go.sub_command(
command="list",
)
async def speedtest_go_list(message: Message):
msg: Message = await message.edit(lang("speedtest_processing"))
try:
des = await get_all_ids()
except NotSupportSystem as e:
des = str(e)
return await msg.edit(des)