From 8e9e8b4ac46edadaa7cb2bd1c151af7ad7fe2ba9 Mon Sep 17 00:00:00 2001 From: Dan <14043624+delivrance@users.noreply.github.com> Date: Sun, 27 Oct 2019 09:35:33 +0100 Subject: [PATCH] Allow stop, restart and add/remove_handler to be non-blocking --- pyrogram/client/client.py | 50 ++++++++++++++++----- pyrogram/client/ext/dispatcher.py | 74 +++++++++++++++++++------------ 2 files changed, 86 insertions(+), 38 deletions(-) diff --git a/pyrogram/client/client.py b/pyrogram/client/client.py index fcb17289..10b3ba2a 100644 --- a/pyrogram/client/client.py +++ b/pyrogram/client/client.py @@ -328,12 +328,18 @@ class Client(Methods, BaseClient): self.is_initialized = True - def terminate(self): + def terminate(self, block: bool = True): """Terminate the client by shutting down workers. This method does the opposite of :meth:`~Client.initialize`. It will stop the dispatcher and shut down updates and download workers. + Parameters: + block (``bool``, *optional*): + Blocks the code execution until the client has been terminated. It is useful with ``block=False`` in + case you want to terminate the own client *within* an handler in order not to cause a deadlock. + Defaults to True. + Raises: ConnectionError: In case you try to terminate a client that is already terminated. """ @@ -345,7 +351,7 @@ class Client(Methods, BaseClient): log.warning("Takeout session {} finished".format(self.takeout_id)) Syncer.remove(self) - self.dispatcher.stop() + self.dispatcher.stop(block) for _ in range(self.DOWNLOAD_WORKERS): self.download_queue.put(None) @@ -840,11 +846,17 @@ class Client(Methods, BaseClient): self.initialize() return self - def stop(self): + def stop(self, block: bool = True): """Stop the Client. This method disconnects the client from Telegram and stops the underlying tasks. + Parameters: + block (``bool``, *optional*): + Blocks the code execution until the client has been stopped. It is useful with ``block=False`` in case + you want to stop the own client *within* an handler in order not to cause a deadlock. + Defaults to True. + Returns: :obj:`Client`: The stopped client itself. @@ -864,17 +876,23 @@ class Client(Methods, BaseClient): app.stop() """ - self.terminate() + self.terminate(block) self.disconnect() return self - def restart(self): + def restart(self, block: bool = True): """Restart the Client. This method will first call :meth:`~Client.stop` and then :meth:`~Client.start` in a row in order to restart a client using a single method. + Parameters: + block (``bool``, *optional*): + Blocks the code execution until the client has been restarted. It is useful with ``block=False`` in case + you want to restart the own client *within* an handler in order not to cause a deadlock. + Defaults to True. + Returns: :obj:`Client`: The restarted client itself. @@ -898,7 +916,7 @@ class Client(Methods, BaseClient): app.stop() """ - self.stop() + self.stop(block) self.start() return self @@ -985,7 +1003,7 @@ class Client(Methods, BaseClient): Client.idle() self.stop() - def add_handler(self, handler: Handler, group: int = 0): + def add_handler(self, handler: Handler, group: int = 0, block: bool = True): """Register an update handler. You can register multiple handlers, but at most one handler within a group will be used for a single update. @@ -1000,6 +1018,11 @@ class Client(Methods, BaseClient): group (``int``, *optional*): The group identifier, defaults to 0. + block (``bool``, *optional*): + Blocks the code execution until the handler has been added. It is useful with ``block=False`` in case + you want to register a new handler *within* another handler in order not to cause a deadlock. + Defaults to True. + Returns: ``tuple``: A tuple consisting of *(handler, group)*. @@ -1021,11 +1044,11 @@ class Client(Methods, BaseClient): if isinstance(handler, DisconnectHandler): self.disconnect_handler = handler.callback else: - self.dispatcher.add_handler(handler, group) + self.dispatcher.add_handler(handler, group, block) return handler, group - def remove_handler(self, handler: Handler, group: int = 0): + def remove_handler(self, handler: Handler, group: int = 0, block: bool = True): """Remove a previously-registered update handler. Make sure to provide the right group where the handler was added in. You can use the return value of the @@ -1038,6 +1061,13 @@ class Client(Methods, BaseClient): group (``int``, *optional*): The group identifier, defaults to 0. + block (``bool``, *optional*): + Blocks the code execution until the handler has been removed. It is useful with ``block=False`` in case + you want to remove a previously registered handler *within* another handler in order not to cause a + deadlock. + Defaults to True. + + Example: .. code-block:: python :emphasize-lines: 11 @@ -1059,7 +1089,7 @@ class Client(Methods, BaseClient): if isinstance(handler, DisconnectHandler): self.disconnect_handler = None else: - self.dispatcher.remove_handler(handler, group) + self.dispatcher.remove_handler(handler, group, block) def stop_transmission(self): """Stop downloading or uploading a file. diff --git a/pyrogram/client/ext/dispatcher.py b/pyrogram/client/ext/dispatcher.py index e9cd912e..29439564 100644 --- a/pyrogram/client/ext/dispatcher.py +++ b/pyrogram/client/ext/dispatcher.py @@ -118,43 +118,61 @@ class Dispatcher: self.workers_list[-1].start() - def stop(self): - for _ in range(self.workers): - self.updates_queue.put(None) + def stop(self, block: bool = True): + def do_it(): + for _ in range(self.workers): + self.updates_queue.put(None) - for worker in self.workers_list: - worker.join() + for worker in self.workers_list: + worker.join() - self.workers_list.clear() - self.locks_list.clear() - self.groups.clear() + self.workers_list.clear() + self.locks_list.clear() + self.groups.clear() - def add_handler(self, handler, group: int): - for lock in self.locks_list: - lock.acquire() + if block: + do_it() + else: + Thread(target=do_it).start() - try: - if group not in self.groups: - self.groups[group] = [] - self.groups = OrderedDict(sorted(self.groups.items())) - - self.groups[group].append(handler) - finally: + def add_handler(self, handler, group: int, block: bool = True): + def do_it(): for lock in self.locks_list: - lock.release() + lock.acquire() - def remove_handler(self, handler, group: int): - for lock in self.locks_list: - lock.acquire() + try: + if group not in self.groups: + self.groups[group] = [] + self.groups = OrderedDict(sorted(self.groups.items())) - try: - if group not in self.groups: - raise ValueError("Group {} does not exist. Handler was not removed.".format(group)) + self.groups[group].append(handler) + finally: + for lock in self.locks_list: + lock.release() - self.groups[group].remove(handler) - finally: + if block: + do_it() + else: + Thread(target=do_it).start() + + def remove_handler(self, handler, group: int, block: bool = True): + def do_it(): for lock in self.locks_list: - lock.release() + lock.acquire() + + try: + if group not in self.groups: + raise ValueError("Group {} does not exist. Handler was not removed.".format(group)) + + self.groups[group].remove(handler) + finally: + for lock in self.locks_list: + lock.release() + + if block: + do_it() + else: + Thread(target=do_it).start() def update_worker(self, lock): name = threading.current_thread().name