479 lines
15 KiB
Python
479 lines
15 KiB
Python
# Pyrogram - Telegram MTProto API Client Library for Python
|
|
# Copyright (C) 2017-2020 Dan <https://github.com/delivrance>
|
|
#
|
|
# This file is part of Pyrogram.
|
|
#
|
|
# Pyrogram is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Lesser General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Pyrogram is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License
|
|
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
|
|
import logging
|
|
import struct
|
|
from enum import IntEnum
|
|
from io import BytesIO
|
|
|
|
from pyrogram.raw.core import Bytes, String
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def b64_encode(s: bytes) -> str:
|
|
"""Encode bytes into a URL-safe Base64 string without padding
|
|
|
|
Parameters:
|
|
s (``bytes``):
|
|
Bytes to encode
|
|
|
|
Returns:
|
|
``str``: The encoded bytes
|
|
"""
|
|
return base64.urlsafe_b64encode(s).decode().strip("=")
|
|
|
|
|
|
def b64_decode(s: str) -> bytes:
|
|
"""Decode a URL-safe Base64 string without padding to bytes
|
|
|
|
Parameters:
|
|
s (``str``):
|
|
String to decode
|
|
|
|
Returns:
|
|
``bytes``: The decoded string
|
|
"""
|
|
return base64.urlsafe_b64decode(s + "=" * (-len(s) % 4))
|
|
|
|
|
|
def rle_encode(s: bytes) -> bytes:
|
|
"""Zero-value RLE encoder
|
|
|
|
Parameters:
|
|
s (``bytes``):
|
|
Bytes to encode
|
|
|
|
Returns:
|
|
``bytes``: The encoded bytes
|
|
"""
|
|
r = b""
|
|
n = 0
|
|
|
|
for b in s:
|
|
if b == 0:
|
|
n += 1
|
|
else:
|
|
if n > 0:
|
|
r += bytes([0, n])
|
|
n = 0
|
|
|
|
r += bytes([b])
|
|
|
|
if n > 0:
|
|
r += bytes([0, n])
|
|
|
|
return r
|
|
|
|
|
|
def rle_decode(s: bytes) -> bytes:
|
|
"""Zero-value RLE decoder
|
|
|
|
Parameters:
|
|
s (``bytes``):
|
|
Bytes to encode
|
|
|
|
Returns:
|
|
``bytes``: The encoded bytes
|
|
"""
|
|
r = b""
|
|
i = 0
|
|
|
|
while i < len(s):
|
|
if s[i] != 0:
|
|
r += bytes([s[i]])
|
|
else:
|
|
r += b"\x00" * s[i + 1]
|
|
i += 1
|
|
|
|
i += 1
|
|
|
|
return r
|
|
|
|
|
|
class FileType(IntEnum):
|
|
"""Known file types"""
|
|
THUMBNAIL = 0
|
|
CHAT_PHOTO = 1 # ProfilePhoto
|
|
PHOTO = 2
|
|
VOICE = 3 # VoiceNote
|
|
VIDEO = 4
|
|
DOCUMENT = 5
|
|
ENCRYPTED = 6
|
|
TEMP = 7
|
|
STICKER = 8
|
|
AUDIO = 9
|
|
ANIMATION = 10
|
|
ENCRYPTED_THUMBNAIL = 11
|
|
WALLPAPER = 12
|
|
VIDEO_NOTE = 13
|
|
SECURE_RAW = 14
|
|
SECURE = 15
|
|
BACKGROUND = 16
|
|
DOCUMENT_AS_FILE = 17
|
|
|
|
|
|
class ThumbnailSource(IntEnum):
|
|
"""Known thumbnail sources"""
|
|
LEGACY = 0
|
|
THUMBNAIL = 1
|
|
CHAT_PHOTO_SMALL = 2 # DialogPhotoSmall
|
|
CHAT_PHOTO_BIG = 3 # DialogPhotoBig
|
|
STICKER_SET_THUMBNAIL = 4
|
|
|
|
|
|
# Photo-like file ids are longer and contain extra info, the rest are all documents
|
|
PHOTO_TYPES = {FileType.THUMBNAIL, FileType.CHAT_PHOTO, FileType.PHOTO, FileType.WALLPAPER,
|
|
FileType.ENCRYPTED_THUMBNAIL}
|
|
DOCUMENT_TYPES = set(FileType) - PHOTO_TYPES
|
|
|
|
# Since the file type values are small enough to fit them in few bits, Telegram thought it would be a good idea to
|
|
# encode extra information about web url and file reference existence as flag inside the 4 bytes allocated for the field
|
|
WEB_LOCATION_FLAG = 1 << 24
|
|
FILE_REFERENCE_FLAG = 1 << 25
|
|
|
|
|
|
class FileId:
|
|
MAJOR = 4
|
|
MINOR = 30
|
|
|
|
def __init__(
|
|
self, *,
|
|
major: int = MAJOR,
|
|
minor: int = MINOR,
|
|
file_type: FileType,
|
|
dc_id: int,
|
|
file_reference: bytes = None,
|
|
url: str = None,
|
|
media_id: int = None,
|
|
access_hash: int = None,
|
|
volume_id: int = None,
|
|
thumbnail_source: ThumbnailSource = None,
|
|
thumbnail_file_type: str = None,
|
|
thumbnail_size: str = None,
|
|
secret: int = None,
|
|
local_id: str = None,
|
|
chat_id: int = None,
|
|
chat_access_hash: int = None,
|
|
sticker_set_id: int = None,
|
|
sticker_set_access_hash: int = None
|
|
):
|
|
self.major = major
|
|
self.minor = minor
|
|
self.file_type = file_type
|
|
self.dc_id = dc_id
|
|
self.file_reference = file_reference
|
|
self.url = url
|
|
self.media_id = media_id
|
|
self.access_hash = access_hash
|
|
self.volume_id = volume_id
|
|
self.thumbnail_source = thumbnail_source
|
|
self.thumbnail_file_type = thumbnail_file_type
|
|
self.thumbnail_size = thumbnail_size
|
|
self.secret = secret
|
|
self.local_id = local_id
|
|
self.chat_id = chat_id
|
|
self.chat_access_hash = chat_access_hash
|
|
self.sticker_set_id = sticker_set_id
|
|
self.sticker_set_access_hash = sticker_set_access_hash
|
|
|
|
@staticmethod
|
|
def decode(file_id: str):
|
|
decoded = rle_decode(b64_decode(file_id))
|
|
|
|
# region read version
|
|
# File id versioning. Major versions lower than 4 don't have a minor version
|
|
major = decoded[-1]
|
|
|
|
if major < 4:
|
|
minor = 0
|
|
buffer = BytesIO(decoded[:-1])
|
|
else:
|
|
minor = decoded[-2]
|
|
buffer = BytesIO(decoded[:-2])
|
|
# endregion
|
|
|
|
file_type, dc_id = struct.unpack("<ii", buffer.read(8))
|
|
|
|
# region media type flags
|
|
# Check for flags existence
|
|
has_web_location = bool(file_type & WEB_LOCATION_FLAG)
|
|
has_file_reference = bool(file_type & FILE_REFERENCE_FLAG)
|
|
|
|
# Remove flags to restore the actual type id value
|
|
file_type &= ~WEB_LOCATION_FLAG
|
|
file_type &= ~FILE_REFERENCE_FLAG
|
|
# endregion
|
|
|
|
try:
|
|
file_type = FileType(file_type)
|
|
except ValueError:
|
|
raise ValueError(f"Unknown file_type {file_type} of file_id {file_id}")
|
|
|
|
if has_web_location:
|
|
url = String.read(buffer)
|
|
access_hash, = struct.unpack("<q", buffer.read(8))
|
|
|
|
return FileId(
|
|
major=major,
|
|
minor=minor,
|
|
file_type=file_type,
|
|
dc_id=dc_id,
|
|
url=url,
|
|
access_hash=access_hash
|
|
)
|
|
|
|
file_reference = Bytes.read(buffer) if has_file_reference else None
|
|
media_id, access_hash = struct.unpack("<qq", buffer.read(16))
|
|
|
|
if file_type in PHOTO_TYPES:
|
|
volume_id, = struct.unpack("<q", buffer.read(8))
|
|
thumbnail_source, = (0,) if major < 4 else struct.unpack("<i", buffer.read(4))
|
|
|
|
try:
|
|
thumbnail_source = ThumbnailSource(thumbnail_source)
|
|
except ValueError:
|
|
raise ValueError(f"Unknown thumbnail_source {thumbnail_source} of file_id {file_id}")
|
|
|
|
if thumbnail_source == ThumbnailSource.LEGACY:
|
|
secret, local_id = struct.unpack("<qi", buffer.read(12))
|
|
|
|
return FileId(
|
|
major=major,
|
|
minor=minor,
|
|
file_type=file_type,
|
|
dc_id=dc_id,
|
|
file_reference=file_reference,
|
|
media_id=media_id,
|
|
access_hash=access_hash,
|
|
volume_id=volume_id,
|
|
thumbnail_source=thumbnail_source,
|
|
secret=secret,
|
|
local_id=local_id
|
|
)
|
|
|
|
if thumbnail_source == ThumbnailSource.THUMBNAIL:
|
|
thumbnail_file_type, thumbnail_size, local_id = struct.unpack("<iii", buffer.read(12))
|
|
thumbnail_size = chr(thumbnail_size)
|
|
|
|
return FileId(
|
|
major=major,
|
|
minor=minor,
|
|
file_type=file_type,
|
|
dc_id=dc_id,
|
|
file_reference=file_reference,
|
|
media_id=media_id,
|
|
access_hash=access_hash,
|
|
volume_id=volume_id,
|
|
thumbnail_source=thumbnail_source,
|
|
thumbnail_file_type=thumbnail_file_type,
|
|
thumbnail_size=thumbnail_size,
|
|
local_id=local_id
|
|
)
|
|
|
|
if thumbnail_source in (ThumbnailSource.CHAT_PHOTO_SMALL, ThumbnailSource.CHAT_PHOTO_BIG):
|
|
chat_id, chat_access_hash, local_id = struct.unpack("<qqi", buffer.read(20))
|
|
|
|
return FileId(
|
|
major=major,
|
|
minor=minor,
|
|
file_type=file_type,
|
|
dc_id=dc_id,
|
|
file_reference=file_reference,
|
|
media_id=media_id,
|
|
access_hash=access_hash,
|
|
volume_id=volume_id,
|
|
thumbnail_source=thumbnail_source,
|
|
chat_id=chat_id,
|
|
chat_access_hash=chat_access_hash,
|
|
local_id=local_id
|
|
)
|
|
|
|
if thumbnail_source == ThumbnailSource.STICKER_SET_THUMBNAIL:
|
|
sticker_set_id, sticker_set_access_hash, local_id = struct.unpack("<qqi", buffer.read(20))
|
|
|
|
return FileId(
|
|
major=major,
|
|
minor=minor,
|
|
file_type=file_type,
|
|
dc_id=dc_id,
|
|
file_reference=file_reference,
|
|
media_id=media_id,
|
|
access_hash=access_hash,
|
|
volume_id=volume_id,
|
|
thumbnail_source=thumbnail_source,
|
|
sticker_set_id=sticker_set_id,
|
|
sticker_set_access_hash=sticker_set_access_hash,
|
|
local_id=local_id
|
|
)
|
|
|
|
if file_type in DOCUMENT_TYPES:
|
|
return FileId(
|
|
major=major,
|
|
minor=minor,
|
|
file_type=file_type,
|
|
dc_id=dc_id,
|
|
file_reference=file_reference,
|
|
media_id=media_id,
|
|
access_hash=access_hash
|
|
)
|
|
|
|
def encode(self, *, major: int = None, minor: int = None):
|
|
major = major if major is not None else self.MAJOR
|
|
minor = minor if minor is not None else self.MINOR
|
|
|
|
buffer = BytesIO()
|
|
|
|
file_type = self.file_type
|
|
|
|
if self.url:
|
|
file_type |= WEB_LOCATION_FLAG
|
|
|
|
if self.file_reference:
|
|
file_type |= FILE_REFERENCE_FLAG
|
|
|
|
buffer.write(struct.pack("<ii", file_type, self.dc_id))
|
|
|
|
if self.url:
|
|
buffer.write(String(self.url))
|
|
|
|
if self.file_reference:
|
|
buffer.write(Bytes(self.file_reference))
|
|
|
|
buffer.write(struct.pack("<qq", self.media_id, self.access_hash))
|
|
|
|
if self.file_type in PHOTO_TYPES:
|
|
buffer.write(struct.pack("<q", self.volume_id))
|
|
|
|
if major >= 4:
|
|
buffer.write(struct.pack("<i", self.thumbnail_source))
|
|
|
|
if self.thumbnail_source == ThumbnailSource.LEGACY:
|
|
buffer.write(struct.pack("<qi", self.secret, self.local_id))
|
|
elif self.thumbnail_source == ThumbnailSource.THUMBNAIL:
|
|
buffer.write(struct.pack(
|
|
"<iii",
|
|
self.thumbnail_file_type,
|
|
ord(self.thumbnail_size),
|
|
self.local_id
|
|
))
|
|
elif self.thumbnail_source in (ThumbnailSource.CHAT_PHOTO_SMALL, ThumbnailSource.CHAT_PHOTO_BIG):
|
|
buffer.write(struct.pack(
|
|
"<qqi",
|
|
self.chat_id,
|
|
self.chat_access_hash,
|
|
self.local_id
|
|
))
|
|
elif self.thumbnail_source == ThumbnailSource.STICKER_SET_THUMBNAIL:
|
|
buffer.write(struct.pack(
|
|
"<qqi",
|
|
self.sticker_set_id,
|
|
self.sticker_set_access_hash,
|
|
self.local_id
|
|
))
|
|
elif file_type in DOCUMENT_TYPES:
|
|
buffer.write(struct.pack("<ii", minor, major))
|
|
|
|
buffer.write(struct.pack("<bb", minor, major))
|
|
|
|
return b64_encode(rle_encode(buffer.getvalue()))
|
|
|
|
def __str__(self):
|
|
return str({k: v for k, v in self.__dict__.items() if v is not None})
|
|
|
|
|
|
class FileUniqueType(IntEnum):
|
|
"""Known file unique types"""
|
|
WEB = 0
|
|
PHOTO = 1
|
|
DOCUMENT = 2
|
|
SECURE = 3
|
|
ENCRYPTED = 4
|
|
TEMP = 5
|
|
|
|
|
|
class FileUniqueId:
|
|
def __init__(
|
|
self, *,
|
|
file_unique_type: FileUniqueType,
|
|
url: str = None,
|
|
media_id: int = None,
|
|
volume_id: int = None,
|
|
local_id: int = None
|
|
):
|
|
self.file_unique_type = file_unique_type
|
|
self.url = url
|
|
self.media_id = media_id
|
|
self.volume_id = volume_id
|
|
self.local_id = local_id
|
|
|
|
@staticmethod
|
|
def decode(file_unique_id: str):
|
|
buffer = BytesIO(rle_decode(b64_decode(file_unique_id)))
|
|
file_unique_type, = struct.unpack("<i", buffer.read(4))
|
|
|
|
try:
|
|
file_unique_type = FileUniqueType(file_unique_type)
|
|
except ValueError:
|
|
raise ValueError(f"Unknown file_unique_type {file_unique_type} of file_unique_id {file_unique_id}")
|
|
|
|
if file_unique_type == FileUniqueType.WEB:
|
|
url = String.read(buffer)
|
|
|
|
return FileUniqueId(
|
|
file_unique_type=file_unique_type,
|
|
url=url
|
|
)
|
|
|
|
if file_unique_type == FileUniqueType.PHOTO:
|
|
volume_id, local_id = struct.unpack("<qi", buffer.read())
|
|
|
|
return FileUniqueId(
|
|
file_unique_type=file_unique_type,
|
|
volume_id=volume_id,
|
|
local_id=local_id
|
|
)
|
|
|
|
if file_unique_type == FileUniqueType.DOCUMENT:
|
|
media_id, = struct.unpack("<q", buffer.read())
|
|
|
|
return FileUniqueId(
|
|
file_unique_type=file_unique_type,
|
|
media_id=media_id
|
|
)
|
|
|
|
# TODO: Missing decoder for SECURE, ENCRYPTED and TEMP
|
|
raise ValueError(f"Unknown decoder for file_unique_type {file_unique_type} of file_unique_id {file_unique_id}")
|
|
|
|
def encode(self):
|
|
if self.file_unique_type == FileUniqueType.WEB:
|
|
string = struct.pack("<is", self.file_unique_type, String(self.url))
|
|
elif self.file_unique_type == FileUniqueType.PHOTO:
|
|
string = struct.pack("<iqi", self.file_unique_type, self.volume_id, self.local_id)
|
|
elif self.file_unique_type == FileUniqueType.DOCUMENT:
|
|
string = struct.pack("<iq", self.file_unique_type, self.media_id)
|
|
else:
|
|
# TODO: Missing encoder for SECURE, ENCRYPTED and TEMP
|
|
raise ValueError(f"Unknown encoder for file_unique_type {self.file_unique_type}")
|
|
|
|
return b64_encode(rle_encode(string))
|
|
|
|
def __str__(self):
|
|
return str({k: v for k, v in self.__dict__.items() if v is not None})
|