# Pyrogram - Telegram MTProto API Client Library for Python # Copyright (C) 2017-2019 Dan Tès # # This file is part of Pyrogram. # # Pyrogram is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Pyrogram is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with Pyrogram. If not, see . import binascii import os import struct import time from datetime import datetime from threading import Event from typing import Union import pyrogram from pyrogram.client.ext import BaseClient, FileData, utils from pyrogram.errors import FileIdInvalid DEFAULT_DOWNLOAD_DIR = "downloads/" class DownloadMedia(BaseClient): def download_media( self, message: Union["pyrogram.Message", str], file_name: str = DEFAULT_DOWNLOAD_DIR, block: bool = True, progress: callable = None, progress_args: tuple = () ) -> Union[str, None]: """Download the media from a message. Parameters: message (:obj:`Message` | ``str``): Pass a Message containing the media, the media itself (message.audio, message.video, ...) or the file id as string. file_name (``str``, *optional*): A custom *file_name* to be used instead of the one provided by Telegram. By default, all files are downloaded in the *downloads* folder in your working directory. You can also specify a path for downloading files in a custom location: paths that end with "/" are considered directories. All non-existent folders will be created automatically. block (``bool``, *optional*): Blocks the code execution until the file has been downloaded. Defaults to True. progress (``callable``): Pass a callback function to view the download progress. The function must take *(client, current, total, \*args)* as positional arguments (look at the section below for a detailed description). progress_args (``tuple``): Extra custom arguments for the progress callback function. Useful, for example, if you want to pass a chat_id and a message_id in order to edit a message with the updated progress. Other Parameters: client (:obj:`Client`): The Client itself, useful when you want to call other API methods inside the callback function. current (``int``): The amount of bytes downloaded so far. total (``int``): The size of the file. *args (``tuple``, *optional*): Extra custom arguments as defined in the *progress_args* parameter. You can either keep *\*args* or add every single extra argument in your function signature. Returns: ``str`` | ``None``: On success, the absolute path of the downloaded file is returned, otherwise, in case the download failed or was deliberately stopped with :meth:`~Client.stop_transmission`, None is returned. Raises: RPCError: In case of a Telegram RPC error. ``ValueError`` if the message doesn't contain any downloadable media """ error_message = "This message doesn't contain any downloadable media" available_media = ("audio", "document", "photo", "sticker", "animation", "video", "voice", "video_note") media_file_name = None file_size = None mime_type = None date = None if isinstance(message, pyrogram.Message): for kind in available_media: media = getattr(message, kind, None) if media is not None: break else: raise ValueError(error_message) else: media = message if isinstance(media, str): file_id_str = media else: file_id_str = media.file_id media_file_name = getattr(media, "file_name", "") file_size = getattr(media, "file_size", None) mime_type = getattr(media, "mime_type", None) date = getattr(media, "date", None) data = FileData( file_name=media_file_name, file_size=file_size, mime_type=mime_type, date=date ) def get_existing_attributes() -> dict: return dict(filter(lambda x: x[1] is not None, data.__dict__.items())) try: decoded = utils.decode(file_id_str) media_type = decoded[0] if media_type == 1: unpacked = struct.unpack("