Enable scheduling of more than 1 updates worker

This commit is contained in:
Dan 2019-06-24 17:33:33 +02:00
parent 3237847ce1
commit 656aa4a7ca
2 changed files with 16 additions and 11 deletions

View File

@ -17,9 +17,7 @@
# along with Pyrogram. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import base64
import inspect
import json
import logging
import math
import mimetypes
@ -325,7 +323,12 @@ class Client(Methods, BaseClient):
await self.session.stop()
raise e
self.updates_worker_task = asyncio.ensure_future(self.updates_worker())
for _ in range(Client.UPDATES_WORKERS):
self.updates_worker_tasks.append(
asyncio.ensure_future(self.updates_worker())
)
log.info("Started {} UpdatesWorkerTasks".format(Client.UPDATES_WORKERS))
for _ in range(Client.DOWNLOAD_WORKERS):
self.download_worker_tasks.append(
@ -367,8 +370,15 @@ class Client(Methods, BaseClient):
log.info("Stopped {} DownloadWorkerTasks".format(Client.DOWNLOAD_WORKERS))
self.updates_queue.put_nowait(None)
await self.updates_worker_task
for _ in range(Client.UPDATES_WORKERS):
self.updates_queue.put_nowait(None)
for task in self.updates_worker_tasks:
await task
self.updates_worker_tasks.clear()
log.info("Stopped {} UpdatesWorkerTasks".format(Client.UPDATES_WORKERS))
for media_session in self.media_sessions.values():
await media_session.stop()
@ -862,8 +872,6 @@ class Client(Methods, BaseClient):
done.set()
async def updates_worker(self):
log.info("UpdatesWorkerTask started")
while True:
updates = await self.updates_queue.get()
@ -946,8 +954,6 @@ class Client(Methods, BaseClient):
except Exception as e:
log.error(e, exc_info=True)
log.info("UpdatesWorkerTask stopped")
async def send(self,
data: TLObject,
retries: int = Session.MAX_RETRIES,

View File

@ -24,7 +24,6 @@ import sys
from pathlib import Path
from pyrogram import __version__
from ..style import Markdown, HTML
from ...session.internals import MsgId
@ -105,7 +104,7 @@ class BaseClient:
self.takeout_id = None
self.updates_queue = asyncio.Queue()
self.updates_worker_task = None
self.updates_worker_tasks = []
self.download_queue = asyncio.Queue()
self.download_worker_tasks = []