Clean up dispatcher and fix workers not being stopped correctly

This commit is contained in:
Dan 2018-06-18 13:06:07 +02:00
parent 1bc599e26c
commit 9a5ce0fe2d

View File

@ -45,16 +45,28 @@ class Dispatcher:
self.client = client
self.workers = workers
self.update_worker_task = None
self.update_worker_tasks = []
self.updates = asyncio.Queue()
self.groups = OrderedDict()
async def start(self):
self.update_worker_task = asyncio.ensure_future(self.update_worker())
for i in range(self.workers):
self.update_worker_tasks.append(
asyncio.ensure_future(self.update_worker())
)
log.info("Started {} UpdateWorkerTasks".format(self.workers))
async def stop(self):
for i in range(self.workers):
self.updates.put_nowait(None)
await self.update_worker_task
for i in self.update_worker_tasks:
await i
self.update_worker_tasks.clear()
log.info("Stopped {} UpdateWorkerTasks".format(self.workers))
def add_handler(self, handler, group: int):
if group not in self.groups:
@ -106,8 +118,6 @@ class Dispatcher:
await asyncio.gather(*tasks)
async def update_worker(self):
log.info("UpdateWorkerTask started")
while True:
tasks = []
update = await self.updates.get()
@ -173,5 +183,3 @@ class Dispatcher:
await asyncio.gather(*tasks)
except Exception as e:
log.error(e, exc_info=True)
log.info("UpdateWorkerTask stopped")