"""
Video + Music Stream Telegram Bot
Copyright (c) 2022-present levina=lab

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but without any warranty; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""


import asyncio
import os
import shlex
from typing import Tuple

from git.exc import InvalidGitRepositoryError, GitCommandError
from git import Repo
from pyrogram.types import Message
from pyrogram import Client, filters

from program import LOGS
from config import UPSTREAM_BRANCH, UPSTREAM_REPO, BOT_USERNAME
from driver.filters import command
from driver.decorators import bot_creator
from driver.database.dbqueue import get_active_chats, remove_active_chat


async def install_requirements(cmd: str) -> Tuple[str, str, int, int]:
    """Run *cmd* as a subprocess (without a shell) and capture its output.

    Args:
        cmd: Command line to execute; it is tokenised with ``shlex.split``.

    Returns:
        A ``(stdout, stderr, returncode, pid)`` tuple. Both streams are
        decoded as UTF-8 (undecodable bytes replaced) and stripped.
    """
    args = shlex.split(cmd)
    process = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    return (
        stdout.decode("utf-8", "replace").strip(),
        stderr.decode("utf-8", "replace").strip(),
        process.returncode,
        process.pid,
    )


def _get_origin(repo: Repo):
    """Return the ``origin`` remote of *repo*, creating it if it is missing."""
    if "origin" in repo.remotes:
        return repo.remote("origin")
    return repo.create_remote("origin", UPSTREAM_REPO)


def _bootstrap_repo() -> Repo:
    """Initialise a git repository in the working directory.

    Used when the current directory is not a git repository yet: the
    upstream remote is fetched and ``UPSTREAM_BRANCH`` is created, set to
    track the remote branch and force-checked-out.
    """
    repo = Repo.init()
    origin = _get_origin(repo)
    origin.fetch()
    repo.create_head(UPSTREAM_BRANCH, origin.refs[UPSTREAM_BRANCH])
    repo.heads[UPSTREAM_BRANCH].set_tracking_branch(origin.refs[UPSTREAM_BRANCH])
    repo.heads[UPSTREAM_BRANCH].checkout(True)
    return repo


def _pull_upstream(repo: Repo) -> None:
    """Bring *repo* up to date with ``UPSTREAM_BRANCH`` of ``origin``.

    If the pull fails (for example because of local changes), the working
    tree is hard-reset to the commit that was just fetched.
    """
    origin = _get_origin(repo)
    origin.fetch(UPSTREAM_BRANCH)
    try:
        origin.pull(UPSTREAM_BRANCH)
    except GitCommandError:
        repo.git.reset("--hard", "FETCH_HEAD")


async def _clear_active_chats() -> None:
    """Best-effort removal of every active chat recorded in the database.

    Failures are logged and never propagated, so they cannot block a restart.
    """
    served_chats = []
    try:
        chats = await get_active_chats()
        for chat in chats:
            served_chats.append(int(chat["chat_id"]))
    except Exception as err:
        LOGS.info(f"[ERROR]: could not list active chats: {err}")
    for chat_id in served_chats:
        try:
            await remove_active_chat(chat_id)
        except Exception as err:
            LOGS.info(f"[ERROR]: could not remove active chat {chat_id}: {err}")


def _restart_process() -> None:
    """Kill the current process and launch ``python3 main.py`` again.

    The spawned shell sends SIGKILL to this process and, if that succeeds,
    starts the bot again from ``main.py``.
    """
    os.system(f"kill -9 {os.getpid()} && python3 main.py")


@Client.on_message(command(["update", f"update@{BOT_USERNAME}"]) & ~filters.edited)
@bot_creator
async def update_bot(_, message: Message):
    """Handle ``/update``: pull the upstream code, reinstall deps and restart.

    Args:
        _: The client that received the command; it is used to send the
            status message. (Assumes ``bot_creator`` forwards the client
            as the first argument - confirm against driver.decorators.)
        message: The incoming command message.
    """
    client = _
    chat_id = message.chat.id
    msg = await client.send_message(chat_id, "❖ Checking updates...")

    try:
        repo = Repo()
    except GitCommandError:
        await msg.edit("Git command error !")
        return
    except InvalidGitRepositoryError:
        # Not a git checkout yet: it is initialised from upstream below.
        await msg.edit("❖ Checking Git updates...")
        repo = None

    try:
        if repo is None:
            repo = _bootstrap_repo()
        _pull_upstream(repo)
    except GitCommandError as err:
        LOGS.info(f"[ERROR]: {err}")
        await msg.edit("Git command error !")
        return

    _out, pip_err, pip_code, _pid = await install_requirements(
        "pip3 install --no-cache-dir -r requirements.txt"
    )
    if pip_code != 0:
        # Keep going: the restart is still attempted, but leave a trace.
        LOGS.info(f"[ERROR]: pip exited with code {pip_code}: {pip_err}")

    await msg.edit(
        "Done, bot has updated !\n\n> now the bot will be rebooted to apply "
        "the update and will active again in 5-10 second(s)."
    )

    await _clear_active_chats()
    _restart_process()


@Client.on_message(command(["restart", f"restart@{BOT_USERNAME}"]) & ~filters.edited)
@bot_creator
async def restart_bot(_, message: Message):
    """Handle ``/restart``: announce the restart, then relaunch the process.

    Args:
        _: The client that received the command (unused).
        message: The incoming command message.
    """
    try:
        msg = await message.reply_text("❖ Restarting bot...")
        LOGS.info("[INFO]: >> BOT SERVER RESTARTED <<")
    except Exception as err:
        # Without a status message there is nothing to edit; abort quietly.
        LOGS.info(f"[ERROR]: {err}")
        return
    await msg.edit_text("✅ Bot has restarted !\n\n» back active again in 5-10 seconds.")
    _restart_process()