This commit is contained in:
xtaodada 2022-07-02 19:44:57 +08:00
parent 61b590b087
commit 9e2d53f464
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
23 changed files with 818 additions and 0 deletions

6
.gitignore vendored
View File

@ -127,3 +127,9 @@ dmypy.json
# Pyre type checker
.pyre/
# data
data/
*.session*
config.yml
.idea/

16
config.gen.yml Normal file
View File

@ -0,0 +1,16 @@
# API Credentials of your telegram application created at https://my.telegram.org/apps
api_id: "ID_HERE"
api_hash: "HASH_HERE"
# bot token
bot_token: "TOKEN_HERE"
# Either debug logging is enabled or not
debug: "False"
# socks5
proxy_addr: ""
proxy_port: ""
# ipv6
ipv6: "False"

19
plugins/invite.py Normal file
View File

@ -0,0 +1,19 @@
import contextlib
from pyrogram.types import ChatMemberUpdated
from sticker.single_utils import Client
from sticker import bot
MSG_PUBLIC = """您好,我发现此群组为公开群组,您需要联系创建者打开 `管理员批准后才能入群` 功能,我才能正常工作。"""
MSG_SUCCESS = """验证成功,您已经成为群组的一员了!"""
MSG_FAILURE = """验证失败,请重试。"""
@bot.on_chat_member_updated()
async def invite(client: Client, chat_member_updated: ChatMemberUpdated):
chat = chat_member_updated.chat
if user := chat_member_updated.new_chat_member:
if user.user.is_self and chat.username:
with contextlib.suppress(Exception):
await client.send_message(chat.id, MSG_PUBLIC)

31
plugins/new_member.py Normal file
View File

@ -0,0 +1,31 @@
import contextlib
from pyrogram.types import ChatJoinRequest
from pyrogram import filters
from sticker.single_utils import Client
from sticker import bot
from pyromod.utils.errors import TimeoutConversationError
MSG = """您好,群组 %s 开启了验证功能。
您需要在 30 秒内发送任意一个 贴纸 来完成验证"""
MSG_SUCCESS = """验证成功,您已经成为群组的一员了!"""
MSG_FAILURE = """验证失败,请重试。"""
@bot.on_chat_join_request()
async def new_member(client: Client, chat_join_request: ChatJoinRequest):
chat = chat_join_request.chat
user = chat_join_request.from_user
try:
await client.ask(user.id, MSG % chat.title, filters=filters.sticker, timeout=30)
with contextlib.suppress(Exception):
await client.send_message(user.id, MSG_SUCCESS)
await chat_join_request.approve()
except TimeoutConversationError:
with contextlib.suppress(Exception):
await client.send_message(user.id, MSG_FAILURE)
await chat_join_request.decline()

24
plugins/start.py Normal file
View File

@ -0,0 +1,24 @@
from pyrogram import filters
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from sticker.single_utils import Message, Client
from sticker import bot
@bot.on_message(filters=filters.private & filters.command("start"))
async def start(client: Client, message: Message):
me = await client.get_me()
await message.reply(
f"""你好,我是 <b>{me.first_name}</b>
我可以主动私聊申请加入群组的新成员进行贴纸验证""",
quote=True,
reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton(
"Github",
url="https://github.com/Xtao-Labs/sticker-captcha-bot")],
[InlineKeyboardButton(
"邀请入群",
url=f"https://t.me/{me.username}?startgroup=start&admin=can_invite_users")]
]))

21
pyromod/__init__.py Normal file
View File

@ -0,0 +1,21 @@
"""
pyromod - A monkeypatched add-on for Pyrogram
Copyright (C) 2020 Cezar H. <https://github.com/usernein>
This file is part of pyromod.
pyromod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyromod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with pyromod. If not, see <https://www.gnu.org/licenses/>.
"""
__version__ = "1.5"

View File

@ -0,0 +1,21 @@
"""
pyromod - A monkeypatcher add-on for Pyrogram
Copyright (C) 2020 Cezar H. <https://github.com/usernein>
This file is part of pyromod.
pyromod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyromod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with pyromod. If not, see <https://www.gnu.org/licenses/>.
"""
from .filters import dice

View File

@ -0,0 +1,28 @@
"""
pyromod - A monkeypatcher add-on for Pyrogram
Copyright (C) 2020 Cezar H. <https://github.com/usernein>
This file is part of pyromod.
pyromod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyromod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with pyromod. If not, see <https://www.gnu.org/licenses/>.
"""
import pyrogram
def dice(ctx, message):
return hasattr(message, 'dice') and message.dice
pyrogram.filters.dice = dice

View File

@ -0,0 +1,20 @@
"""
pyromod - A monkeypatcher add-on for Pyrogram
Copyright (C) 2020 Cezar H. <https://github.com/usernein>
This file is part of pyromod.
pyromod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyromod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with pyromod. If not, see <https://www.gnu.org/licenses/>.
"""
from .helpers import ikb, bki, ntb, btn, kb, kbtn, array_chunk, force_reply

View File

@ -0,0 +1,75 @@
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, KeyboardButton, ReplyKeyboardMarkup, ForceReply
def ikb(rows=None):
if rows is None:
rows = []
lines = []
for row in rows:
line = []
for button in row:
button = btn(*button) # InlineKeyboardButton
line.append(button)
lines.append(line)
return InlineKeyboardMarkup(inline_keyboard=lines)
# return {'inline_keyboard': lines}
def btn(text, value, type='callback_data'):
return InlineKeyboardButton(text, **{type: value})
# return {'text': text, type: value}
# The inverse of above
def bki(keyboard):
lines = []
for row in keyboard.inline_keyboard:
line = []
for button in row:
button = ntb(button) # btn() format
line.append(button)
lines.append(line)
return lines
# return ikb() format
def ntb(button):
for btn_type in ['callback_data', 'url', 'switch_inline_query', 'switch_inline_query_current_chat',
'callback_game']:
value = getattr(button, btn_type)
if value:
break
button = [button.text, value]
if btn_type != 'callback_data':
button.append(btn_type)
return button
# return {'text': text, type: value}
def kb(rows=None, **kwargs):
if rows is None:
rows = []
lines = []
for row in rows:
line = []
for button in row:
button_type = type(button)
if button_type == str:
button = KeyboardButton(button)
elif button_type == dict:
button = KeyboardButton(**button)
line.append(button)
lines.append(line)
return ReplyKeyboardMarkup(keyboard=lines, **kwargs)
kbtn = KeyboardButton
def force_reply(selective=True):
return ForceReply(selective=selective)
def array_chunk(input_, size):
return [input_[i:i + size] for i in range(0, len(input_), size)]

View File

@ -0,0 +1,21 @@
"""
pyromod - A monkeypatcher add-on for Pyrogram
Copyright (C) 2020 Cezar H. <https://github.com/usernein>
This file is part of pyromod.
pyromod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyromod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with pyromod. If not, see <https://www.gnu.org/licenses/>.
"""
from .listen import Client, MessageHandler, Chat, User

164
pyromod/listen/listen.py Normal file
View File

@ -0,0 +1,164 @@
"""
pyromod - A monkeypatcher add-on for Pyrogram
Copyright (C) 2020 Cezar H. <https://github.com/usernein>
This file is part of pyromod.
pyromod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyromod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with pyromod. If not, see <https://www.gnu.org/licenses/>.
"""
import asyncio
import functools
import pyrogram
from typing import Dict
from sticker.scheduler import add_delete_message_job
from ..utils import patch, patchable
from ..utils.errors import ListenerCanceled, TimeoutConversationError
pyrogram.errors.ListenerCanceled = ListenerCanceled
@patch(pyrogram.client.Client)
class Client:
@patchable
def __init__(self, *args, **kwargs):
self.listening = {}
self.using_mod = True
self.old__init__(*args, **kwargs)
@patchable
async def listen(self, chat_id, filters=None, timeout=None):
if type(chat_id) != int:
chat = await self.get_chat(chat_id)
chat_id = chat.id
future = self.loop.create_future()
future.add_done_callback(
functools.partial(self.clear_listener, chat_id)
)
self.listening.update({
chat_id: {"future": future, "filters": filters}
})
try:
return await asyncio.wait_for(future, timeout)
except asyncio.exceptions.TimeoutError as e:
raise TimeoutConversationError() from e
@patchable
async def ask(self, chat_id, text, filters=None, timeout=None, *args, **kwargs):
request = await self.send_message(chat_id, text, *args, **kwargs)
response = await self.listen(chat_id, filters, timeout)
response.request = request
return response
@patchable
def clear_listener(self, chat_id, future):
if future == self.listening[chat_id]["future"]:
self.listening.pop(chat_id, None)
@patchable
def cancel_listener(self, chat_id):
listener = self.listening.get(chat_id)
if not listener or listener['future'].done():
return
listener['future'].set_exception(ListenerCanceled())
self.clear_listener(chat_id, listener['future'])
@patchable
def cancel_all_listener(self):
for chat_id in self.listening:
self.cancel_listener(chat_id)
@patch(pyrogram.handlers.message_handler.MessageHandler)
class MessageHandler:
@patchable
def __init__(self, callback: callable, filters=None):
self.user_callback = callback
self.old__init__(self.resolve_listener, filters)
@patchable
async def resolve_listener(self, client, message, *args):
listener = client.listening.get(message.chat.id)
if listener and not listener['future'].done():
listener['future'].set_result(message)
else:
if listener and listener['future'].done():
client.clear_listener(message.chat.id, listener['future'])
await self.user_callback(client, message, *args)
@patchable
async def check(self, client, update):
listener = client.listening.get(update.chat.id)
if listener and not listener['future'].done():
return await listener['filters'](client, update) if callable(listener['filters']) else True
return (
await self.filters(client, update)
if callable(self.filters)
else True
)
@patch(pyrogram.types.user_and_chats.chat.Chat)
class Chat(pyrogram.types.Chat):
@patchable
def listen(self, *args, **kwargs):
return self._client.listen(self.id, *args, **kwargs)
@patchable
def ask(self, *args, **kwargs):
return self._client.ask(self.id, *args, **kwargs)
@patchable
def cancel_listener(self):
return self._client.cancel_listener(self.id)
@patch(pyrogram.types.user_and_chats.user.User)
class User(pyrogram.types.User):
@patchable
def listen(self, *args, **kwargs):
return self._client.listen(self.id, *args, **kwargs)
@patchable
def ask(self, *args, **kwargs):
return self._client.ask(self.id, *args, **kwargs)
@patchable
def cancel_listener(self):
return self._client.cancel_listener(self.id)
@patch(pyrogram.types.messages_and_media.Message)
class Message(pyrogram.types.Message):
@patchable
async def safe_delete(self, revoke: bool = True):
try:
return await self._client.delete_messages(
chat_id=self.chat.id,
message_ids=self.id,
revoke=revoke
)
except Exception as e: # noqa
return False
@patchable
async def delay_delete(self, delay: int = 60):
add_delete_message_job(self, delay)

20
pyromod/nav/__init__.py Normal file
View File

@ -0,0 +1,20 @@
"""
pyromod - A monkeypatcher add-on for Pyrogram
Copyright (C) 2020 Cezar H. <https://github.com/usernein>
This file is part of pyromod.
pyromod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyromod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with pyromod. If not, see <https://www.gnu.org/licenses/>.
"""
from .pagination import Pagination

91
pyromod/nav/pagination.py Normal file
View File

@ -0,0 +1,91 @@
"""
pyromod - A monkeypatcher add-on for Pyrogram
Copyright (C) 2020 Cezar H. <https://github.com/usernein>
This file is part of pyromod.
pyromod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyromod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with pyromod. If not, see <https://www.gnu.org/licenses/>.
"""
import math
from ..helpers import array_chunk
class Pagination:
def __init__(self, objects, page_data=None, item_data=None, item_title=None):
default_page_callback = (lambda x: str(x))
default_item_callback = (lambda i, pg: f'[{pg}] {i}')
self.objects = objects
self.page_data = page_data or default_page_callback
self.item_data = item_data or default_item_callback
self.item_title = item_title or default_item_callback
def create(self, page, lines=5, columns=1):
quant_per_page = lines * columns
page = 1 if page <= 0 else page
offset = (page - 1) * quant_per_page
stop = offset + quant_per_page
cutted = self.objects[offset:stop]
total = len(self.objects)
pages_range = [*range(1, math.ceil(total / quant_per_page) + 1)] # each item is a page
last_page = len(pages_range)
nav = []
if page <= 3:
for n in [1, 2, 3]:
if n not in pages_range:
continue
text = f"· {n} ·" if n == page else n
nav.append((text, self.page_data(n)))
if last_page >= 4:
nav.append(
('4 ' if last_page > 5 else 4, self.page_data(4))
)
if last_page > 4:
nav.append(
(f'{last_page} »' if last_page > 5 else last_page, self.page_data(last_page))
)
elif page >= last_page - 2:
nav.extend(
[
('« 1' if last_page > 5 else 1, self.page_data(1)),
(
f' {last_page - 3}' if last_page > 5 else last_page - 3,
self.page_data(last_page - 3),
),
]
)
for n in range(last_page - 2, last_page + 1):
text = f"· {n} ·" if n == page else n
nav.append((text, self.page_data(n)))
else:
nav = [
('« 1', self.page_data(1)),
(f' {page - 1}', self.page_data(page - 1)),
(f'· {page} ·', "noop"),
(f'{page + 1} ', self.page_data(page + 1)),
(f'{last_page} »', self.page_data(last_page)),
]
buttons = [
(self.item_title(item, page), self.item_data(item, page))
for item in cutted
]
kb_lines = array_chunk(buttons, columns)
if last_page > 1:
kb_lines.append(nav)
return kb_lines

21
pyromod/utils/__init__.py Normal file
View File

@ -0,0 +1,21 @@
"""
pyromod - A monkeypatcher add-on for Pyrogram
Copyright (C) 2020 Cezar H. <https://github.com/usernein>
This file is part of pyromod.
pyromod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyromod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with pyromod. If not, see <https://www.gnu.org/licenses/>.
"""
from .utils import patch, patchable

19
pyromod/utils/errors.py Normal file
View File

@ -0,0 +1,19 @@
class TimeoutConversationError(Exception):
"""
Occurs when the conversation times out.
"""
def __init__(self):
super().__init__(
"Response read timed out"
)
class ListenerCanceled(Exception):
"""
Occurs when the listener is canceled.
"""
def __init__(self):
super().__init__(
"Listener was canceled"
)

38
pyromod/utils/utils.py Normal file
View File

@ -0,0 +1,38 @@
"""
pyromod - A monkeypatcher add-on for Pyrogram
Copyright (C) 2020 Cezar H. <https://github.com/usernein>
This file is part of pyromod.
pyromod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
pyromod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with pyromod. If not, see <https://www.gnu.org/licenses/>.
"""
def patch(obj):
def is_patchable(item):
return getattr(item[1], 'patchable', False)
def wrapper(container):
for name, func in filter(is_patchable, container.__dict__.items()):
old = getattr(obj, name, None)
setattr(obj, f'old{name}', old)
setattr(obj, name, func)
return container
return wrapper
def patchable(func):
func.patchable = True
return func

7
requirements.txt Normal file
View File

@ -0,0 +1,7 @@
pyrogram==2.0.30
TgCrypto>=1.2.3
PyYAML>=6.0
coloredlogs>=15.0.1
sqlitedict==2.0.0
apscheduler==3.9.1
pytz

48
sticker/__init__.py Normal file
View File

@ -0,0 +1,48 @@
import contextlib
from coloredlogs import ColoredFormatter
from datetime import datetime, timezone
from logging import getLogger, StreamHandler, CRITICAL, INFO, basicConfig, DEBUG
from sticker.config import Config
from sticker.scheduler import scheduler
import pyromod.listen
from pyrogram import Client
import sys
logs = getLogger(__name__)
logging_format = "%(levelname)s [%(asctime)s] [%(name)s] %(message)s"
logging_handler = StreamHandler()
logging_handler.setFormatter(ColoredFormatter(logging_format))
root_logger = getLogger()
root_logger.setLevel(DEBUG if Config.DEBUG else CRITICAL)
root_logger.addHandler(logging_handler)
pyro_logger = getLogger("pyrogram")
pyro_logger.setLevel(DEBUG)
pyro_logger.addHandler(logging_handler)
basicConfig(level=DEBUG if Config.DEBUG else INFO)
logs.setLevel(DEBUG if Config.DEBUG else INFO)
# easy check
if not Config.API_ID:
logs.error("Api-ID Not Found!")
sys.exit(1)
elif not Config.API_HASH:
logs.error("Api-Hash Not Found!")
sys.exit(1)
start_time = datetime.now(timezone.utc)
with contextlib.suppress(ImportError):
import uvloop # noqa
uvloop.install()
if not scheduler.running:
scheduler.start()
bot = Client("sticker",
bot_token=Config.BOT_TOKEN,
session_string=Config.STRING_SESSION,
api_id=Config.API_ID,
api_hash=Config.API_HASH,
ipv6=Config.IPV6,
proxy=Config.PROXY,
plugins={"root": "plugins"})

12
sticker/__main__.py Normal file
View File

@ -0,0 +1,12 @@
from pyrogram import idle
from sticker import bot, logs
async def main():
await bot.start()
logs.info("bot started.")
await idle()
await bot.stop()
bot.run(main())

50
sticker/config.py Normal file
View File

@ -0,0 +1,50 @@
import os
from json import load as load_json
import sys
from yaml import load, FullLoader, safe_load
from shutil import copyfile
def strtobool(val):
"""Convert a string representation of truth to true (1) or false (0).
True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
'val' is anything else.
"""
val = val.lower()
if val in ('y', 'yes', 't', 'true', 'on', '1'):
return 1
elif val in ('n', 'no', 'f', 'false', 'off', '0'):
return 0
else:
raise ValueError("invalid truth value %r" % (val,))
try:
config = load(open(r"config.yml"), Loader=FullLoader)
except FileNotFoundError:
print("The configuration file does not exist, and a new configuration file is being generated.")
copyfile(f"{os.getcwd()}{os.sep}config.gen.yml", "config.yml")
sys.exit(1)
class Config:
try:
API_ID = int(os.environ.get("API_ID", config["api_id"]))
API_HASH = os.environ.get("API_HASH", config["api_hash"])
BOT_TOKEN = os.environ.get("BOT_TOKEN", config["bot_token"])
STRING_SESSION = os.environ.get("STRING_SESSION")
DEBUG = strtobool(os.environ.get("PGM_DEBUG", config["debug"]))
IPV6 = strtobool(os.environ.get("PGM_IPV6", config["ipv6"]))
PROXY_ADDRESS = os.environ.get("PGM_PROXY_ADDRESS", config["proxy_addr"])
PROXY_PORT = os.environ.get("PGM_PROXY_PORT", config["proxy_port"])
PROXY = None
if PROXY_ADDRESS and PROXY_PORT:
PROXY = dict(
hostname=PROXY_ADDRESS,
port=PROXY_PORT,
)
except ValueError as e:
print(e)
sys.exit(1)

25
sticker/scheduler.py Normal file
View File

@ -0,0 +1,25 @@
import contextlib
import datetime
import pytz
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sticker.single_utils import Message
scheduler = AsyncIOScheduler(timezone="Asia/ShangHai")
async def delete_message(message: Message) -> bool:
with contextlib.suppress(Exception):
await message.delete()
return True
return False
def add_delete_message_job(message: Message, delete_seconds: int = 60):
scheduler.add_job(
delete_message, "date",
id=f"{message.chat.id}|{message.id}|delete_message",
name=f"{message.chat.id}|{message.id}|delete_message",
args=[message],
run_date=datetime.datetime.now(pytz.timezone("Asia/Shanghai")) + datetime.timedelta(seconds=delete_seconds),
replace_existing=True)

41
sticker/single_utils.py Normal file
View File

@ -0,0 +1,41 @@
import contextlib
from os import sep, remove, mkdir
from os.path import exists
from typing import Optional
from pyrogram import Client
from pyrogram.types import Message
from pyromod.utils.errors import TimeoutConversationError, ListenerCanceled
from sqlitedict import SqliteDict
# init folders
if not exists("data"):
mkdir("data")
sqlite = SqliteDict(f"data{sep}data.sqlite", autocommit=True)
def safe_remove(name: str) -> None:
with contextlib.suppress(FileNotFoundError):
remove(name)
class Client(Client): # noqa
async def listen(self, chat_id, filters=None, timeout=None) -> Optional[Message]:
return
async def ask(self, chat_id, text, filters=None, timeout=None, *args, **kwargs) -> Optional[Message]:
return
def cancel_listener(self, chat_id):
""" Cancel the conversation with the given chat_id. """
return
class Message(Message): # noqa
async def delay_delete(self, delete_seconds: int = 60) -> Optional[bool]:
return
async def safe_delete(self, revoke: bool = True) -> None:
return