Add Message.copy bound method

This commit is contained in:
Dan 2020-12-05 01:54:07 +01:00
parent c606f836d4
commit 2f3bcd7ee5
3 changed files with 194 additions and 102 deletions

View File

@ -432,6 +432,7 @@ def pyrogram_api():
Message.delete
Message.download
Message.forward
Message.copy
Message.pin
Message.edit_text
Message.edit_caption

View File

@ -17,8 +17,7 @@
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
import logging
from functools import partial
from typing import Union, Iterable, List
from typing import Union, List
from pyrogram import types
from pyrogram.scaffold import Scaffold
@ -31,7 +30,7 @@ class CopyMessage(Scaffold):
self,
chat_id: Union[int, str],
from_chat_id: Union[int, str],
message_id: Union[int, Iterable[int]],
message_id: int,
caption: str = None,
parse_mode: Union[str, None] = object,
caption_entities: List["types.MessageEntity"] = None,
@ -105,102 +104,13 @@ class CopyMessage(Scaffold):
"""
message: types.Message = await self.get_messages(from_chat_id, message_id)
if message.service:
log.warning(f"Service messages cannot be copied. "
f"chat_id: {message.chat.id}, message_id: {message.message_id}")
elif message.game and not await self.storage.is_bot():
log.warning(f"Users cannot send messages with Game media type. "
f"chat_id: {message.chat.id}, message_id: {message.message_id}")
elif message.text:
return await self.send_message(
chat_id,
text=message.text,
entities=message.entities,
disable_web_page_preview=not message.web_page,
disable_notification=disable_notification,
schedule_date=schedule_date
)
elif message.media:
send_media = partial(
self.send_cached_media,
return await message.copy(
chat_id=chat_id,
caption=caption,
parse_mode=parse_mode,
caption_entities=caption_entities,
disable_notification=disable_notification,
reply_to_message_id=reply_to_message_id,
schedule_date=schedule_date,
reply_markup=reply_markup
)
if message.photo:
file_id = message.photo.file_id
elif message.audio:
file_id = message.audio.file_id
elif message.document:
file_id = message.document.file_id
elif message.video:
file_id = message.video.file_id
elif message.animation:
file_id = message.animation.file_id
elif message.voice:
file_id = message.voice.file_id
elif message.sticker:
file_id = message.sticker.file_id
elif message.video_note:
file_id = message.video_note.file_id
elif message.contact:
return await self.send_contact(
chat_id,
phone_number=message.contact.phone_number,
first_name=message.contact.first_name,
last_name=message.contact.last_name,
vcard=message.contact.vcard,
disable_notification=disable_notification,
schedule_date=schedule_date
)
elif message.location:
return await self.send_location(
chat_id,
latitude=message.location.latitude,
longitude=message.location.longitude,
disable_notification=disable_notification,
schedule_date=schedule_date
)
elif message.venue:
return await self.send_venue(
chat_id,
latitude=message.venue.location.latitude,
longitude=message.venue.location.longitude,
title=message.venue.title,
address=message.venue.address,
foursquare_id=message.venue.foursquare_id,
foursquare_type=message.venue.foursquare_type,
disable_notification=disable_notification,
schedule_date=schedule_date
)
elif message.poll:
return await self.send_poll(
chat_id,
question=message.poll.question,
options=[opt.text for opt in message.poll.options],
disable_notification=disable_notification,
schedule_date=schedule_date
)
elif message.game:
return await self.send_game(
chat_id,
game_short_name=message.game.short_name,
disable_notification=disable_notification
)
else:
raise ValueError("Unknown media type")
if message.sticker or message.video_note: # Sticker and VideoNote should have no caption
return await send_media(file_id=file_id)
else:
return await send_media(
file_id=file_id,
caption=caption if caption is not None else message.caption,
parse_mode=parse_mode,
caption_entities=caption_entities or message.caption_entities
)
else:
raise ValueError("Can't copy this message")

View File

@ -17,6 +17,7 @@
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
import logging
from functools import partial
from typing import List, Match, Union, BinaryIO
import pyrogram
@ -2751,6 +2752,186 @@ class Message(Object, Update):
schedule_date=schedule_date
)
async def copy(
self,
chat_id: Union[int, str],
caption: str = None,
parse_mode: Union[str, None] = object,
caption_entities: List["types.MessageEntity"] = None,
disable_notification: bool = None,
reply_to_message_id: int = None,
schedule_date: int = None,
reply_markup: Union[
"types.InlineKeyboardMarkup",
"types.ReplyKeyboardMarkup",
"types.ReplyKeyboardRemove",
"types.ForceReply"
] = None
) -> Union["types.Message", List["types.Message"]]:
"""Bound method *copy* of :obj:`~pyrogram.types.Message`.
Use as a shortcut for:
.. code-block:: python
client.copy_message(
chat_id=chat_id,
from_chat_id=message.chat.id,
message_ids=message.message_id
)
Example:
.. code-block:: python
message.copy(chat_id)
Parameters:
chat_id (``int`` | ``str``):
Unique identifier (int) or username (str) of the target chat.
For your personal cloud (Saved Messages) you can simply use "me" or "self".
For a contact that exists in your Telegram address book you can use his phone number (str).
caption (``string``, *optional*):
New caption for media, 0-1024 characters after entities parsing.
If not specified, the original caption is kept.
Pass "" (empty string) to remove the caption.
parse_mode (``str``, *optional*):
By default, texts are parsed using both Markdown and HTML styles.
You can combine both syntaxes together.
Pass "markdown" or "md" to enable Markdown-style parsing only.
Pass "html" to enable HTML-style parsing only.
Pass None to completely disable style parsing.
caption_entities (List of :obj:`~pyrogram.types.MessageEntity`):
List of special entities that appear in the new caption, which can be specified instead of __parse_mode__.
disable_notification (``bool``, *optional*):
Sends the message silently.
Users will receive a notification with no sound.
reply_to_message_id (``int``, *optional*):
If the message is a reply, ID of the original message.
schedule_date (``int``, *optional*):
Date when the message will be automatically sent. Unix time.
reply_markup (:obj:`~pyrogram.types.InlineKeyboardMarkup` | :obj:`~pyrogram.types.ReplyKeyboardMarkup` | :obj:`~pyrogram.types.ReplyKeyboardRemove` | :obj:`~pyrogram.types.ForceReply`, *optional*):
Additional interface options. An object for an inline keyboard, custom reply keyboard,
instructions to remove reply keyboard or to force a reply from the user.
Returns:
:obj:`~pyrogram.types.Message`: On success, the copied message is returned.
Raises:
RPCError: In case of a Telegram RPC error.
"""
if self.service:
log.warning(f"Service messages cannot be copied. "
f"chat_id: {self.chat.id}, message_id: {self.message_id}")
elif self.game and not await self._client.storage.is_bot():
log.warning(f"Users cannot send messages with Game media type. "
f"chat_id: {self.chat.id}, message_id: {self.message_id}")
elif self.text:
return await self._client.send_message(
chat_id,
text=self.text,
entities=self.entities,
disable_web_page_preview=not self.web_page,
disable_notification=disable_notification,
reply_to_message_id=reply_to_message_id,
schedule_date=schedule_date,
reply_markup=reply_markup
)
elif self.media:
send_media = partial(
self._client.send_cached_media,
chat_id=chat_id,
disable_notification=disable_notification,
reply_to_message_id=reply_to_message_id,
schedule_date=schedule_date,
reply_markup=reply_markup
)
if self.photo:
file_id = self.photo.file_id
elif self.audio:
file_id = self.audio.file_id
elif self.document:
file_id = self.document.file_id
elif self.video:
file_id = self.video.file_id
elif self.animation:
file_id = self.animation.file_id
elif self.voice:
file_id = self.voice.file_id
elif self.sticker:
file_id = self.sticker.file_id
elif self.video_note:
file_id = self.video_note.file_id
elif self.contact:
return await self._client.send_contact(
chat_id,
phone_number=self.contact.phone_number,
first_name=self.contact.first_name,
last_name=self.contact.last_name,
vcard=self.contact.vcard,
disable_notification=disable_notification,
schedule_date=schedule_date
)
elif self.location:
return await self._client.send_location(
chat_id,
latitude=self.location.latitude,
longitude=self.location.longitude,
disable_notification=disable_notification,
schedule_date=schedule_date
)
elif self.venue:
return await self._client.send_venue(
chat_id,
latitude=self.venue.location.latitude,
longitude=self.venue.location.longitude,
title=self.venue.title,
address=self.venue.address,
foursquare_id=self.venue.foursquare_id,
foursquare_type=self.venue.foursquare_type,
disable_notification=disable_notification,
schedule_date=schedule_date
)
elif self.poll:
return await self._client.send_poll(
chat_id,
question=self.poll.question,
options=[opt.text for opt in self.poll.options],
disable_notification=disable_notification,
schedule_date=schedule_date
)
elif self.game:
return await self._client.send_game(
chat_id,
game_short_name=self.game.short_name,
disable_notification=disable_notification
)
else:
raise ValueError("Unknown media type")
if self.sticker or self.video_note: # Sticker and VideoNote should have no caption
return await send_media(file_id=file_id)
else:
if caption is None:
caption = self.caption
caption_entities = self.caption_entities
return await send_media(
file_id=file_id,
caption=caption,
parse_mode=parse_mode,
caption_entities=caption_entities
)
else:
raise ValueError("Can't copy this message")
async def delete(self, revoke: bool = True):
"""Bound method *delete* of :obj:`~pyrogram.types.Message`.