ping from dashboard

This commit is contained in:
BennyThink 2022-02-10 14:38:23 +08:00
parent ad1ba27a6f
commit d7bc2a2711
No known key found for this signature in database
GPG Key ID: 6CD0DBDA5235D481
4 changed files with 30 additions and 36 deletions

View File

@ -11,9 +11,10 @@ import time
from config import (AFD_LINK, COFFEE_LINK, ENABLE_CELERY, ENABLE_VIP, EX,
MULTIPLY, REQUIRED_MEMBERSHIP, USD2CNY)
from db import InfluxDB
from downloader import sizeof_fmt
from limit import QUOTA, VIP
from utils import get_func_queue, get_queue_stat
from utils import get_func_queue
class BotText:
@ -115,11 +116,6 @@ Sending format: **{1}**
else:
return ""
@staticmethod
def queue_stats():
    """Return the human-readable Celery queue summary string.

    Only the textual summary (the 4th element of ``get_queue_stat()``)
    is needed here; the numeric counters are discarded.
    """
    *_, summary = get_queue_stat()
    return summary
@staticmethod
def get_receive_link_text():
reserved = get_func_queue("reserved")
@ -129,3 +125,17 @@ Sending format: **{1}**
text = "Your task was added to active queue.\nProcessing...\n\n"
return text
@staticmethod
def ping_worker():
    """Build one status line per Celery worker from the dashboard snapshot.

    Reads the latest per-worker measurements via
    ``InfluxDB().extract_dashboard_data()`` and formats each as
    ``<marker><hostname> **<active>** <load1>,<load5>,<load15>`` followed
    by a newline. Returns "" when no workers are reported.
    """
    workers = InfluxDB().extract_dashboard_data()
    text = ""
    for worker in workers:
        fields = worker["fields"]
        hostname = worker["tags"]["hostname"]
        # BUG FIX: the old expression {True: ""}.get(fields["status"], "")
        # produced an empty marker for EVERY status, so online and offline
        # workers were indistinguishable. "status" is a bool (it is filled
        # with worker.get("status", False) on the collector side).
        status = "✅" if fields["status"] else "❌"
        active = fields["active"]
        load = "{},{},{}".format(fields["load1"], fields["load5"], fields["load15"])
        text += f"{status}{hostname} **{active}** {load}\n"
    return text

View File

@ -222,6 +222,7 @@ class MySQL:
class InfluxDB:
def __init__(self):
self.client = InfluxDBClient(host=os.getenv("INFLUX_HOST", "192.168.7.233"), database="celery")
self.data = self.get_worker_data()
def __del__(self):
self.client.close()
@ -234,9 +235,9 @@ class InfluxDB:
headers = {"Authorization": f"Basic {token}"}
return requests.get("https://celery.dmesg.app/dashboard?json=1", headers=headers).json()
def __fill_worker_data(self, data):
def extract_dashboard_data(self):
json_body = []
for worker in data["data"]:
for worker in self.data["data"]:
load1, load5, load15 = worker["loadavg"]
t = {
"measurement": "tasks",
@ -251,16 +252,21 @@ class InfluxDB:
"task-succeeded": worker.get("task-succeeded", 0),
"task-failed": worker.get("task-failed", 0),
"active": worker.get("active", 0),
"status": worker.get("status", False),
"load1": load1,
"load5": load5,
"load15": load15,
}
}
json_body.append(t)
return json_body
def __fill_worker_data(self):
    """Persist the current per-worker dashboard snapshot into InfluxDB."""
    # Push the measurement points straight through to the client.
    self.client.write_points(self.extract_dashboard_data())
def __fill_overall_data(self, data):
active = sum([i["active"] for i in data["data"]])
def __fill_overall_data(self):
active = sum([i["active"] for i in self.data["data"]])
json_body = [
{
"measurement": "active",
@ -291,8 +297,7 @@ class InfluxDB:
def collect_data(self):
    """Collect worker, overall and Redis metrics into InfluxDB.

    NOTE(review): this span was diff residue — it contained both the
    pre-change calls (which passed a fresh ``get_worker_data()`` result)
    and the post-change zero-argument calls. This is the post-change
    version: the helpers read ``self.data`` captured in ``__init__``.

    Any exception is swallowed deliberately: metrics collection is
    best-effort and must never break the caller.
    """
    with contextlib.suppress(Exception):
        self.__fill_worker_data()
        self.__fill_overall_data()
        self.__fill_redis_metrics()
        logging.debug("InfluxDB data was collected.")

View File

@ -129,27 +129,6 @@ def get_func_queue(func) -> int:
return 0
def get_queue_stat() -> "tuple[int, int, int, str]":
    """Return ``(concurrency, active, reserved, summary)`` for the Celery cluster.

    Returns zeros and an empty string when Celery is disabled, or zeros and
    an error message when no worker answers the stats inspection.

    Fixes over the previous version:
    - the return annotation was the tuple VALUE ``(int, int, int, str)``,
      not a type; a string annotation avoids needing a typing import;
    - ``stats`` was reused first for the worker-stats dict and then for the
      summary string (confusing shadowing);
    - ``.items()`` was iterated with the key discarded.
    """
    concurrency = 0
    if ENABLE_CELERY is False:
        return 0, 0, 0, ""
    worker_stats = inspect.stats()
    if worker_stats is None:
        err = "No worker is running."
        logging.error(err)
        return 0, 0, 0, err
    # Sum the pool size across every responding worker.
    for stat in worker_stats.values():
        concurrency += stat["pool"]["max-concurrency"]
    active = get_func_queue("active")
    reserved = get_func_queue("reserved")
    ping = inspect.ping()
    summary = f"concurrency {concurrency}, active {active}, reserved {reserved}.\n\n{ping}"
    return concurrency, active, reserved, summary
def tail(f, lines=1, _buffer=4098):
"""Tail a file and get X lines from the end"""

View File

@ -151,8 +151,8 @@ def ping_handler(client: "Client", message: "types.Message"):
else:
bot_info = get_runtime("ytdlbot_ytdl_1", "YouTube-dl")
if message.chat.username == OWNER:
stats = bot_text.queue_stats()
client.send_document(chat_id, Redis().generate_file(), caption=f"{bot_info}\n{stats}")
stats = bot_text.ping_worker()
client.send_document(chat_id, Redis().generate_file(), caption=f"{bot_info}\n\n{stats}")
else:
client.send_message(chat_id, f"{bot_info}")