PagerMaid-Pyro/pagermaid/utils.py

216 lines
6.7 KiB
Python
Raw Permalink Normal View History

2022-06-20 13:55:14 +00:00
import contextlib
2022-05-23 12:40:30 +00:00
import subprocess
from importlib.util import find_spec
from os.path import exists
from typing import Optional
import httpx
from os import remove
from sys import executable
from asyncio import create_subprocess_shell, sleep
from asyncio.subprocess import PIPE
from pyrogram import filters
2022-05-23 12:40:30 +00:00
from pagermaid.config import Config
from pagermaid import bot
from pagermaid.group_manager import enforce_permission
2022-06-07 12:44:45 +00:00
from pagermaid.single_utils import _status_sudo, get_sudo_list, Message, sqlite
2022-05-23 12:40:30 +00:00
def lang(text: str) -> str:
2023-03-12 03:56:01 +00:00
"""i18n"""
2023-06-18 03:13:16 +00:00
return Config.lang_dict.get(text, Config.lang_default_dict.get(text, text))
2022-05-23 12:40:30 +00:00
def alias_command(command: str, disallow_alias: bool = False) -> str:
2023-03-12 03:56:01 +00:00
"""alias"""
2022-06-20 13:55:14 +00:00
return command if disallow_alias else Config.alias_dict.get(command, command)
2022-05-23 12:40:30 +00:00
async def attach_report(plaintext, file_name, reply_id=None, caption=None):
2023-03-12 03:56:01 +00:00
"""Attach plaintext as logs."""
2022-06-20 13:55:14 +00:00
with open(file_name, "w+") as file:
file.write(plaintext)
2022-05-23 12:40:30 +00:00
try:
await bot.send_document(
"PagerMaid_Modify_bot",
file_name,
reply_to_message_id=reply_id,
2023-03-12 03:56:01 +00:00
caption=caption,
2022-05-23 12:40:30 +00:00
)
2022-06-06 09:30:27 +00:00
except Exception: # noqa
2022-05-23 12:40:30 +00:00
return
remove(file_name)
async def attach_log(plaintext, chat_id, file_name, reply_id=None, caption=None):
2023-03-12 03:56:01 +00:00
"""Attach plaintext as logs."""
with open(file_name, "w+", encoding="utf-8") as file:
2022-06-20 13:55:14 +00:00
file.write(plaintext)
2022-05-23 12:40:30 +00:00
await bot.send_document(
2023-03-12 03:56:01 +00:00
chat_id, file_name, reply_to_message_id=reply_id, caption=caption
2022-05-23 12:40:30 +00:00
)
remove(file_name)
2024-02-04 07:33:01 +00:00
async def upload_attachment(file_path, chat_id, reply_id, message_thread_id=None, caption=None, thumb=None):
2023-03-12 03:56:01 +00:00
"""Uploads a local attachment file."""
2022-05-23 12:40:30 +00:00
if not exists(file_path):
return False
try:
await bot.send_document(
chat_id,
file_path,
2024-02-04 07:33:01 +00:00
message_thread_id=message_thread_id,
2022-05-23 12:40:30 +00:00
thumb=thumb,
reply_to_message_id=reply_id,
2023-03-12 03:56:01 +00:00
caption=caption,
2022-05-23 12:40:30 +00:00
)
except BaseException as exception:
raise exception
return True
async def execute(command, pass_error=True):
2023-03-12 03:56:01 +00:00
"""Executes command and returns output, with the option of enabling stderr."""
2022-05-23 12:40:30 +00:00
executor = await create_subprocess_shell(
2023-03-12 03:56:01 +00:00
command, stdout=PIPE, stderr=PIPE, stdin=PIPE
2022-05-23 12:40:30 +00:00
)
stdout, stderr = await executor.communicate()
if pass_error:
try:
2023-03-12 03:56:01 +00:00
result = str(stdout.decode().strip()) + str(stderr.decode().strip())
2022-05-23 12:40:30 +00:00
except UnicodeDecodeError:
2023-03-12 03:56:01 +00:00
result = str(stdout.decode("gbk").strip()) + str(
stderr.decode("gbk").strip()
)
2022-05-23 12:40:30 +00:00
else:
try:
result = str(stdout.decode().strip())
except UnicodeDecodeError:
2023-03-12 03:56:01 +00:00
result = str(stdout.decode("gbk").strip())
2022-05-23 12:40:30 +00:00
return result
2023-03-12 03:56:01 +00:00
def pip_install(
package: str, version: Optional[str] = "", alias: Optional[str] = ""
) -> bool:
"""Auto install extra pypi packages"""
2022-05-23 12:40:30 +00:00
if not alias:
# when import name is not provided, use package name
alias = package
if find_spec(alias) is None:
subprocess.call([executable, "-m", "pip", "install", f"{package}{version}"])
if find_spec(package) is None:
return False
return True
2023-03-12 03:56:01 +00:00
async def edit_delete(
message: Message,
text: str,
time: int = 5,
parse_mode: Optional["enums.ParseMode"] = None,
disable_web_page_preview: bool = None,
):
2022-05-23 12:40:30 +00:00
sudo_users = get_sudo_list()
from_id = message.from_user.id if message.from_user else message.sender_chat.id
if from_id in sudo_users:
reply_to = message.reply_to_message
event = (
2023-03-12 03:56:01 +00:00
await reply_to.reply(
text,
disable_web_page_preview=disable_web_page_preview,
parse_mode=parse_mode,
)
2022-05-23 12:40:30 +00:00
if reply_to
else await message.reply(
2023-03-12 03:56:01 +00:00
text,
disable_web_page_preview=disable_web_page_preview,
parse_mode=parse_mode,
2022-05-23 12:40:30 +00:00
)
)
else:
event = await message.edit(
2023-03-12 03:56:01 +00:00
text,
disable_web_page_preview=disable_web_page_preview,
parse_mode=parse_mode,
2022-05-23 12:40:30 +00:00
)
await sleep(time)
return await event.delete()
def get_permission_name(is_plugin: bool, need_admin: bool, command: str) -> str:
2023-03-12 03:56:01 +00:00
"""Get permission name."""
2022-05-23 12:40:30 +00:00
if is_plugin:
2022-06-20 13:55:14 +00:00
return f"plugins_root.{command}" if need_admin else f"plugins.{command}"
2022-05-23 12:40:30 +00:00
else:
2022-06-20 13:55:14 +00:00
return f"system.{command}" if need_admin else f"modules.{command}"
2022-05-23 12:40:30 +00:00
def sudo_filter(permission: str):
async def if_sudo(flt, _, message: Message):
if not _status_sudo():
return False
try:
2023-03-12 03:56:01 +00:00
from_id = (
message.from_user.id if message.from_user else message.sender_chat.id
)
2022-05-23 12:40:30 +00:00
sudo_list = get_sudo_list()
if from_id not in sudo_list:
if message.chat.id in sudo_list:
return enforce_permission(message.chat.id, flt.permission)
return False
return enforce_permission(from_id, flt.permission)
2022-06-06 09:30:27 +00:00
except Exception: # noqa
2022-05-23 12:40:30 +00:00
return False
return filters.create(if_sudo, permission=permission)
def from_self(message: Message) -> bool:
if message.outgoing:
return True
2022-08-14 04:44:43 +00:00
return message.from_user.is_self if message.from_user else False
2022-05-23 12:40:30 +00:00
2022-06-06 09:30:27 +00:00
def from_msg_get_sudo_uid(message: Message) -> int:
2023-03-12 03:56:01 +00:00
"""Get the sudo uid from the message."""
2022-06-06 09:30:27 +00:00
from_id = message.from_user.id if message.from_user else message.sender_chat.id
2022-08-14 04:44:43 +00:00
return from_id if from_id in get_sudo_list() else message.chat.id
2022-06-06 09:30:27 +00:00
def check_manage_subs(message: Message) -> bool:
2023-03-12 03:56:01 +00:00
return from_self(message) or enforce_permission(
from_msg_get_sudo_uid(message), "modules.manage_subs"
)
2022-06-06 09:30:27 +00:00
2022-06-07 12:44:45 +00:00
async def process_exit(start: int, _client, message=None):
data = sqlite.get("exit_msg", {})
cid, mid = data.get("cid", 0), data.get("mid", 0)
if start and data and cid and mid:
2022-07-03 10:07:55 +00:00
with contextlib.suppress(Exception):
msg: Message = await _client.get_messages(cid, mid)
if msg:
2022-06-20 13:55:14 +00:00
await msg.edit(
2023-03-12 03:56:01 +00:00
(
(msg.text or msg.caption)
if msg.from_user.is_self and (msg.text or msg.caption)
else ""
)
+ f'\n\n> {lang("restart_complete")}'
)
2022-06-07 12:44:45 +00:00
del sqlite["exit_msg"]
if message:
sqlite["exit_msg"] = {"cid": message.chat.id, "mid": message.id}
2022-05-23 12:40:30 +00:00
""" Init httpx client """
# 使用自定义 UA
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.72 Safari/537.36"
2022-05-23 12:40:30 +00:00
}
client = httpx.AsyncClient(timeout=10.0, headers=headers)