sticker-captcha-bot/pyromod/listen/listen.py

165 lines
5.1 KiB
Python
Raw Normal View History

2022-07-02 11:44:57 +00:00
"""
pyromod - A monkeypatcher add-on for Pyrogram
Copyright (C) 2020 Cezar H. <https://github.com/usernein>
This file is part of pyromod.
pyromod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyromod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with pyromod. If not, see <https://www.gnu.org/licenses/>.
"""
import asyncio
import functools
import pyrogram
from typing import Dict
from sticker.scheduler import add_delete_message_job
from ..utils import patch, patchable
from ..utils.errors import ListenerCanceled, TimeoutConversationError
pyrogram.errors.ListenerCanceled = ListenerCanceled
@patch(pyrogram.client.Client)
class Client:
@patchable
def __init__(self, *args, **kwargs):
self.listening = {}
self.using_mod = True
self.old__init__(*args, **kwargs)
@patchable
async def listen(self, chat_id, filters=None, timeout=None):
if type(chat_id) != int:
chat = await self.get_chat(chat_id)
chat_id = chat.id
future = self.loop.create_future()
future.add_done_callback(
functools.partial(self.clear_listener, chat_id)
)
self.listening.update({
chat_id: {"future": future, "filters": filters}
})
try:
return await asyncio.wait_for(future, timeout)
except asyncio.exceptions.TimeoutError as e:
raise TimeoutConversationError() from e
@patchable
async def ask(self, chat_id, text, filters=None, timeout=None, *args, **kwargs):
request = await self.send_message(chat_id, text, *args, **kwargs)
response = await self.listen(chat_id, filters, timeout)
response.request = request
return response
@patchable
def clear_listener(self, chat_id, future):
if future == self.listening[chat_id]["future"]:
self.listening.pop(chat_id, None)
@patchable
def cancel_listener(self, chat_id):
listener = self.listening.get(chat_id)
if not listener or listener['future'].done():
return
listener['future'].set_exception(ListenerCanceled())
self.clear_listener(chat_id, listener['future'])
@patchable
def cancel_all_listener(self):
for chat_id in self.listening:
self.cancel_listener(chat_id)
@patch(pyrogram.handlers.message_handler.MessageHandler)
class MessageHandler:
@patchable
def __init__(self, callback: callable, filters=None):
self.user_callback = callback
self.old__init__(self.resolve_listener, filters)
@patchable
async def resolve_listener(self, client, message, *args):
listener = client.listening.get(message.chat.id)
if listener and not listener['future'].done():
listener['future'].set_result(message)
else:
if listener and listener['future'].done():
client.clear_listener(message.chat.id, listener['future'])
await self.user_callback(client, message, *args)
@patchable
async def check(self, client, update):
listener = client.listening.get(update.chat.id)
if listener and not listener['future'].done():
return await listener['filters'](client, update) if callable(listener['filters']) else True
return (
await self.filters(client, update)
if callable(self.filters)
else True
)
@patch(pyrogram.types.user_and_chats.chat.Chat)
class Chat(pyrogram.types.Chat):
@patchable
def listen(self, *args, **kwargs):
return self._client.listen(self.id, *args, **kwargs)
@patchable
def ask(self, *args, **kwargs):
return self._client.ask(self.id, *args, **kwargs)
@patchable
def cancel_listener(self):
return self._client.cancel_listener(self.id)
@patch(pyrogram.types.user_and_chats.user.User)
class User(pyrogram.types.User):
@patchable
def listen(self, *args, **kwargs):
return self._client.listen(self.id, *args, **kwargs)
@patchable
def ask(self, *args, **kwargs):
return self._client.ask(self.id, *args, **kwargs)
@patchable
def cancel_listener(self):
return self._client.cancel_listener(self.id)
@patch(pyrogram.types.messages_and_media.Message)
class Message(pyrogram.types.Message):
@patchable
async def safe_delete(self, revoke: bool = True):
try:
return await self._client.delete_messages(
chat_id=self.chat.id,
message_ids=self.id,
revoke=revoke
)
except Exception as e: # noqa
return False
@patchable
async def delay_delete(self, delay: int = 60):
add_delete_message_job(self, delay)