diff --git a/pyrogram/client/client.py b/pyrogram/client/client.py index 99ba1805..e7da9117 100644 --- a/pyrogram/client/client.py +++ b/pyrogram/client/client.py @@ -17,9 +17,7 @@ # along with Pyrogram. If not, see . import asyncio -import base64 import inspect -import json import logging import math import mimetypes @@ -325,7 +323,12 @@ class Client(Methods, BaseClient): await self.session.stop() raise e - self.updates_worker_task = asyncio.ensure_future(self.updates_worker()) + for _ in range(Client.UPDATES_WORKERS): + self.updates_worker_tasks.append( + asyncio.ensure_future(self.updates_worker()) + ) + + log.info("Started {} UpdatesWorkerTasks".format(Client.UPDATES_WORKERS)) for _ in range(Client.DOWNLOAD_WORKERS): self.download_worker_tasks.append( @@ -367,8 +370,15 @@ class Client(Methods, BaseClient): log.info("Stopped {} DownloadWorkerTasks".format(Client.DOWNLOAD_WORKERS)) - self.updates_queue.put_nowait(None) - await self.updates_worker_task + for _ in range(Client.UPDATES_WORKERS): + self.updates_queue.put_nowait(None) + + for task in self.updates_worker_tasks: + await task + + self.updates_worker_tasks.clear() + + log.info("Stopped {} UpdatesWorkerTasks".format(Client.UPDATES_WORKERS)) for media_session in self.media_sessions.values(): await media_session.stop() @@ -862,8 +872,6 @@ class Client(Methods, BaseClient): done.set() async def updates_worker(self): - log.info("UpdatesWorkerTask started") - while True: updates = await self.updates_queue.get() @@ -946,8 +954,6 @@ class Client(Methods, BaseClient): except Exception as e: log.error(e, exc_info=True) - log.info("UpdatesWorkerTask stopped") - async def send(self, data: TLObject, retries: int = Session.MAX_RETRIES, diff --git a/pyrogram/client/ext/base_client.py b/pyrogram/client/ext/base_client.py index 0b1f9bff..3d513031 100644 --- a/pyrogram/client/ext/base_client.py +++ b/pyrogram/client/ext/base_client.py @@ -24,7 +24,6 @@ import sys from pathlib import Path from pyrogram import __version__ - from ..style import Markdown, HTML from ...session.internals import MsgId @@ -105,7 +104,7 @@ class BaseClient: self.takeout_id = None self.updates_queue = asyncio.Queue() - self.updates_worker_task = None + self.updates_worker_tasks = [] self.download_queue = asyncio.Queue() self.download_worker_tasks = []