video-stream/program/start.py
2022-03-02 21:01:45 +08:00

252 lines
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Video + Music Stream Telegram Bot
Copyright (c) 2022-present levina=lab <https://github.com/levina-lab>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but without any warranty; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/licenses.html>
"""
import asyncio
from datetime import datetime
from sys import version_info
from time import time
from config import (
ALIVE_IMG,
ALIVE_NAME,
BOT_USERNAME,
GROUP_SUPPORT,
OWNER_USERNAME,
UPDATES_CHANNEL,
)
from driver.decorators import check_blacklist
from program import __version__, LOGS
from driver.core import bot, me_bot, me_user
from driver.filters import command
from driver.database.dbchat import add_served_chat, is_served_chat
from driver.database.dbpunish import is_gbanned_user
from driver.database.dbusers import add_served_user, is_served_user
from driver.database.dblockchat import blacklisted_chats
from pyrogram import Client, filters, __version__ as pyrover
from pyrogram.errors import FloodWait, ChatAdminRequired
from pytgcalls import (__version__ as pytover)
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message, ChatJoinRequest
# Version triple of this module.
__major__ = 0
__minor__ = 2
__micro__ = 1

# Dotted "major.minor.micro" version of the running interpreter.
__python_version__ = ".".join(str(part) for part in version_info[:3])

# Naive UTC timestamp taken at import time; uptime is measured from it.
START_TIME = datetime.utcnow()
# The same timestamp, truncated to whole seconds, in ISO 8601 form.
START_TIME_ISO = START_TIME.replace(microsecond=0).isoformat()

# (unit label, seconds per unit) pairs, ordered from largest to smallest.
TIME_DURATION_UNITS = (
    ("week", 7 * 24 * 60 * 60),
    ("day", 24 * 60 * 60),
    ("hour", 60 * 60),
    ("min", 60),
    ("sec", 1),
)
async def _human_time_duration(seconds):
    """Render a duration given in seconds as human-readable text.

    The duration is broken down using ``TIME_DURATION_UNITS`` (largest unit
    first); units whose amount is zero are omitted and the remaining parts
    are joined with ``", "``, e.g. ``"1 hour, 5 mins"``.

    Args:
        seconds: Duration in seconds (int or float). Fractions are
            truncated and negative values are treated as zero.

    Returns:
        The formatted duration, or ``"0 secs"`` when it is shorter than one
        second.
    """
    # Truncate once up front; clamp so a negative value cannot produce
    # nonsensical output through divmod's floor semantics.
    remaining = max(int(seconds), 0)
    parts = []
    for unit, div in TIME_DURATION_UNITS:
        amount, remaining = divmod(remaining, div)
        if amount > 0:
            parts.append("{} {}{}".format(amount, unit, "" if amount == 1 else "s"))
    # A sub-second duration has no non-zero unit; report it as zero seconds
    # rather than as an empty string.
    return ", ".join(parts) or "0 secs"
@Client.on_message(
    command(["start", f"start@{BOT_USERNAME}"]) & filters.private & ~filters.edited
)
@check_blacklist()
async def start_(c: Client, message: Message):
    """Handle ``/start`` in a private chat: register the sender and greet them.

    The sender is stored with ``add_served_user`` when ``is_served_user``
    reports them as not yet known. The welcome text and its inline keyboard
    are sent in every case.

    Args:
        c: The client that received the update (unused here).
        message: The incoming ``/start`` message.
    """
    user_id = message.from_user.id
    # Registering a first-time user must not skip the greeting, otherwise
    # new users get no response to their very first /start.
    if not await is_served_user(user_id):
        await add_served_user(user_id)
    await message.reply_text(
        f"""✨ **欢迎 {message.from_user.mention()} !**\n
💭 [{me_bot.first_name}](https://t.me/{BOT_USERNAME}) **可以让你通过 Telegram 的视频聊天功能在群组中播放音乐和视频!**
💡 **要了解机器人的所有命令,请点击 » 📚 命令**
🔖 **要知道如何使用这个机器人,请点击 » ❓ 基础教程**
""",
        reply_markup=InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        " 邀请我到你的群组 ",
                        url=f"https://t.me/{BOT_USERNAME}?startgroup=true",
                    )
                ],
                [InlineKeyboardButton("❓ 基础教程", callback_data="user_guide")],
                [
                    InlineKeyboardButton("📚 命令", callback_data="command_list"),
                    InlineKeyboardButton("❤️ 捐赠", url=f"https://t.me/{OWNER_USERNAME}"),
                ],
                [
                    InlineKeyboardButton(
                        "👥 官方群组", url=f"https://t.me/{GROUP_SUPPORT}"
                    ),
                    InlineKeyboardButton(
                        "📣 官方频道", url=f"https://t.me/{UPDATES_CHANNEL}"
                    ),
                ],
                [
                    InlineKeyboardButton(
                        "🌐 源代码", url="https://gitlab.com/Xtao-Labs/video-stream"
                    )
                ],
            ]
        ),
        disable_web_page_preview=True,
    )
@Client.on_message(
    command(["alive", f"alive@{BOT_USERNAME}"]) & filters.group & ~filters.edited
)
@check_blacklist()
async def alive(c: Client, message: Message):
    """Answer ``/alive`` in a group with a status photo.

    The photo caption greets the sender and lists the maintainer, the bot,
    Pyrogram, Python and PyTgCalls versions and the uptime measured from
    ``START_TIME``; two link buttons are attached below it.

    Args:
        c: The client used to send the photo.
        message: The incoming ``/alive`` message.
    """
    target_chat = message.chat.id
    elapsed = datetime.utcnow() - START_TIME
    uptime = await _human_time_duration(int(elapsed.total_seconds()))

    link_row = [
        InlineKeyboardButton("✨ 群组", url=f"https://t.me/{GROUP_SUPPORT}"),
        InlineKeyboardButton("📣 频道", url=f"https://t.me/{UPDATES_CHANNEL}"),
    ]
    buttons = InlineKeyboardMarkup([link_row])

    # Assemble the caption piece by piece; each piece carries its own
    # trailing newline(s).
    caption = "".join(
        (
            f"**你好 {message.from_user.mention()},我是 {me_bot.first_name}**\n\n",
            f"🧑🏼‍💻 维护者:[{ALIVE_NAME}](https://t.me/{OWNER_USERNAME})\n",
            f"👾 Bot 版本:`v{__version__}`\n",
            f"🔥 Pyrogram 版本:`{pyrover}`\n",
            f"🐍 Python 版本:`{__python_version__}`\n",
            f"✨ PyTgCalls 版本:`{pytover.__version__}`\n",
            f"🆙 在线时间:`{uptime}`\n",
            "\n",
            "❤ **感谢使用本机器人**",
        )
    )

    await c.send_photo(
        target_chat,
        photo=f"{ALIVE_IMG}",
        caption=caption,
        reply_markup=buttons,
    )
@Client.on_message(command(["ping", f"ping@{BOT_USERNAME}"]) & ~filters.edited)
@check_blacklist()
async def ping_pong(c: Client, message: Message):
    """Answer ``/ping`` with the time it took to send a reply message.

    A placeholder reply is sent first; the wall-clock time spent sending it
    is then written into that same message, in milliseconds.

    Args:
        c: The client that received the update (unused here).
        message: The incoming ``/ping`` message.
    """
    sent_at = time()
    probe = await message.reply_text("pinging...")
    elapsed = time() - sent_at
    await probe.edit_text(f"🏓 `PONG!!`\n⚡️ `{elapsed * 1000:.3f} ms`")
@Client.on_message(command(["uptime", f"uptime@{BOT_USERNAME}"]) & ~filters.edited)
@check_blacklist()
async def get_uptime(c: Client, message: Message):
    """Answer ``/uptime`` with the elapsed running time and the start time.

    Args:
        c: The client that received the update (unused here).
        message: The incoming ``/uptime`` message.
    """
    elapsed = datetime.utcnow() - START_TIME
    uptime = await _human_time_duration(int(elapsed.total_seconds()))
    report = "".join(
        (
            "🤖 在线状态:\n",
            f"• **在线时间:**`{uptime}`\n",
            f"• **开启时间:**`{START_TIME_ISO}`",
        )
    )
    await message.reply_text(report)
@Client.on_chat_join_request()
async def approve_join_chat(c: Client, m: ChatJoinRequest):
    """Approve an incoming chat join request.

    Requests without a ``from_user`` are ignored. If the approval hits a
    ``FloodWait``, the handler sleeps for the requested time plus two
    seconds and tries exactly once more.

    Args:
        c: The client used to approve the request.
        m: The join request update.
    """
    requester = m.from_user
    if requester:
        try:
            await c.approve_chat_join_request(m.chat.id, requester.id)
        except FloodWait as flood:
            # Wait out the flood limit (with a small margin), then retry.
            await asyncio.sleep(flood.x + 2)
            await c.approve_chat_join_request(m.chat.id, requester.id)
@Client.on_message(filters.new_chat_members)
async def new_chat(c: Client, m: Message):
    """React to members joining a chat, in particular to the bot being added.

    The chat is stored with ``add_served_chat`` when ``is_served_chat``
    reports it as not yet known. If the bot itself is among the new
    members, it either announces that the chat is blacklisted and leaves,
    or posts a thank-you message with setup hints and link buttons.

    Args:
        c: The client that received the update (unused here).
        m: The service message carrying ``new_chat_members``.
    """
    chat_id = m.chat.id
    if not await is_served_chat(chat_id):
        await add_served_chat(chat_id)

    # Inspect every new member: when several accounts are added at once
    # the bot is not necessarily the first entry in the list.
    if not any(member.id == me_bot.id for member in m.new_chat_members):
        return

    try:
        if chat_id in await blacklisted_chats():
            await m.reply_text(
                "❗️这个群组已经被维护者拉黑!"
            )
            return await bot.leave_chat(chat_id)
        return await m.reply(
            "❤️ 感谢添加我到你的群组!\n\n"
            "请先添加我为管理员,否则我可能无法正常工作。并且不要忘记发送 `/userbotjoin` 来邀请 userbot 进群。\n\n"
            "使用命令 `/reload` 来刷新管理员列表。",
            reply_markup=InlineKeyboardMarkup(
                [
                    [
                        InlineKeyboardButton("📣 官方频道", url=f"https://t.me/{UPDATES_CHANNEL}"),
                        InlineKeyboardButton("💭 官方群组", url=f"https://t.me/{GROUP_SUPPORT}")
                    ], [
                        InlineKeyboardButton("👤 userbot", url=f"https://t.me/{me_user.username}")
                    ]
                ]
            )
        )
    except Exception as err:
        # Greeting is best-effort, so failures are logged instead of raised.
        # Only Exception is caught so task cancellation and interpreter
        # shutdown still propagate.
        LOGS.info(f"failed to handle bot join in chat {chat_id}: {err}")
        return
# Handler group for the watcher below, registered separately from the
# default group used by the command handlers.
chat_watcher_group = 5


@Client.on_message(group=chat_watcher_group)
async def chat_watcher_func(_, message: Message):
    """Ban globally banned users from any chat they post a message in.

    Every incoming message is inspected. When ``is_gbanned_user`` reports
    the sender as globally banned, the sender is banned from the chat and a
    notice is posted. If the bot lacks admin rights, the failure is logged
    and no notice is sent.

    Args:
        _: The client that received the update (unused).
        message: The incoming message.
    """
    sender = message.from_user
    # Messages without a user sender (e.g. channel posts) have nothing to
    # check; bail out instead of failing on the attribute access.
    if not sender:
        return

    userid = sender.id
    if not await is_gbanned_user(userid):
        return

    try:
        await message.chat.ban_member(userid)
    except ChatAdminRequired:
        LOGS.info(f"can't remove gbanned user from chat: {message.chat.id}")
        return

    # Build the mention only once it is actually needed for the notice.
    suspect = f"[{sender.first_name}](tg://user?id={userid})"
    await message.reply_text(
        f"👮🏼 (> {suspect} <)\n\n"
        f"**已封禁全局封禁用户**\n\n"
        f"🚫 **原因:** spam"
    )