First step of Client conversion using asyncio

This commit is contained in:
Dan 2018-06-13 20:00:19 +02:00
parent e2546e77ae
commit e333e8dada

View File

@ -34,7 +34,6 @@ from configparser import ConfigParser
from datetime import datetime from datetime import datetime
from hashlib import sha256, md5 from hashlib import sha256, md5
from signal import signal, SIGINT, SIGTERM, SIGABRT from signal import signal, SIGINT, SIGTERM, SIGABRT
from threading import Thread
from pyrogram.api import functions, types from pyrogram.api import functions, types
from pyrogram.api.core import Object from pyrogram.api.core import Object
@ -169,7 +168,7 @@ class Client(Methods, BaseClient):
self._proxy["enabled"] = True self._proxy["enabled"] = True
self._proxy.update(value) self._proxy.update(value)
def start(self, debug: bool = False): async def start(self, debug: bool = False):
"""Use this method to start the Client after creating it. """Use this method to start the Client after creating it.
Requires no parameters. Requires no parameters.
@ -200,7 +199,7 @@ class Client(Methods, BaseClient):
client=self client=self
) )
self.session.start() await self.session.start()
self.is_started = True self.is_started = True
if self.user_id is None: if self.user_id is None:
@ -224,66 +223,66 @@ class Client(Methods, BaseClient):
self.send(functions.messages.GetPinnedDialogs()) self.send(functions.messages.GetPinnedDialogs())
self.get_dialogs_chunk(0) self.get_dialogs_chunk(0)
else: else:
self.send(functions.updates.GetState()) await self.send(functions.updates.GetState())
for i in range(self.UPDATES_WORKERS): # for i in range(self.UPDATES_WORKERS):
self.updates_workers_list.append( # self.updates_workers_list.append(
Thread( # Thread(
target=self.updates_worker, # target=self.updates_worker,
name="UpdatesWorker#{}".format(i + 1) # name="UpdatesWorker#{}".format(i + 1)
) # )
) # )
#
self.updates_workers_list[-1].start() # self.updates_workers_list[-1].start()
#
for i in range(self.DOWNLOAD_WORKERS): # for i in range(self.DOWNLOAD_WORKERS):
self.download_workers_list.append( # self.download_workers_list.append(
Thread( # Thread(
target=self.download_worker, # target=self.download_worker,
name="DownloadWorker#{}".format(i + 1) # name="DownloadWorker#{}".format(i + 1)
) # )
) # )
#
self.download_workers_list[-1].start() # self.download_workers_list[-1].start()
#
self.dispatcher.start() # self.dispatcher.start()
mimetypes.init() mimetypes.init()
Syncer.add(self) # Syncer.add(self)
def stop(self): async def stop(self):
"""Use this method to manually stop the Client. """Use this method to manually stop the Client.
Requires no parameters. Requires no parameters.
""" """
if not self.is_started: if not self.is_started:
raise ConnectionError("Client is already stopped") raise ConnectionError("Client is already stopped")
Syncer.remove(self) # Syncer.remove(self)
self.dispatcher.stop() # self.dispatcher.stop()
#
for _ in range(self.DOWNLOAD_WORKERS): # for _ in range(self.DOWNLOAD_WORKERS):
self.download_queue.put(None) # self.download_queue.put(None)
#
for i in self.download_workers_list: # for i in self.download_workers_list:
i.join() # i.join()
#
self.download_workers_list.clear() # self.download_workers_list.clear()
#
for _ in range(self.UPDATES_WORKERS): # for _ in range(self.UPDATES_WORKERS):
self.updates_queue.put(None) # self.updates_queue.put(None)
#
for i in self.updates_workers_list: # for i in self.updates_workers_list:
i.join() # i.join()
#
self.updates_workers_list.clear() # self.updates_workers_list.clear()
#
for i in self.media_sessions.values(): # for i in self.media_sessions.values():
i.stop() # i.stop()
#
self.media_sessions.clear() # self.media_sessions.clear()
self.is_started = False self.is_started = False
self.session.stop() await self.session.stop()
def add_handler(self, handler, group: int = 0): def add_handler(self, handler, group: int = 0):
"""Use this method to register an update handler. """Use this method to register an update handler.
@ -812,7 +811,7 @@ class Client(Methods, BaseClient):
self.stop() self.stop()
def send(self, data: Object): async def send(self, data: Object):
"""Use this method to send Raw Function queries. """Use this method to send Raw Function queries.
This method makes possible to manually call every single Telegram API method in a low-level manner. This method makes possible to manually call every single Telegram API method in a low-level manner.
@ -829,7 +828,7 @@ class Client(Methods, BaseClient):
if not self.is_started: if not self.is_started:
raise ConnectionError("Client has not been started") raise ConnectionError("Client has not been started")
r = self.session.send(data) r = await self.session.send(data)
self.fetch_peers(getattr(r, "users", [])) self.fetch_peers(getattr(r, "users", []))
self.fetch_peers(getattr(r, "chats", [])) self.fetch_peers(getattr(r, "chats", []))