Merge develop -> asyncio

This commit is contained in:
Dan 2020-05-07 13:00:03 +02:00
commit 74674cd615
7 changed files with 104 additions and 130 deletions

View File

@ -23,7 +23,6 @@ import os
import re
import shutil
import tempfile
import time
from configparser import ConfigParser
from hashlib import sha256, md5
from importlib import import_module, reload
@ -154,6 +153,12 @@ class Client(Methods, BaseClient):
download_media, ...) are less prone to throw FloodWait exceptions.
Only available for users, bots will ignore this parameter.
Defaults to False (normal session).
sleep_threshold (``int``, *optional*):
Set a sleep threshold for flood wait exceptions happening globally in this client instance, below which any
request that raises a flood wait will be automatically invoked again after sleeping for the required amount
of time. Flood wait exceptions requiring higher waiting times will be raised.
Defaults to 60 (seconds).
"""
def __init__(
@ -178,7 +183,8 @@ class Client(Methods, BaseClient):
config_file: str = BaseClient.CONFIG_FILE,
plugins: dict = None,
no_updates: bool = None,
takeout: bool = None
takeout: bool = None,
sleep_threshold: int = 60
):
super().__init__()
@ -204,6 +210,7 @@ class Client(Methods, BaseClient):
self.plugins = plugins
self.no_updates = no_updates
self.takeout = takeout
self.sleep_threshold = sleep_threshold
if isinstance(session_name, str):
if session_name == ":memory:" or len(session_name) >= MemoryStorage.SESSION_STRING_SIZE:
@ -1403,13 +1410,31 @@ class Client(Methods, BaseClient):
if not self.is_connected:
raise ConnectionError("Client has not been started yet")
# Some raw methods that expect a query as argument are used here.
# Keep the original request query because is needed.
unwrapped_data = data
if self.no_updates:
data = functions.InvokeWithoutUpdates(query=data)
if self.takeout_id:
data = functions.InvokeWithTakeout(takeout_id=self.takeout_id, query=data)
r = await self.session.send(data, retries, timeout)
while True:
try:
r = await self.session.send(data, retries, timeout)
except FloodWait as e:
amount = e.x
if amount > self.sleep_threshold:
raise
log.warning('[{}] Sleeping for {}s (required by "{}")'.format(
self.session_name, amount, ".".join(unwrapped_data.QUALNAME.split(".")[1:])))
await asyncio.sleep(amount)
else:
break
self.fetch_peers(getattr(r, "users", []))
self.fetch_peers(getattr(r, "chats", []))

View File

@ -16,13 +16,11 @@
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import logging
from typing import Union, List
import pyrogram
from pyrogram.api import functions, types
from pyrogram.errors import FloodWait
from ...ext import BaseClient
log = logging.getLogger(__name__)
@ -136,24 +134,19 @@ class GetChatMembers(BaseClient):
else:
raise ValueError("Invalid filter \"{}\"".format(filter))
while True:
try:
r = await self.send(
functions.channels.GetParticipants(
channel=peer,
filter=filter,
offset=offset,
limit=limit,
hash=0
)
)
r = await self.send(
functions.channels.GetParticipants(
channel=peer,
filter=filter,
offset=offset,
limit=limit,
hash=0
)
)
members = r.participants
users = {i.id: i for i in r.users}
members = r.participants
users = {i.id: i for i in r.users}
return pyrogram.List(pyrogram.ChatMember._parse(self, member, users) for member in members)
except FloodWait as e:
log.warning("Sleeping for {}s".format(e.x))
await asyncio.sleep(e.x)
return pyrogram.List(pyrogram.ChatMember._parse(self, member, users) for member in members)
else:
raise ValueError("The chat_id \"{}\" belongs to a user".format(chat_id))

View File

@ -16,13 +16,11 @@
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import logging
from typing import List
import pyrogram
from pyrogram.api import functions, types
from pyrogram.errors import FloodWait
from ...ext import BaseClient, utils
log = logging.getLogger(__name__)
@ -66,26 +64,19 @@ class GetDialogs(BaseClient):
app.get_dialogs(pinned_only=True)
"""
while True:
try:
if pinned_only:
r = await self.send(functions.messages.GetPinnedDialogs(folder_id=0))
else:
r = await self.send(
functions.messages.GetDialogs(
offset_date=offset_date,
offset_id=0,
offset_peer=types.InputPeerEmpty(),
limit=limit,
hash=0,
exclude_pinned=True
)
)
except FloodWait as e:
log.warning("Sleeping for {}s".format(e.x))
await asyncio.sleep(e.x)
else:
break
if pinned_only:
r = await self.send(functions.messages.GetPinnedDialogs(folder_id=0))
else:
r = await self.send(
functions.messages.GetDialogs(
offset_date=offset_date,
offset_id=0,
offset_peer=types.InputPeerEmpty(),
limit=limit,
hash=0,
exclude_pinned=True
)
)
users = {i.id: i for i in r.users}
chats = {i.id: i for i in r.chats}

View File

@ -85,28 +85,21 @@ class GetHistory(BaseClient):
offset_id = offset_id or (1 if reverse else 0)
while True:
try:
messages = await utils.parse_messages(
self,
await self.send(
functions.messages.GetHistory(
peer=await self.resolve_peer(chat_id),
offset_id=offset_id,
offset_date=offset_date,
add_offset=offset * (-1 if reverse else 1) - (limit if reverse else 0),
limit=limit,
max_id=0,
min_id=0,
hash=0
)
)
messages = await utils.parse_messages(
self,
await self.send(
functions.messages.GetHistory(
peer=await self.resolve_peer(chat_id),
offset_id=offset_id,
offset_date=offset_date,
add_offset=offset * (-1 if reverse else 1) - (limit if reverse else 0),
limit=limit,
max_id=0,
min_id=0,
hash=0
)
except FloodWait as e:
log.warning("Sleeping for {}s".format(e.x))
await asyncio.sleep(e.x)
else:
break
)
)
if reverse:
messages.reverse()

View File

@ -112,14 +112,7 @@ class GetMessages(BaseClient):
else:
rpc = functions.messages.GetMessages(id=ids)
while True:
try:
r = await self.send(rpc)
except FloodWait as e:
log.warning("Sleeping for {}s".format(e.x))
await asyncio.sleep(e.x)
else:
break
r = await self.send(rpc)
messages = await utils.parse_messages(self, r, replies=replies)

View File

@ -78,21 +78,14 @@ class SendMediaGroup(BaseClient):
for i in media:
if isinstance(i, pyrogram.InputMediaPhoto):
if os.path.exists(i.media):
while True:
try:
media = await self.send(
functions.messages.UploadMedia(
peer=await self.resolve_peer(chat_id),
media=types.InputMediaUploadedPhoto(
file=await self.save_file(i.media)
)
)
media = await self.send(
functions.messages.UploadMedia(
peer=await self.resolve_peer(chat_id),
media=types.InputMediaUploadedPhoto(
file=await self.save_file(i.media)
)
except FloodWait as e:
log.warning("Sleeping for {}s".format(e.x))
await asyncio.sleep(e.x)
else:
break
)
)
media = types.InputMediaPhoto(
id=types.InputPhoto(
@ -122,32 +115,25 @@ class SendMediaGroup(BaseClient):
media = utils.get_input_media_from_file_id(i.media, i.file_ref, 2)
elif isinstance(i, pyrogram.InputMediaVideo):
if os.path.exists(i.media):
while True:
try:
media = await self.send(
functions.messages.UploadMedia(
peer=await self.resolve_peer(chat_id),
media=types.InputMediaUploadedDocument(
file=await self.save_file(i.media),
thumb=None if i.thumb is None else self.save_file(i.thumb),
mime_type=self.guess_mime_type(i.media) or "video/mp4",
attributes=[
types.DocumentAttributeVideo(
supports_streaming=i.supports_streaming or None,
duration=i.duration,
w=i.width,
h=i.height
),
types.DocumentAttributeFilename(file_name=os.path.basename(i.media))
]
)
)
media = await self.send(
functions.messages.UploadMedia(
peer=await self.resolve_peer(chat_id),
media=types.InputMediaUploadedDocument(
file=await self.save_file(i.media),
thumb=None if i.thumb is None else self.save_file(i.thumb),
mime_type=self.guess_mime_type(i.media) or "video/mp4",
attributes=[
types.DocumentAttributeVideo(
supports_streaming=i.supports_streaming or None,
duration=i.duration,
w=i.width,
h=i.height
),
types.DocumentAttributeFilename(file_name=os.path.basename(i.media))
]
)
except FloodWait as e:
log.warning("Sleeping for {}s".format(e.x))
await asyncio.sleep(e.x)
else:
break
)
)
media = types.InputMediaDocument(
id=types.InputDocument(
@ -184,21 +170,14 @@ class SendMediaGroup(BaseClient):
)
)
while True:
try:
r = await self.send(
functions.messages.SendMultiMedia(
peer=await self.resolve_peer(chat_id),
multi_media=multi_media,
silent=disable_notification or None,
reply_to_msg_id=reply_to_message_id
)
)
except FloodWait as e:
log.warning("Sleeping for {}s".format(e.x))
await asyncio.sleep(e.x)
else:
break
r = await self.send(
functions.messages.SendMultiMedia(
peer=await self.resolve_peer(chat_id),
multi_media=multi_media,
silent=disable_notification or None,
reply_to_msg_id=reply_to_message_id
)
)
return await utils.parse_messages(
self,

View File

@ -50,13 +50,13 @@ class UpdateProfile(BaseClient):
.. code-block:: python
# Update your first name only
app.update_bio(first_name="Pyrogram")
app.update_profile(first_name="Pyrogram")
# Update first name and bio
app.update_bio(first_name="Pyrogram", bio="https://docs.pyrogram.org/")
app.update_profile(first_name="Pyrogram", bio="https://docs.pyrogram.org/")
# Remove the last name
app.update_bio(last_name="")
app.update_profile(last_name="")
"""
return bool(