import contextlib from PIL import Image from os.path import exists from httpx import ReadTimeout from pagermaid.listener import listener from pagermaid.single_utils import safe_remove from pagermaid.enums import Client, Message, AsyncClient from pagermaid.utils import lang, pip_install pip_install("speedtest-cli", alias="speedtest") from speedtest import ( Speedtest, ShareResultsConnectFailure, ShareResultsSubmitFailure, NoMatchedServers, SpeedtestBestServerFailure, SpeedtestHTTPError, ) def unit_convert(byte): """Converts byte into readable formats.""" power = 1000 zero = 0 units = {0: "", 1: "Kb/s", 2: "Mb/s", 3: "Gb/s", 4: "Tb/s"} while byte > power: byte /= power zero += 1 return f"{round(byte, 2)} {units[zero]}" async def run_speedtest(request: AsyncClient, message: Message): test = Speedtest() server = int(message.arguments) if len(message.parameter) == 1 else None if server: servers = test.get_closest_servers() for i in servers: if i["id"] == str(server): test.servers = [i] break test.get_best_server(servers=test.servers) test.download() test.upload() with contextlib.suppress(ShareResultsConnectFailure): test.results.share() result = test.results.dict() des = ( f"**Speedtest** \n" f"Server: `{result['server']['name']} - " f"{result['server']['cc']}` \n" f"Sponsor: `{result['server']['sponsor']}` \n" f"Upload: `{unit_convert(result['upload'])}` \n" f"Download: `{unit_convert(result['download'])}` \n" f"Latency: `{result['ping']}` \n" f"Timestamp: `{result['timestamp']}`" ) if result["share"]: data = await request.get( result["share"].replace("http:", "https:"), follow_redirects=True ) with open("speedtest.png", mode="wb") as f: f.write(data.content) with contextlib.suppress(Exception): img = Image.open("speedtest.png") c = img.crop((17, 11, 727, 389)) c.save("speedtest.png") return des, "speedtest.png" if exists("speedtest.png") else None async def get_all_ids(): test = Speedtest() servers = test.get_closest_servers() return ( ( "附近的测速点有:\n\n" + "\n".join( f"`{i['id']}` - `{int(i['d'])}km` - `{i['name']}` - `{i['sponsor']}`" for i in servers ), None, ) if servers else ("附近没有测速点", None) ) @listener( command="speedtest", description=lang("speedtest_des"), parameters="(Server ID/测速点列表)", ) async def speedtest(client: Client, message: Message, request: AsyncClient): """Tests internet speed using speedtest.""" if message.arguments == "测速点列表": msg = message else: msg: Message = await message.edit(lang("speedtest_processing")) try: if message.arguments == "测速点列表": des, photo = await get_all_ids() else: des, photo = await run_speedtest(request, message) except SpeedtestHTTPError: return await msg.edit(lang("speedtest_ConnectFailure")) except (ValueError, TypeError): return await msg.edit(lang("arg_error")) except (SpeedtestBestServerFailure, NoMatchedServers): return await msg.edit(lang("speedtest_ServerFailure")) except (ShareResultsSubmitFailure, RuntimeError, ReadTimeout): return await msg.edit(lang("speedtest_ConnectFailure")) if not photo: return await msg.edit(des) try: await client.send_photo( message.chat.id, photo, caption=des, reply_to_message_id=message.reply_to_top_message_id or message.reply_to_message_id, ) except Exception: return await msg.edit(des) await msg.safe_delete() safe_remove(photo)