Merge pull request #331 from pyrogram/I329

Closes #329
This commit is contained in:
Dan 2019-10-27 09:38:06 +01:00 committed by GitHub
commit 9afd244eb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 86 additions and 38 deletions

View File

@ -328,12 +328,18 @@ class Client(Methods, BaseClient):
self.is_initialized = True self.is_initialized = True
def terminate(self): def terminate(self, block: bool = True):
"""Terminate the client by shutting down workers. """Terminate the client by shutting down workers.
This method does the opposite of :meth:`~Client.initialize`. This method does the opposite of :meth:`~Client.initialize`.
It will stop the dispatcher and shut down updates and download workers. It will stop the dispatcher and shut down updates and download workers.
Parameters:
block (``bool``, *optional*):
Blocks the code execution until the client has been terminated. It is useful with ``block=False`` in
case you want to terminate the own client *within* an handler in order not to cause a deadlock.
Defaults to True.
Raises: Raises:
ConnectionError: In case you try to terminate a client that is already terminated. ConnectionError: In case you try to terminate a client that is already terminated.
""" """
@ -345,7 +351,7 @@ class Client(Methods, BaseClient):
log.warning("Takeout session {} finished".format(self.takeout_id)) log.warning("Takeout session {} finished".format(self.takeout_id))
Syncer.remove(self) Syncer.remove(self)
self.dispatcher.stop() self.dispatcher.stop(block)
for _ in range(self.DOWNLOAD_WORKERS): for _ in range(self.DOWNLOAD_WORKERS):
self.download_queue.put(None) self.download_queue.put(None)
@ -840,11 +846,17 @@ class Client(Methods, BaseClient):
self.initialize() self.initialize()
return self return self
def stop(self): def stop(self, block: bool = True):
"""Stop the Client. """Stop the Client.
This method disconnects the client from Telegram and stops the underlying tasks. This method disconnects the client from Telegram and stops the underlying tasks.
Parameters:
block (``bool``, *optional*):
Blocks the code execution until the client has been stopped. It is useful with ``block=False`` in case
you want to stop the own client *within* an handler in order not to cause a deadlock.
Defaults to True.
Returns: Returns:
:obj:`Client`: The stopped client itself. :obj:`Client`: The stopped client itself.
@ -864,17 +876,23 @@ class Client(Methods, BaseClient):
app.stop() app.stop()
""" """
self.terminate() self.terminate(block)
self.disconnect() self.disconnect()
return self return self
def restart(self): def restart(self, block: bool = True):
"""Restart the Client. """Restart the Client.
This method will first call :meth:`~Client.stop` and then :meth:`~Client.start` in a row in order to restart This method will first call :meth:`~Client.stop` and then :meth:`~Client.start` in a row in order to restart
a client using a single method. a client using a single method.
Parameters:
block (``bool``, *optional*):
Blocks the code execution until the client has been restarted. It is useful with ``block=False`` in case
you want to restart the own client *within* an handler in order not to cause a deadlock.
Defaults to True.
Returns: Returns:
:obj:`Client`: The restarted client itself. :obj:`Client`: The restarted client itself.
@ -898,7 +916,7 @@ class Client(Methods, BaseClient):
app.stop() app.stop()
""" """
self.stop() self.stop(block)
self.start() self.start()
return self return self
@ -985,7 +1003,7 @@ class Client(Methods, BaseClient):
Client.idle() Client.idle()
self.stop() self.stop()
def add_handler(self, handler: Handler, group: int = 0): def add_handler(self, handler: Handler, group: int = 0, block: bool = True):
"""Register an update handler. """Register an update handler.
You can register multiple handlers, but at most one handler within a group will be used for a single update. You can register multiple handlers, but at most one handler within a group will be used for a single update.
@ -1000,6 +1018,11 @@ class Client(Methods, BaseClient):
group (``int``, *optional*): group (``int``, *optional*):
The group identifier, defaults to 0. The group identifier, defaults to 0.
block (``bool``, *optional*):
Blocks the code execution until the handler has been added. It is useful with ``block=False`` in case
you want to register a new handler *within* another handler in order not to cause a deadlock.
Defaults to True.
Returns: Returns:
``tuple``: A tuple consisting of *(handler, group)*. ``tuple``: A tuple consisting of *(handler, group)*.
@ -1021,11 +1044,11 @@ class Client(Methods, BaseClient):
if isinstance(handler, DisconnectHandler): if isinstance(handler, DisconnectHandler):
self.disconnect_handler = handler.callback self.disconnect_handler = handler.callback
else: else:
self.dispatcher.add_handler(handler, group) self.dispatcher.add_handler(handler, group, block)
return handler, group return handler, group
def remove_handler(self, handler: Handler, group: int = 0): def remove_handler(self, handler: Handler, group: int = 0, block: bool = True):
"""Remove a previously-registered update handler. """Remove a previously-registered update handler.
Make sure to provide the right group where the handler was added in. You can use the return value of the Make sure to provide the right group where the handler was added in. You can use the return value of the
@ -1038,6 +1061,13 @@ class Client(Methods, BaseClient):
group (``int``, *optional*): group (``int``, *optional*):
The group identifier, defaults to 0. The group identifier, defaults to 0.
block (``bool``, *optional*):
Blocks the code execution until the handler has been removed. It is useful with ``block=False`` in case
you want to remove a previously registered handler *within* another handler in order not to cause a
deadlock.
Defaults to True.
Example: Example:
.. code-block:: python .. code-block:: python
:emphasize-lines: 11 :emphasize-lines: 11
@ -1059,7 +1089,7 @@ class Client(Methods, BaseClient):
if isinstance(handler, DisconnectHandler): if isinstance(handler, DisconnectHandler):
self.disconnect_handler = None self.disconnect_handler = None
else: else:
self.dispatcher.remove_handler(handler, group) self.dispatcher.remove_handler(handler, group, block)
def stop_transmission(self): def stop_transmission(self):
"""Stop downloading or uploading a file. """Stop downloading or uploading a file.

View File

@ -118,43 +118,61 @@ class Dispatcher:
self.workers_list[-1].start() self.workers_list[-1].start()
def stop(self): def stop(self, block: bool = True):
for _ in range(self.workers): def do_it():
self.updates_queue.put(None) for _ in range(self.workers):
self.updates_queue.put(None)
for worker in self.workers_list: for worker in self.workers_list:
worker.join() worker.join()
self.workers_list.clear() self.workers_list.clear()
self.locks_list.clear() self.locks_list.clear()
self.groups.clear() self.groups.clear()
def add_handler(self, handler, group: int): if block:
for lock in self.locks_list: do_it()
lock.acquire() else:
Thread(target=do_it).start()
try: def add_handler(self, handler, group: int, block: bool = True):
if group not in self.groups: def do_it():
self.groups[group] = []
self.groups = OrderedDict(sorted(self.groups.items()))
self.groups[group].append(handler)
finally:
for lock in self.locks_list: for lock in self.locks_list:
lock.release() lock.acquire()
def remove_handler(self, handler, group: int): try:
for lock in self.locks_list: if group not in self.groups:
lock.acquire() self.groups[group] = []
self.groups = OrderedDict(sorted(self.groups.items()))
try: self.groups[group].append(handler)
if group not in self.groups: finally:
raise ValueError("Group {} does not exist. Handler was not removed.".format(group)) for lock in self.locks_list:
lock.release()
self.groups[group].remove(handler) if block:
finally: do_it()
else:
Thread(target=do_it).start()
def remove_handler(self, handler, group: int, block: bool = True):
def do_it():
for lock in self.locks_list: for lock in self.locks_list:
lock.release() lock.acquire()
try:
if group not in self.groups:
raise ValueError("Group {} does not exist. Handler was not removed.".format(group))
self.groups[group].remove(handler)
finally:
for lock in self.locks_list:
lock.release()
if block:
do_it()
else:
Thread(target=do_it).start()
def update_worker(self, lock): def update_worker(self, lock):
name = threading.current_thread().name name = threading.current_thread().name