enhance queue algorithm

This commit is contained in:
BennyThink 2022-07-10 22:21:54 +08:00
parent 11a7175672
commit 8391cbd21b
No known key found for this signature in database
GPG Key ID: 6CD0DBDA5235D481

View File

@ -8,6 +8,7 @@
__author__ = "Benny <benny.think@gmail.com>"
import logging
import math
import os
import pathlib
import random
@ -412,10 +413,11 @@ def async_task(task_name, *args):
inspect = app.control.inspect()
worker_stats = inspect.stats()
route_queues = []
padding = math.ceil(sum([i['pool']['max-concurrency'] for i in worker_stats.values()]) / len(worker_stats))
for worker_name, stats in worker_stats.items():
route = worker_name.split('@')[1]
concurrency = stats['pool']['max-concurrency']
route_queues.extend([route] * concurrency)
route_queues.extend([route] * (concurrency + padding))
destination = random.choice(route_queues)
logging.info("Selecting worker %s from %s in %.2fs", destination, route_queues, time.time() - t0)
task_name.apply_async(args=args, queue=destination)