merge pull request #168 from levina-lab/calls

improvements
This commit is contained in:
levina 2022-02-21 22:13:46 +07:00 committed by GitHub
commit b1f8749e76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 63 additions and 12 deletions

View File

@ -187,6 +187,7 @@ async def admin_set(_, query: CallbackQuery):
» /userbotjoin - invite the userbot to join group » /userbotjoin - invite the userbot to join group
» /userbotleave - order userbot to leave from group » /userbotleave - order userbot to leave from group
» /startvc - start/restart the group call » /startvc - start/restart the group call
» /stopvc - stop/discard the group call
__Powered by {BOT_NAME} AI__""", __Powered by {BOT_NAME} AI__""",
reply_markup=InlineKeyboardMarkup( reply_markup=InlineKeyboardMarkup(

View File

@ -2,6 +2,8 @@ import asyncio
from config import BOT_USERNAME, SUDO_USERS from config import BOT_USERNAME, SUDO_USERS
from program.utils.function import get_calls
from driver.core import user, me_bot from driver.core import user, me_bot
from driver.filters import command, other_filters from driver.filters import command, other_filters
from driver.database.dbchat import remove_served_chat from driver.database.dbchat import remove_served_chat
@ -11,11 +13,10 @@ from driver.decorators import authorized_users_only, bot_creator, check_blacklis
from pyrogram.types import Message from pyrogram.types import Message
from pyrogram import Client, filters from pyrogram import Client, filters
from pyrogram.raw.types import InputPeerChannel from pyrogram.raw.types import InputPeerChannel
from pyrogram.raw.functions.phone import CreateGroupCall from pyrogram.raw.functions.phone import CreateGroupCall, DiscardGroupCall
from pyrogram.errors import UserAlreadyParticipant, UserNotParticipant, ChatAdminRequired from pyrogram.errors import UserAlreadyParticipant, UserNotParticipant, ChatAdminRequired
@Client.on_message( @Client.on_message(
command(["userbotjoin", f"userbotjoin@{BOT_USERNAME}"]) & other_filters command(["userbotjoin", f"userbotjoin@{BOT_USERNAME}"]) & other_filters
) )
@ -114,6 +115,27 @@ async def start_group_call(c: Client, m: Message):
) )
@Client.on_message(command(["stopvc", f"stopvc@{BOT_USERNAME}"]) & other_filters)
@check_blacklist()
@authorized_users_only
async def stop_group_call(c: Client, m: Message):
chat_id = m.chat.id
msg = await c.send_message(chat_id, "`stopping...`")
if not (
group_call := (
await get_calls(m, err_msg="group call not active")
)
):
await msg.edit_text("❌ The group call already ended")
return
await user.send(
DiscardGroupCall(
call=group_call
)
)
await msg.edit_text("✅ Group call has ended !")
@Client.on_message(filters.left_chat_member) @Client.on_message(filters.left_chat_member)
async def bot_kicked(c: Client, m: Message): async def bot_kicked(c: Client, m: Message):
bot_id = me_bot.id bot_id = me_bot.id

View File

@ -1,10 +0,0 @@
def bytes(size: float) -> str:
if not size:
return ""
power = 1024
t_n = 0
power_dict = {0: " ", 1: "Ki", 2: "Mi", 3: "Gi", 4: "Ti"}
while size > power:
size /= power
t_n += 1
return "{:.2f} {}B".format(size, power_dict[t_n])

38
program/utils/function.py Normal file
View File

@ -0,0 +1,38 @@
from typing import Optional
from driver.core import user
from pyrogram.raw.functions.channels import GetFullChannel
from pyrogram.raw.functions.messages import GetFullChat
from pyrogram.types import Message
from pyrogram.raw.types import (
InputGroupCall,
InputPeerChannel,
InputPeerChat,
)
async def get_calls(m: Message, err_msg: str = "") -> Optional[InputGroupCall]:
chat_peer = await user.resolve_peer(m.chat.id)
if isinstance(chat_peer, (InputPeerChannel, InputPeerChat)):
if isinstance(chat_peer, InputPeerChannel):
full_chat = (await user.send(GetFullChannel(channel=chat_peer))).full_chat
elif isinstance(chat_peer, InputPeerChat):
full_chat = (
await user.send(GetFullChat(chat_id=chat_peer.chat_id))
).full_chat
if full_chat is not None:
return full_chat.call
await c.send_message(m.chat.id, f"❌ no group calls found\n\n» `{err_msg}`")
return False
def bytes(size: float) -> str:
if not size:
return ""
power = 1024
t_n = 0
power_dict = {0: " ", 1: "Ki", 2: "Mi", 3: "Gi", 4: "Ti"}
while size > power:
size /= power
t_n += 1
return "{:.2f} {}B".format(size, power_dict[t_n])