""" The system module. """ import sys from platform import uname import discord from discord.ext import commands from subprocess import run, PIPE from requests import head from requests.exceptions import MissingSchema, InvalidURL, ConnectionError from pagermaid import color, des_handler, par_handler, redis_status from pagermaid.utils import process_command class Info(commands.Cog): def __init__(self, bot): self.bot = bot @commands.command() async def info(self, ctx): git_hash = run("git rev-parse --short HEAD", stdout=PIPE, shell=True).stdout.decode().strip() get_hash_link = f"https://github.com/Xtao-Labs/PagerMaid-Discord/commit/{git_hash}" dpy_repo = "https://github.com/Rapptz/discord.py" python_url = "https://www.python.org/" app_info = await self.bot.application_info() if app_info.team: owner = app_info.team.name else: owner = app_info.owner dpy_version = "[{}]({})".format(discord.__version__, dpy_repo) python_version = "[{}.{}.{}]({})".format(*sys.version_info[:3], python_url) git_version = "[{}]({})".format(git_hash, get_hash_link) database = '在线' if redis_status() else '离线' embed = discord.Embed(title="PagerMaid-Discord 运行状态", color=color) embed.add_field(name="实例创建者", value=str(owner)) embed.add_field(name="Python", value=python_version) embed.add_field(name="discord.py", value=dpy_version) embed.add_field(name="Git commit", value=git_version) embed.add_field(name="主机名", value=uname().node) embed.add_field(name="主机平台", value=sys.platform) embed.add_field(name="Kernel", value=uname().release) embed.add_field(name="数据库", value=database) embed.add_field(name="服务器数", value=str(len(self.bot.guilds))) embed.add_field(name="交流群", value='[点击加入](https://discord.gg/A4mWpa83e6)') embed.add_field(name="邀请", value=f'[点击邀请](https://discord.com/oauth2/authorize' f'?client_id={app_info.id}&scope=bot)') await ctx.send(embed=embed) @commands.command() async def trace(self, context): message = process_command(context) if len(message.parameters) == 1: url = message.arguments if url.startswith("https://") or url.startswith("http://"): pass else: url = "https://" + url msg = await context.reply('跟踪重定向中...') result = str("") for url in url_tracer(url): count = 0 if result: result += " ↴\n" + url else: result = url if count == 128: result += f"\n\n重定向超过 128 次,已停止追踪。" break if result: if len(result) > 2000: result = result[:2000] await msg.edit(content=f"跟踪重定向:\n" f"{result}") else: await msg.edit(content='服务器连接失败。') else: await context.reply('您好像输入了一个无效的参数。') @commands.command() async def servers(self, context): guilds = sorted(self.bot.guilds, key=lambda s: s.name.lower()) msg = "\n".join(f"{guild.name} (`{guild.id}`)" for guild in guilds) await context.send(msg) des_handler('info', '查看程序信息。') par_handler('info', '') des_handler('trace', '追踪重定向。') par_handler('trace', '') des_handler('servers', '列出所有服务器。') par_handler('servers', '') def setup(bot): bot.add_cog(Info(bot)) def url_tracer(url): """ Method to trace URL redirects. """ while True: yield url try: response = head(url) except MissingSchema: break except InvalidURL: break except ConnectionError: break if 300 < response.status_code < 400: url = response.headers['location'] else: break