From 8391cbd21bd1a8b7a61e28da516fbc830dcf68b0 Mon Sep 17 00:00:00 2001 From: BennyThink Date: Sun, 10 Jul 2022 22:21:54 +0800 Subject: [PATCH] enhance queue algorithm --- ytdlbot/tasks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ytdlbot/tasks.py b/ytdlbot/tasks.py index b5f1e55..aebeaf8 100644 --- a/ytdlbot/tasks.py +++ b/ytdlbot/tasks.py @@ -8,6 +8,7 @@ __author__ = "Benny " import logging +import math import os import pathlib import random @@ -412,10 +413,11 @@ def async_task(task_name, *args): inspect = app.control.inspect() worker_stats = inspect.stats() route_queues = [] + padding = math.ceil(sum([i['pool']['max-concurrency'] for i in worker_stats.values()]) / len(worker_stats)) for worker_name, stats in worker_stats.items(): route = worker_name.split('@')[1] concurrency = stats['pool']['max-concurrency'] - route_queues.extend([route] * concurrency) + route_queues.extend([route] * (concurrency + padding)) destination = random.choice(route_queues) logging.info("Selecting worker %s from %s in %.2fs", destination, route_queues, time.time() - t0) task_name.apply_async(args=args, queue=destination)