# Pyrogram - Telegram MTProto API Client Library for Python # Copyright (C) 2017-present Dan # # This file is part of Pyrogram. # # Pyrogram is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Pyrogram is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with Pyrogram. If not, see . import inspect import re from typing import Callable, Union, List, Pattern import pyrogram from pyrogram.types import Message, CallbackQuery, InlineQuery, InlineKeyboardMarkup, ReplyKeyboardMarkup, Update class Filter: async def __call__(self, client: "pyrogram.Client", update: Update): raise NotImplementedError def __invert__(self): return InvertFilter(self) def __and__(self, other): return AndFilter(self, other) def __or__(self, other): return OrFilter(self, other) class InvertFilter(Filter): def __init__(self, base): self.base = base async def __call__(self, client: "pyrogram.Client", update: Update): if inspect.iscoroutinefunction(self.base.__call__): x = await self.base(client, update) else: x = await client.loop.run_in_executor( client.executor, self.base, client, update ) return not x class AndFilter(Filter): def __init__(self, base, other): self.base = base self.other = other async def __call__(self, client: "pyrogram.Client", update: Update): if inspect.iscoroutinefunction(self.base.__call__): x = await self.base(client, update) else: x = await client.loop.run_in_executor( client.executor, self.base, client, update ) # short circuit if not x: return False if inspect.iscoroutinefunction(self.other.__call__): y = await self.other(client, update) else: y = await client.loop.run_in_executor( client.executor, self.other, client, update ) return x and y class OrFilter(Filter): def __init__(self, base, other): self.base = base self.other = other async def __call__(self, client: "pyrogram.Client", update: Update): if inspect.iscoroutinefunction(self.base.__call__): x = await self.base(client, update) else: x = await client.loop.run_in_executor( client.executor, self.base, client, update ) # short circuit if x: return True if inspect.iscoroutinefunction(self.other.__call__): y = await self.other(client, update) else: y = await client.loop.run_in_executor( client.executor, self.other, client, update ) return x or y CUSTOM_FILTER_NAME = "CustomFilter" def create(func: Callable, name: str = None, **kwargs) -> Filter: """Easily create a custom filter. Custom filters give you extra control over which updates are allowed or not to be processed by your handlers. Parameters: func (``callable``): A function that accepts three positional arguments *(filter, client, update)* and returns a boolean: True if the update should be handled, False otherwise. The *filter* argument refers to the filter itself and can be used to access keyword arguments (read below). The *client* argument refers to the :obj:`~pyrogram.Client` that received the update. The *update* argument type will vary depending on which `Handler `_ is coming from. For example, in a :obj:`~pyrogram.handlers.MessageHandler` the *update* argument will be a :obj:`~pyrogram.types.Message`; in a :obj:`~pyrogram.handlers.CallbackQueryHandler` the *update* will be a :obj:`~pyrogram.types.CallbackQuery`. Your function body can then access the incoming update attributes and decide whether to allow it or not. name (``str``, *optional*): Your filter's name. Can be anything you like. Defaults to "CustomFilter". **kwargs (``any``, *optional*): Any keyword argument you would like to pass. Useful when creating parameterized custom filters, such as :meth:`~pyrogram.filters.command` or :meth:`~pyrogram.filters.regex`. """ return type( name or func.__name__ or CUSTOM_FILTER_NAME, (Filter,), {"__call__": func, **kwargs} )() # region all_filter async def all_filter(_, __, ___): return True all = create(all_filter) """Filter all messages.""" # endregion # region me_filter async def me_filter(_, __, m: Message): return bool(m.from_user and m.from_user.is_self or m.outgoing) me = create(me_filter) """Filter messages generated by you yourself.""" # endregion # region bot_filter async def bot_filter(_, __, m: Message): return bool(m.from_user and m.from_user.is_bot) bot = create(bot_filter) """Filter messages coming from bots.""" # endregion # region incoming_filter async def incoming_filter(_, __, m: Message): return not m.outgoing incoming = create(incoming_filter) """Filter incoming messages. Messages sent to your own chat (Saved Messages) are also recognised as incoming.""" # endregion # region outgoing_filter async def outgoing_filter(_, __, m: Message): return m.outgoing outgoing = create(outgoing_filter) """Filter outgoing messages. Messages sent to your own chat (Saved Messages) are not recognized as outgoing.""" # endregion # region text_filter async def text_filter(_, __, m: Message): return bool(m.text) text = create(text_filter) """Filter text messages.""" # endregion # region reply_filter async def reply_filter(_, __, m: Message): return bool(m.reply_to_message) reply = create(reply_filter) """Filter messages that are replies to other messages.""" # endregion # region forwarded_filter async def forwarded_filter(_, __, m: Message): return bool(m.forward_date) forwarded = create(forwarded_filter) """Filter messages that are forwarded.""" # endregion # region caption_filter async def caption_filter(_, __, m: Message): return bool(m.caption) caption = create(caption_filter) """Filter media messages that contain captions.""" # endregion # region edited_filter async def edited_filter(_, __, m: Message): return bool(m.edit_date) edited = create(edited_filter) """Filter edited messages.""" # endregion # region audio_filter async def audio_filter(_, __, m: Message): return bool(m.audio) audio = create(audio_filter) """Filter messages that contain :obj:`~pyrogram.types.Audio` objects.""" # endregion # region document_filter async def document_filter(_, __, m: Message): return bool(m.document) document = create(document_filter) """Filter messages that contain :obj:`~pyrogram.types.Document` objects.""" # endregion # region photo_filter async def photo_filter(_, __, m: Message): return bool(m.photo) photo = create(photo_filter) """Filter messages that contain :obj:`~pyrogram.types.Photo` objects.""" # endregion # region sticker_filter async def sticker_filter(_, __, m: Message): return bool(m.sticker) sticker = create(sticker_filter) """Filter messages that contain :obj:`~pyrogram.types.Sticker` objects.""" # endregion # region animation_filter async def animation_filter(_, __, m: Message): return bool(m.animation) animation = create(animation_filter) """Filter messages that contain :obj:`~pyrogram.types.Animation` objects.""" # endregion # region game_filter async def game_filter(_, __, m: Message): return bool(m.game) game = create(game_filter) """Filter messages that contain :obj:`~pyrogram.types.Game` objects.""" # endregion # region video_filter async def video_filter(_, __, m: Message): return bool(m.video) video = create(video_filter) """Filter messages that contain :obj:`~pyrogram.types.Video` objects.""" # endregion # region media_group_filter async def media_group_filter(_, __, m: Message): return bool(m.media_group_id) media_group = create(media_group_filter) """Filter messages containing photos or videos being part of an album.""" # endregion # region voice_filter async def voice_filter(_, __, m: Message): return bool(m.voice) voice = create(voice_filter) """Filter messages that contain :obj:`~pyrogram.types.Voice` note objects.""" # endregion # region video_note_filter async def video_note_filter(_, __, m: Message): return bool(m.video_note) video_note = create(video_note_filter) """Filter messages that contain :obj:`~pyrogram.types.VideoNote` objects.""" # endregion # region contact_filter async def contact_filter(_, __, m: Message): return bool(m.contact) contact = create(contact_filter) """Filter messages that contain :obj:`~pyrogram.types.Contact` objects.""" # endregion # region location_filter async def location_filter(_, __, m: Message): return bool(m.location) location = create(location_filter) """Filter messages that contain :obj:`~pyrogram.types.Location` objects.""" # endregion # region venue_filter async def venue_filter(_, __, m: Message): return bool(m.venue) venue = create(venue_filter) """Filter messages that contain :obj:`~pyrogram.types.Venue` objects.""" # endregion # region web_page_filter async def web_page_filter(_, __, m: Message): return bool(m.web_page) web_page = create(web_page_filter) """Filter messages sent with a webpage preview.""" # endregion # region poll_filter async def poll_filter(_, __, m: Message): return bool(m.poll) poll = create(poll_filter) """Filter messages that contain :obj:`~pyrogram.types.Poll` objects.""" # endregion # region dice_filter async def dice_filter(_, __, m: Message): return bool(m.dice) dice = create(dice_filter) """Filter messages that contain :obj:`~pyrogram.types.Dice` objects.""" # endregion # region private_filter async def private_filter(_, __, m: Message): return bool(m.chat and m.chat.type in {"private", "bot"}) private = create(private_filter) """Filter messages sent in private chats.""" # endregion # region group_filter async def group_filter(_, __, m: Message): return bool(m.chat and m.chat.type in {"group", "supergroup"}) group = create(group_filter) """Filter messages sent in group or supergroup chats.""" # endregion # region channel_filter async def channel_filter(_, __, m: Message): return bool(m.chat and m.chat.type == "channel") channel = create(channel_filter) """Filter messages sent in channels.""" # endregion # region new_chat_members_filter async def new_chat_members_filter(_, __, m: Message): return bool(m.new_chat_members) new_chat_members = create(new_chat_members_filter) """Filter service messages for new chat members.""" # endregion # region left_chat_member_filter async def left_chat_member_filter(_, __, m: Message): return bool(m.left_chat_member) left_chat_member = create(left_chat_member_filter) """Filter service messages for members that left the chat.""" # endregion # region new_chat_title_filter async def new_chat_title_filter(_, __, m: Message): return bool(m.new_chat_title) new_chat_title = create(new_chat_title_filter) """Filter service messages for new chat titles.""" # endregion # region new_chat_photo_filter async def new_chat_photo_filter(_, __, m: Message): return bool(m.new_chat_photo) new_chat_photo = create(new_chat_photo_filter) """Filter service messages for new chat photos.""" # endregion # region delete_chat_photo_filter async def delete_chat_photo_filter(_, __, m: Message): return bool(m.delete_chat_photo) delete_chat_photo = create(delete_chat_photo_filter) """Filter service messages for deleted photos.""" # endregion # region group_chat_created_filter async def group_chat_created_filter(_, __, m: Message): return bool(m.group_chat_created) group_chat_created = create(group_chat_created_filter) """Filter service messages for group chat creations.""" # endregion # region supergroup_chat_created_filter async def supergroup_chat_created_filter(_, __, m: Message): return bool(m.supergroup_chat_created) supergroup_chat_created = create(supergroup_chat_created_filter) """Filter service messages for supergroup chat creations.""" # endregion # region channel_chat_created_filter async def channel_chat_created_filter(_, __, m: Message): return bool(m.channel_chat_created) channel_chat_created = create(channel_chat_created_filter) """Filter service messages for channel chat creations.""" # endregion # region migrate_to_chat_id_filter async def migrate_to_chat_id_filter(_, __, m: Message): return bool(m.migrate_to_chat_id) migrate_to_chat_id = create(migrate_to_chat_id_filter) """Filter service messages that contain migrate_to_chat_id.""" # endregion # region migrate_from_chat_id_filter async def migrate_from_chat_id_filter(_, __, m: Message): return bool(m.migrate_from_chat_id) migrate_from_chat_id = create(migrate_from_chat_id_filter) """Filter service messages that contain migrate_from_chat_id.""" # endregion # region pinned_message_filter async def pinned_message_filter(_, __, m: Message): return bool(m.pinned_message) pinned_message = create(pinned_message_filter) """Filter service messages for pinned messages.""" # endregion # region game_high_score_filter async def game_high_score_filter(_, __, m: Message): return bool(m.game_high_score) game_high_score = create(game_high_score_filter) """Filter service messages for game high scores.""" # endregion # region reply_keyboard_filter async def reply_keyboard_filter(_, __, m: Message): return isinstance(m.reply_markup, ReplyKeyboardMarkup) reply_keyboard = create(reply_keyboard_filter) """Filter messages containing reply keyboard markups""" # endregion # region inline_keyboard_filter async def inline_keyboard_filter(_, __, m: Message): return isinstance(m.reply_markup, InlineKeyboardMarkup) inline_keyboard = create(inline_keyboard_filter) """Filter messages containing inline keyboard markups""" # endregion # region mentioned_filter async def mentioned_filter(_, __, m: Message): return bool(m.mentioned) mentioned = create(mentioned_filter) """Filter messages containing mentions""" # endregion # region via_bot_filter async def via_bot_filter(_, __, m: Message): return bool(m.via_bot) via_bot = create(via_bot_filter) """Filter messages sent via inline bots""" # endregion # region voice_chat_started_filter async def voice_chat_started_filter(_, __, m: Message): return bool(m.voice_chat_started) voice_chat_started = create(voice_chat_started_filter) """Filter messages for started voice chats""" # endregion # region voice_chat_ended_filter async def voice_chat_ended_filter(_, __, m: Message): return bool(m.voice_chat_ended) voice_chat_ended = create(voice_chat_ended_filter) """Filter messages for ended voice chats""" # endregion # region voice_chat_members_invited_filter async def voice_chat_members_invited_filter(_, __, m: Message): return bool(m.voice_chat_members_invited) voice_chat_members_invited = create(voice_chat_members_invited_filter) """Filter messages for voice chat invited members""" # endregion # region service_filter async def service_filter(_, __, m: Message): return bool(m.service) service = create(service_filter) """Filter service messages. A service message contains any of the following fields set: *left_chat_member*, *new_chat_title*, *new_chat_photo*, *delete_chat_photo*, *group_chat_created*, *supergroup_chat_created*, *channel_chat_created*, *migrate_to_chat_id*, *migrate_from_chat_id*, *pinned_message*, *game_score*, *voice_chat_started*, *voice_chat_ended*, *voice_chat_members_invited*. """ # endregion # region media_filter async def media_filter(_, __, m: Message): return bool(m.media) media = create(media_filter) """Filter media messages. A media message contains any of the following fields set: *audio*, *document*, *photo*, *sticker*, *video*, *animation*, *voice*, *video_note*, *contact*, *location*, *venue*, *poll*. """ # endregion # region scheduled_filter async def scheduled_filter(_, __, m: Message): return bool(m.scheduled) scheduled = create(scheduled_filter) """Filter messages that have been scheduled (not yet sent).""" # endregion # region from_scheduled_filter async def from_scheduled_filter(_, __, m: Message): return bool(m.from_scheduled) from_scheduled = create(from_scheduled_filter) """Filter new automatically sent messages that were previously scheduled.""" # endregion # region linked_channel_filter async def linked_channel_filter(_, __, m: Message): return bool(m.forward_from_chat and not m.from_user) linked_channel = create(linked_channel_filter) """Filter messages that are automatically forwarded from the linked channel to the group chat.""" # endregion # region command_filter # Used by the command filter below username = None def command(commands: Union[str, List[str]], prefixes: Union[str, List[str]] = "/", case_sensitive: bool = False): """Filter commands, i.e.: text messages starting with "/" or any other custom prefix. Parameters: commands (``str`` | ``list``): The command or list of commands as string the filter should look for. Examples: "start", ["start", "help", "settings"]. When a message text containing a command arrives, the command itself and its arguments will be stored in the *command* field of the :obj:`~pyrogram.types.Message`. prefixes (``str`` | ``list``, *optional*): A prefix or a list of prefixes as string the filter should look for. Defaults to "/" (slash). Examples: ".", "!", ["/", "!", "."], list(".:!"). Pass None or "" (empty string) to allow commands with no prefix at all. case_sensitive (``bool``, *optional*): Pass True if you want your command(s) to be case sensitive. Defaults to False. Examples: when True, command="Start" would trigger /Start but not /start. """ command_re = re.compile(r"([\"'])(.*?)(?`_ are stored in the ``matches`` field of the update object itself. Parameters: pattern (``str`` | ``Pattern``): The regex pattern as string or as pre-compiled pattern. flags (``int``, *optional*): Regex flags. """ async def func(flt, _, update: Update): if isinstance(update, Message): value = update.text or update.caption elif isinstance(update, CallbackQuery): value = update.data elif isinstance(update, InlineQuery): value = update.query else: raise ValueError(f"Regex filter doesn't work with {type(update)}") if value: update.matches = list(flt.p.finditer(value)) or None return bool(update.matches) return create( func, "RegexFilter", p=pattern if isinstance(pattern, Pattern) else re.compile(pattern, flags) ) # noinspection PyPep8Naming class user(Filter, set): """Filter messages coming from one or more users. You can use `set bound methods `_ to manipulate the users container. Parameters: users (``int`` | ``str`` | ``list``): Pass one or more user ids/usernames to filter users. For you yourself, "me" or "self" can be used as well. Defaults to None (no users). """ def __init__(self, users: Union[int, str, List[Union[int, str]]] = None): users = [] if users is None else users if isinstance(users, list) else [users] super().__init__( "me" if u in ["me", "self"] else u.lower().strip("@") if isinstance(u, str) else u for u in users ) async def __call__(self, _, message: Message): return (message.from_user and (message.from_user.id in self or (message.from_user.username and message.from_user.username.lower() in self) or ("me" in self and message.from_user.is_self))) # noinspection PyPep8Naming class chat(Filter, set): """Filter messages coming from one or more chats. You can use `set bound methods `_ to manipulate the chats container. Parameters: chats (``int`` | ``str`` | ``list``): Pass one or more chat ids/usernames to filter chats. For your personal cloud (Saved Messages) you can simply use "me" or "self". Defaults to None (no chats). """ def __init__(self, chats: Union[int, str, List[Union[int, str]]] = None): chats = [] if chats is None else chats if isinstance(chats, list) else [chats] super().__init__( "me" if c in ["me", "self"] else c.lower().strip("@") if isinstance(c, str) else c for c in chats ) async def __call__(self, _, message: Message): return (message.chat and (message.chat.id in self or (message.chat.username and message.chat.username.lower() in self) or ("me" in self and message.from_user and message.from_user.is_self and not message.outgoing)))