mirror of
https://github.com/TeamPGM/PagerMaid_Plugins_Pyro.git
synced 2024-11-16 11:02:56 +00:00
128 lines
4.0 KiB
Python
128 lines
4.0 KiB
Python
import contextlib
|
|
|
|
from PIL import Image
|
|
from os.path import exists
|
|
from httpx import ReadTimeout
|
|
|
|
from pagermaid.listener import listener
|
|
from pagermaid.single_utils import safe_remove
|
|
from pagermaid.enums import Client, Message, AsyncClient
|
|
from pagermaid.utils import lang, pip_install
|
|
|
|
pip_install("speedtest-cli", alias="speedtest")
|
|
|
|
from speedtest import (
|
|
Speedtest,
|
|
ShareResultsConnectFailure,
|
|
ShareResultsSubmitFailure,
|
|
NoMatchedServers,
|
|
SpeedtestBestServerFailure,
|
|
SpeedtestHTTPError,
|
|
)
|
|
|
|
|
|
def unit_convert(byte):
    """Format a raw rate as a human-readable string with a decimal unit.

    The value is divided by 1000 until it drops below 1000 or the largest
    known unit ("Tb/s") is reached, then rounded to two decimals.

    NOTE(review): despite the parameter name, the unit labels are bit-rate
    style ("Kb/s", "Mb/s", ...); callers in this file pass the speedtest
    ``upload``/``download`` figures — presumably bits per second, confirm
    against the speedtest library.

    Args:
        byte: Numeric rate to format.

    Returns:
        A string such as ``"1.5 Mb/s"``.
    """
    power = 1000
    units = ("b/s", "Kb/s", "Mb/s", "Gb/s", "Tb/s")
    index = 0
    # ">=" so that exactly 1000 rolls over to the next unit, and the index
    # bound keeps very large values on the last unit instead of raising.
    while byte >= power and index < len(units) - 1:
        byte /= power
        index += 1
    return f"{round(byte, 2)} {units[index]}"
|
|
|
|
|
|
async def run_speedtest(request: AsyncClient, message: Message):
    """Run a speed test and build the result caption and optional image.

    Args:
        request: HTTP client used to download the shared result image.
        message: Triggering message. When it carries exactly one parameter,
            ``message.arguments`` is parsed as an integer server ID and the
            test is pinned to the matching entry of the closest servers.

    Returns:
        A ``(caption, photo_path)`` tuple. ``photo_path`` is
        ``"speedtest.png"`` only when the result image was downloaded during
        this call, otherwise ``None``.

    Raises:
        ValueError: If the single parameter is not an integer.
        Exceptions raised by the speedtest library or by the HTTP client
        (other than ``ShareResultsConnectFailure``, which is suppressed)
        propagate to the caller.
    """
    test = Speedtest()
    server = int(message.arguments) if len(message.parameter) == 1 else None
    if server:
        target_id = str(server)
        for candidate in test.get_closest_servers():
            if candidate["id"] == target_id:
                test.servers = [candidate]
                break
        # NOTE(review): when no closest server matches, test.servers is left
        # untouched — confirm get_best_server reports that as intended.
    # NOTE(review): the calls below are synchronous and run inside this
    # coroutine; they presumably block until the measurement finishes.
    test.get_best_server(servers=test.servers)
    test.download()
    test.upload()
    # Sharing is best-effort: a connect failure just means no result image.
    with contextlib.suppress(ShareResultsConnectFailure):
        test.results.share()
    result = test.results.dict()
    des = (
        f"**Speedtest** \n"
        f"Server: `{result['server']['name']} - "
        f"{result['server']['cc']}` \n"
        f"Sponsor: `{result['server']['sponsor']}` \n"
        f"Upload: `{unit_convert(result['upload'])}` \n"
        f"Download: `{unit_convert(result['download'])}` \n"
        f"Latency: `{result['ping']}` \n"
        f"Timestamp: `{result['timestamp']}`"
    )
    # Track the image produced by THIS run; checking only whether the file
    # exists would attach a stale image left behind by an earlier run.
    photo = None
    if result["share"]:
        data = await request.get(
            result["share"].replace("http:", "https:"), follow_redirects=True
        )
        with open("speedtest.png", mode="wb") as f:
            f.write(data.content)
        # Cropping is best-effort; on failure the uncropped image is kept.
        with contextlib.suppress(Exception):
            # Context manager guarantees the image handle is released even
            # when crop/save fails.
            with Image.open("speedtest.png") as img:
                img.crop((17, 11, 727, 389)).save("speedtest.png")
        photo = "speedtest.png"
    return des, photo
|
|
|
|
|
|
async def get_all_ids():
    """Build a text listing of the closest speedtest servers.

    Returns:
        A ``(text, None)`` tuple. ``text`` has one line per server (ID,
        distance in whole km, name, sponsor), or a fixed "no servers nearby"
        message when the list is empty. The second slot is always ``None``
        because this command never produces an image.
    """
    nearby = Speedtest().get_closest_servers()
    if not nearby:
        return "附近没有测速点", None
    rows = [
        f"`{entry['id']}` - `{int(entry['d'])}km` - `{entry['name']}` - `{entry['sponsor']}`"
        for entry in nearby
    ]
    return "附近的测速点有:\n\n" + "\n".join(rows), None
|
|
|
|
|
|
@listener(
    command="speedtest",
    description=lang("speedtest_des"),
    parameters="(Server ID/测速点列表)",
)
async def speedtest(client: Client, message: Message, request: AsyncClient):
    """Tests internet speed using speedtest.

    With the argument "测速点列表" the nearby server list is shown; otherwise
    a speed test is run (optionally pinned to a server ID) and the result is
    sent as a photo with caption, falling back to a text edit when no photo
    is available or sending the photo fails.

    Args:
        client: Client used to send the result photo.
        message: The command message; edited in place to show progress,
            errors, or the text result.
        request: HTTP client forwarded to ``run_speedtest``.
    """
    list_servers = message.arguments == "测速点列表"
    if list_servers:
        # Listing needs no progress notice; later edits go to the original.
        msg = message
    else:
        msg: Message = await message.edit(lang("speedtest_processing"))
    try:
        if list_servers:
            des, photo = await get_all_ids()
        else:
            des, photo = await run_speedtest(request, message)
    except SpeedtestHTTPError:
        return await msg.edit(lang("speedtest_ConnectFailure"))
    except (ValueError, TypeError):
        return await msg.edit(lang("arg_error"))
    except (SpeedtestBestServerFailure, NoMatchedServers):
        return await msg.edit(lang("speedtest_ServerFailure"))
    except (ShareResultsSubmitFailure, RuntimeError, ReadTimeout):
        return await msg.edit(lang("speedtest_ConnectFailure"))
    if not photo:
        return await msg.edit(des)
    try:
        await client.send_photo(
            message.chat.id,
            photo,
            caption=des,
            reply_to_message_id=message.reply_to_top_message_id
            or message.reply_to_message_id,
        )
    except Exception:
        # Deliberate fallback: if the photo cannot be sent, show the text.
        return await msg.edit(des)
    finally:
        # Always delete the temporary image, including on the fallback path,
        # so a failed send does not leave a stale file for the next run.
        safe_remove(photo)
    await msg.safe_delete()
|