import os import re import sys import asyncio import subprocess from asyncio import sleep from git import Repo from pyrogram.types import Message from driver.filters import command from pyrogram import Client, filters from os import system, execle, environ from driver.decorators import sudo_users_only from git.exc import InvalidGitRepositoryError from config import UPSTREAM_REPO, BOT_USERNAME def gen_chlog(repo, diff): upstream_repo_url = Repo().remotes[0].config_reader.get("url").replace(".git", "") ac_br = repo.active_branch.name ch_log = tldr_log = "" ch = f"updates for [{ac_br}]:" ch_tl = f"updates for {ac_br}:" d_form = "%d/%m/%y || %H:%M" for c in repo.iter_commits(diff): ch_log += ( f"\n\nšŸ’¬ {c.count()} šŸ—“ [{c.committed_datetime.strftime(d_form)}]\n" f"[{c.summary}] šŸ‘Øā€šŸ’» {c.author}" ) tldr_log += f"\n\nšŸ’¬ {c.count()} šŸ—“ [{c.committed_datetime.strftime(d_form)}]\n[{c.summary}] šŸ‘Øā€šŸ’» {c.author}" if ch_log: return str(ch + ch_log), str(ch_tl + tldr_log) return ch_log, tldr_log def updater(): try: repo = Repo() except InvalidGitRepositoryError: repo = Repo.init() origin = repo.create_remote("upstream", UPSTREAM_REPO) origin.fetch() repo.create_head("main", origin.refs.main) repo.heads.main.set_tracking_branch(origin.refs.main) repo.heads.main.checkout(True) ac_br = repo.active_branch.name if "upstream" in repo.remotes: ups_rem = repo.remote("upstream") else: ups_rem = repo.create_remote("upstream", UPSTREAM_REPO) ups_rem.fetch(ac_br) changelog, tl_chnglog = gen_chlog(repo, f"HEAD..upstream/{ac_br}") return bool(changelog) @Client.on_message(command(["update", f"update@{BOT_USERNAME}"]) & ~filters.edited) @sudo_users_only async def update_repo(_, message: Message): chat_id = message.chat.id msg = await message.reply("šŸ”„ `processing update...`") update_avail = updater() if update_avail: await msg.edit("āœ… update finished\n\nā€¢ bot restarted, back active again in 1 minutes.") system("git pull -f && pip3 install --no-cache-dir -r requirements.txt") execle(sys.executable, sys.executable, "main.py", environ) return await msg.edit("bot is **up-to-date** with [main](https://github.com/levina-lab/video-stream/tree/main)", disable_web_page_preview=True) @Client.on_message(command(["restart", f"restart@{BOT_USERNAME}"]) & ~filters.edited) @sudo_users_only async def restart_bot(_, message: Message): msg = await message.reply("`restarting bot...`") args = [sys.executable, "main.py"] await msg.edit("āœ… bot restarted\n\nā€¢ now you can use this bot again.") execle(sys.executable, *args, environ) return