speedtest_go 测速
All checks were successful
Github commit to telegram / build (push) Successful in 10s

This commit is contained in:
xtaodada 2024-09-28 22:44:37 +08:00
parent d2416f49be
commit 7fc4fb65f0
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
2 changed files with 149 additions and 0 deletions

2
speedtest_go/DES.md Normal file
View File

@ -0,0 +1,2 @@
测速。
指令:,speedtest_go

147
speedtest_go/main.py Normal file
View File

@ -0,0 +1,147 @@
import platform
import tarfile
import json
from pathlib import Path
from pagermaid.enums import Message
from pagermaid.enums.command import CommandHandler
from pagermaid.listener import listener
from pagermaid.services import client
from pagermaid.utils import lang, execute, safe_remove
VERSION = "1.7.9"
DATA_PATH = Path("data") / "speedtest_go"
DATA_PATH.mkdir(parents=True, exist_ok=True)
FILE_PATH = DATA_PATH / (f"{VERSION}.exe" if platform.system() == "Windows" else VERSION)
FILE_TAR_PATH = DATA_PATH / f"{VERSION}.tar.gz"
def unit_convert(byte):
"""Converts byte into readable formats."""
power = 1000
zero = 0
units = {0: "", 1: "Kb/s", 2: "Mb/s", 3: "Gb/s", 4: "Tb/s"}
while byte > power:
byte /= power
zero += 1
return f"{round(byte, 2)} {units[zero]}"
class NotSupportSystem(BaseException):
"""不支持的系统"""
def get_download_url() -> str:
system = platform.system()
system_str = {"Windows": "Windows", "Darwin": "Darwin", "Linux": "Linux"}.get(system)
if not system_str:
raise NotSupportSystem(f"不支持的系统: {system}")
bits, _ = platform.architecture()
processor = platform.processor()
arm = "arm" in processor.lower()
software_type = ""
if bits == "64bit":
if arm:
software_type = "arm64"
else:
software_type = "x86_64"
else:
software_type = "i386"
return f"https://github.com/showwin/speedtest-go/releases/download/v{VERSION}/speedtest-go_{VERSION}_{system_str}_{software_type}.tar.gz"
def unzip_file():
tar = tarfile.open(FILE_TAR_PATH)
file_names = tar.getnames()
for file_name in file_names:
if not file_name.startswith("speedtest-go"):
continue
tar.extract(file_name, DATA_PATH)
(DATA_PATH / file_name).rename(FILE_PATH)
tar.close()
safe_remove(str(FILE_TAR_PATH))
async def download_go() -> None:
if FILE_PATH.exists():
return
if FILE_TAR_PATH.exists():
FILE_TAR_PATH.unlink()
url = get_download_url()
head = await client.head(url, follow_redirects=True)
try:
head.raise_for_status()
except Exception as e:
raise NotSupportSystem(f"下载链接生成失败,请联系管理员:{e}")
file_data = await client.get(url, follow_redirects=True)
with open(FILE_TAR_PATH, "wb") as f:
f.write(file_data.content)
try:
unzip_file()
except Exception as e:
raise NotSupportSystem(f"解压失败,请联系管理员:{e}")
async def run_command(command: str) -> str:
await download_go()
args = [str(FILE_PATH)] + command.split()
result = await execute(" ".join(args))
return result
async def get_all_ids():
data = await run_command("-l --json")
return ("附近的测速点有:\n\n" + data) if data else "获取失败,请联系管理员"
async def run_speedtest(server: int) -> str:
args = f"--server={server} --json" if server else "--json"
_data = await run_command(args)
try:
result = json.loads(_data)
result["server"] = result["servers"][0]
except json.JSONDecodeError:
return "解析输出失败,请联系管理员"
latency = round(result['server']['latency'] / 1000 / 1000, 2)
return (
f"**Speedtest** \n"
f"Server: `{result['server']['name']} - "
f"{result['server']['country']}` \n"
f"Sponsor: `{result['server']['sponsor']}` \n"
f"Upload: `{unit_convert(result['server']['ul_speed'])}` \n"
f"Download: `{unit_convert(result['server']['dl_speed'])}` \n"
f"Latency: `{latency}ms` \n"
f"Timestamp: `{result['timestamp']}`"
)
@listener(
command="speedtest_go",
need_admin=True,
description=lang('speedtest_des'),
parameters="(list/server id)",
)
async def speedtest_go(message: Message):
""" Tests internet speed using speedtest. """
try:
server = int(message.arguments)
except ValueError:
server = 0
msg: Message = await message.edit(lang("speedtest_processing"))
des = await run_speedtest(server)
return await msg.edit(des)
speedtest_go: "CommandHandler"
@speedtest_go.sub_command(
command="list",
)
async def speedtest_go_list(message: Message):
msg: Message = await message.edit(lang("speedtest_processing"))
try:
des = await get_all_ids()
except NotSupportSystem as e:
des = str(e)
return await msg.edit(des)