mirror of
https://github.com/TeamPGM/PagerMaid-Pyro.git
synced 2024-11-24 21:13:36 +00:00
0c87310f77
✨ Support custom apt source
326 lines
13 KiB
Python
326 lines
13 KiB
Python
""" PagerMaid module to manage plugins. """
|
|
|
|
import contextlib
|
|
from os import remove, path, sep
|
|
from os.path import exists
|
|
from re import search, I
|
|
from shutil import copyfile, move
|
|
|
|
from pagermaid import log, working_dir
|
|
from pagermaid.common.plugin import plugin_remote_manager, plugin_manager
|
|
from pagermaid.common.reload import reload_all
|
|
from pagermaid.enums import Message
|
|
from pagermaid.listener import listener
|
|
from pagermaid.utils import upload_attachment, lang
|
|
|
|
|
|
def remove_plugin(name):
|
|
plugin_directory = f"{working_dir}{sep}plugins{sep}"
|
|
with contextlib.suppress(FileNotFoundError):
|
|
remove(f"{plugin_directory}{name}.py")
|
|
with contextlib.suppress(FileNotFoundError):
|
|
remove(f"{plugin_directory}{name}.py.disabled")
|
|
|
|
|
|
def move_plugin(file_path):
|
|
name = path.basename(file_path)[:-3]
|
|
plugin_directory = f"{working_dir}{sep}plugins{sep}"
|
|
remove_plugin(name)
|
|
move(file_path, plugin_directory)
|
|
|
|
|
|
@listener(
|
|
is_plugin=False,
|
|
outgoing=True,
|
|
command="apt",
|
|
need_admin=True,
|
|
diagnostics=False,
|
|
description=lang("apt_des"),
|
|
parameters=lang("apt_parameters"),
|
|
)
|
|
async def plugin(message: Message):
|
|
if len(message.parameter) == 0:
|
|
await message.edit(lang("arg_error"))
|
|
return
|
|
reply = message.reply_to_message
|
|
plugin_directory = f"{working_dir}{sep}plugins{sep}"
|
|
if message.parameter[0] == "install":
|
|
if len(message.parameter) == 1:
|
|
file_path = await plugin_manager.download_from_message(message)
|
|
if file_path is None:
|
|
await message.edit(lang("apt_no_py"))
|
|
return
|
|
plugin_name = path.basename(file_path)[:-3]
|
|
plugin_manager.remove_plugin(plugin_name)
|
|
move_plugin(file_path)
|
|
await message.edit(
|
|
f"<b>{lang('apt_name')}</b>\n\n"
|
|
f"{lang('apt_plugin')} "
|
|
f"{plugin_name} {lang('apt_installed')}"
|
|
)
|
|
await log(f"{lang('apt_install_success')} {plugin_name}.")
|
|
await reload_all()
|
|
elif len(message.parameter) >= 2:
|
|
await plugin_manager.load_remote_plugins()
|
|
process_list = message.parameter
|
|
message = await message.edit(lang("apt_processing"))
|
|
del process_list[0]
|
|
success_list = []
|
|
failed_list = []
|
|
no_need_list = []
|
|
for i in process_list:
|
|
if plugin_manager.get_remote_plugin(i):
|
|
local_temp = plugin_manager.get_local_plugin(i)
|
|
if local_temp and not plugin_manager.plugin_need_update(i):
|
|
no_need_list.append(i)
|
|
else:
|
|
try:
|
|
if await plugin_manager.install_remote_plugin(i):
|
|
success_list.append(i)
|
|
else:
|
|
failed_list.append(i)
|
|
except Exception:
|
|
failed_list.append(i)
|
|
else:
|
|
failed_list.append(i)
|
|
text = f"<b>{lang('apt_name')}</b>\n\n"
|
|
if len(success_list) > 0:
|
|
text += lang("apt_install_success") + " : %s\n" % ", ".join(
|
|
success_list
|
|
)
|
|
if len(failed_list) > 0:
|
|
text += lang("apt_not_found") + " %s\n" % ", ".join(failed_list)
|
|
if len(no_need_list) > 0:
|
|
text += lang("apt_no_update") + " %s\n" % ", ".join(no_need_list)
|
|
await log(text)
|
|
restart = len(success_list) > 0
|
|
await message.edit(text)
|
|
if restart:
|
|
await reload_all()
|
|
else:
|
|
await message.edit(lang("arg_error"))
|
|
elif message.parameter[0] == "remove":
|
|
if len(message.parameter) == 2:
|
|
if plugin_manager.remove_plugin(message.parameter[1]):
|
|
await message.edit(
|
|
f"{lang('apt_remove_success')} {message.parameter[1]}"
|
|
)
|
|
await log(f"{lang('apt_remove')} {message.parameter[1]}.")
|
|
await reload_all()
|
|
elif "/" in message.parameter[1]:
|
|
await message.edit(lang("arg_error"))
|
|
else:
|
|
await message.edit(lang("apt_not_exist"))
|
|
else:
|
|
await message.edit(lang("arg_error"))
|
|
elif message.parameter[0] == "status":
|
|
if len(message.parameter) == 1:
|
|
(
|
|
active_plugins,
|
|
disabled_plugins,
|
|
inactive_plugins,
|
|
) = plugin_manager.get_plugins_status()
|
|
active_plugins_string = ", ".join(active_plugins)
|
|
inactive_plugins_string = ", ".join([i.name for i in inactive_plugins])
|
|
disabled_plugins_string = ", ".join([i.name for i in disabled_plugins])
|
|
if len(active_plugins) == 0:
|
|
active_plugins_string = f"`{lang('apt_no_running_plugins')}`"
|
|
if len(inactive_plugins) == 0:
|
|
inactive_plugins_string = f"`{lang('apt_no_load_failed_plugins')}`"
|
|
if len(disabled_plugins) == 0:
|
|
disabled_plugins_string = f"`{lang('apt_no_disabled_plugins')}`"
|
|
output = (
|
|
f"**{lang('apt_plugin_list')}**\n"
|
|
f"{lang('apt_plugin_running')}: {active_plugins_string}\n"
|
|
f"{lang('apt_plugin_disabled')}: {disabled_plugins_string}\n"
|
|
f"{lang('apt_plugin_failed')}: {inactive_plugins_string}"
|
|
)
|
|
await message.edit(output)
|
|
else:
|
|
await message.edit(lang("arg_error"))
|
|
elif message.parameter[0] == "enable":
|
|
if len(message.parameter) == 2:
|
|
if plugin_manager.enable_plugin(message.parameter[1]):
|
|
await message.edit(
|
|
f"{lang('apt_plugin')} {message.parameter[1]} "
|
|
f"{lang('apt_enable')}"
|
|
)
|
|
await log(f"{lang('apt_enable')} {message.parameter[1]}.")
|
|
await reload_all()
|
|
else:
|
|
await message.edit(lang("apt_not_exist"))
|
|
else:
|
|
await message.edit(lang("arg_error"))
|
|
elif message.parameter[0] == "disable":
|
|
if len(message.parameter) == 2:
|
|
if plugin_manager.disable_plugin(message.parameter[1]):
|
|
await message.edit(
|
|
f"{lang('apt_plugin')} {message.parameter[1]} "
|
|
f"{lang('apt_disable')}"
|
|
)
|
|
await log(f"{lang('apt_disable')} {message.parameter[1]}.")
|
|
await reload_all()
|
|
else:
|
|
await message.edit(lang("apt_not_exist"))
|
|
else:
|
|
await message.edit(lang("arg_error"))
|
|
elif message.parameter[0] == "upload":
|
|
if len(message.parameter) == 2:
|
|
file_name = f"{message.parameter[1]}.py"
|
|
reply_id = None
|
|
if reply:
|
|
reply_id = reply.id
|
|
if exists(f"{plugin_directory}{file_name}"):
|
|
copyfile(f"{plugin_directory}{file_name}", file_name)
|
|
elif exists(f"{plugin_directory}{file_name}.disabled"):
|
|
copyfile(f"{plugin_directory}{file_name}.disabled", file_name)
|
|
if exists(file_name):
|
|
try:
|
|
await message.edit(lang("apt_uploading"))
|
|
await upload_attachment(
|
|
file_name,
|
|
message.chat.id,
|
|
reply_id,
|
|
thumb=f"pagermaid{sep}assets{sep}logo.jpg",
|
|
caption=f"<b>{lang('apt_name')}</b>\n\n"
|
|
f"PagerMaid-Pyro {message.parameter[1]} plugin.",
|
|
)
|
|
await message.safe_delete()
|
|
finally:
|
|
remove(file_name)
|
|
else:
|
|
await message.edit(lang("apt_not_exist"))
|
|
else:
|
|
await message.edit(lang("arg_error"))
|
|
elif message.parameter[0] == "update":
|
|
if not exists(f"{plugin_directory}version.json"):
|
|
await message.edit(lang("apt_why_not_install_a_plugin"))
|
|
return
|
|
await plugin_manager.load_remote_plugins()
|
|
updated_plugins = [
|
|
i.name for i in await plugin_manager.update_all_remote_plugin() if i
|
|
]
|
|
if len(updated_plugins) == 0:
|
|
await message.edit(
|
|
f"<b>{lang('apt_name')}</b>\n\n"
|
|
+ lang("apt_loading_from_online_but_nothing_need_to_update")
|
|
)
|
|
else:
|
|
message = await message.edit(lang("apt_loading_from_online_and_updating"))
|
|
await message.edit(
|
|
f"<b>{lang('apt_name')}</b>\n\n"
|
|
+ lang("apt_reading_list")
|
|
+ "\n"
|
|
+ "、".join(updated_plugins)
|
|
)
|
|
await reload_all()
|
|
elif message.parameter[0] == "search":
|
|
if len(message.parameter) == 1:
|
|
await message.edit(lang("apt_search_no_name"))
|
|
elif len(message.parameter) == 2:
|
|
await plugin_manager.load_remote_plugins()
|
|
search_result = []
|
|
plugin_name = message.parameter[1]
|
|
for i in plugin_manager.remote_plugins:
|
|
if search(plugin_name, i.name, I):
|
|
search_result.append(f"`{i.name}` / `{i.version}`\n {i.des_short}")
|
|
if len(search_result) == 0:
|
|
await message.edit(lang("apt_search_not_found"))
|
|
else:
|
|
await message.edit(
|
|
f"{lang('apt_search_result_hint')}:\n\n"
|
|
+ "\n\n".join(search_result)
|
|
)
|
|
else:
|
|
await message.edit(lang("arg_error"))
|
|
elif message.parameter[0] == "show":
|
|
if len(message.parameter) == 1:
|
|
await message.edit(lang("apt_search_no_name"))
|
|
elif len(message.parameter) == 2:
|
|
await plugin_manager.load_remote_plugins()
|
|
search_result = ""
|
|
plugin_name = message.parameter[1]
|
|
for i in plugin_manager.remote_plugins:
|
|
if plugin_name == i.name:
|
|
if i.supported:
|
|
search_support = lang("apt_search_supporting")
|
|
else:
|
|
search_support = lang("apt_search_not_supporting")
|
|
search_result = (
|
|
f"{lang('apt_plugin_name')}:`{i.name}`\n"
|
|
f"{lang('apt_plugin_ver')}:`Ver {i.version}`\n"
|
|
f"{lang('apt_plugin_section')}:`{i.section}`\n"
|
|
f"{lang('apt_plugin_maintainer')}:`{i.maintainer}`\n"
|
|
f"{lang('apt_plugin_size')}:`{i.size}`\n"
|
|
f"{lang('apt_plugin_support')}:{search_support}\n"
|
|
f"{lang('apt_plugin_des_short')}:{i.des_short}"
|
|
)
|
|
break
|
|
if search_result == "":
|
|
await message.edit(lang("apt_search_not_found"))
|
|
else:
|
|
await message.edit(search_result)
|
|
elif message.parameter[0] == "export":
|
|
if not exists(f"{plugin_directory}version.json"):
|
|
await message.edit(lang("apt_why_not_install_a_plugin"))
|
|
return
|
|
message = await message.edit(lang("stats_loading"))
|
|
list_plugin = []
|
|
for key, value in plugin_manager.version_map.items():
|
|
if not value:
|
|
continue
|
|
list_plugin.append(key)
|
|
if len(list_plugin) == 0:
|
|
await message.edit(lang("apt_why_not_install_a_plugin"))
|
|
else:
|
|
await message.edit(",apt install " + " ".join(list_plugin))
|
|
else:
|
|
await message.edit(lang("arg_error"))
|
|
|
|
|
|
@listener(
|
|
is_plugin=False,
|
|
outgoing=True,
|
|
command="apt_source",
|
|
need_admin=True,
|
|
description=lang("apt_source_des"),
|
|
parameters=lang("apt_source_parameters"),
|
|
)
|
|
async def apt_source(message: Message):
|
|
if len(message.parameter) == 0:
|
|
remotes = plugin_remote_manager.get_remotes()
|
|
if len(remotes) == 0:
|
|
await message.edit(lang("apt_source_not_found"))
|
|
return
|
|
await message.edit(
|
|
f"{lang('apt_source_header')}\n\n" + "\n".join([i.text for i in remotes]),
|
|
disable_web_page_preview=True,
|
|
)
|
|
elif len(message.parameter) == 2:
|
|
url = message.parameter[1]
|
|
if not url.endswith("/"):
|
|
url += "/"
|
|
if message.parameter[0] == "add":
|
|
try:
|
|
status = await plugin_manager.fetch_remote_url(url)
|
|
except Exception:
|
|
status = False
|
|
if status:
|
|
if plugin_remote_manager.add_remote(url):
|
|
await message.edit(lang("apt_source_add_success"))
|
|
await plugin_manager.load_remote_plugins(enable_cache=False)
|
|
else:
|
|
await message.edit(lang("apt_source_add_failed"))
|
|
else:
|
|
await message.edit(lang("apt_source_add_invalid"))
|
|
elif message.parameter[0] == "del":
|
|
if plugin_remote_manager.remove_remote(url):
|
|
await message.edit(lang("apt_source_del_success"))
|
|
await plugin_manager.load_remote_plugins(enable_cache=False)
|
|
else:
|
|
await message.edit(lang("apt_source_del_failed"))
|
|
else:
|
|
await message.edit(lang("arg_error"))
|
|
else:
|
|
await message.edit(lang("arg_error"))
|