""" System related utilities for PagerMaid to integrate into the system. """ from platform import node from getpass import getuser from os import geteuid from requests import head from requests.exceptions import MissingSchema, InvalidURL, ConnectionError from pagermaid import log from pagermaid.listener import listener from pagermaid.utils import attach_log, execute @listener(outgoing=True, command="sh", description="在 Telegram 上执行 Shell 命令。", parameters="<命令>") async def sh(context): """ Use the command-line from Telegram. """ user = getuser() command = context.arguments hostname = node() if context.is_channel and not context.is_group: await context.edit("`出错了呜呜呜 ~ 当前配置禁止在频道中执行 Shell 命令。`") return if not command: await context.edit("`出错了呜呜呜 ~ 无效的参数。`") return if geteuid() == 0: await context.edit( f"`{user}`@{hostname} ~" f"\n> `#` {command}" ) else: await context.edit( f"`{user}`@{hostname} ~" f"\n> `$` {command}" ) result = await execute(command) if result: if len(result) > 4096: await attach_log(result, context.chat_id, "output.log", context.id) return if geteuid() == 0: await context.edit( f"`{user}`@{hostname} ~" f"\n> `#` {command}" f"\n`{result}`" ) else: await context.edit( f"`{user}`@{hostname} ~" f"\n> `$` {command}" f"\n`{result}`" ) else: return await log(f"在 Shell 中执行命令 `{command}`") @listener(outgoing=True, command="restart", diagnostics=False, description="使 PagerMaid 重新启动") async def restart(context): """ To re-execute PagerMaid. """ if not context.text[0].isalpha(): await context.edit("尝试重新启动系统。") await log("PagerMaid-Modify 重新启动。") await context.client.disconnect() @listener(outgoing=True, command="trace", description="跟踪 URL 的重定向。", parameters="") async def trace(context): """ Trace URL redirects. """ url = context.arguments reply = await context.get_reply_message() if reply: url = reply.text if url: if url.startswith("https://") or url.startswith("http://"): pass else: url = "https://" + url await context.edit("跟踪重定向中 . . .") result = str("") for url in url_tracer(url): count = 0 if result: result += " ↴\n" + url else: result = url if count == 128: result += "\n\n出错了呜呜呜 ~ 超过128次重定向,正在中止!" break if result: if len(result) > 4096: await context.edit("输出超出限制,正在附加文件。") await attach_log(result, context.chat_id, "output.log", context.id) return await context.edit( "重定向:\n" f"{result}" ) await log(f"Traced redirects of {context.arguments}.") else: await context.edit( "出错了呜呜呜 ~ 发出HTTP请求时出了点问题。" ) else: await context.edit("无效的参数。") @listener(outgoing=True, command="contact", description="向 Kat 发送消息。", parameters="") async def contact(context): """ Sends a message to Kat. """ await context.edit("`对话已打开,请单击 `[here](tg://user?id=503691334)` 进入.`", parse_mode="markdown") message = "Hi, I would like to report something about PagerMaid." if context.arguments: message = context.arguments await context.client.send_message( 503691334, message ) def url_tracer(url): """ Method to trace URL redirects. """ while True: yield url try: response = head(url) except MissingSchema: break except InvalidURL: break except ConnectionError: break if 300 < response.status_code < 400: url = response.headers['location'] else: break