rewrite status plugin with template support

This commit is contained in:
Ju4tCode 2022-05-23 06:01:19 +00:00 committed by GitHub
parent 3ef1f0a8c3
commit e94f1fa601
29 changed files with 598 additions and 841 deletions

13
.env
View File

@ -20,10 +20,15 @@ REDIS_PASSWORD=
REDIS_USERNAME=
# nonebot-plugin-status
SERVER_STATUS_CPU=true
SERVER_STATUS_PER_CPU=false
SERVER_STATUS_MEMORY=true
SERVER_STATUS_DISK=true
SERVER_STATUS_TEMPLATE="
CPU: {{ '%02d' % cpu_usage }}%
Memory: {{ '%02d' % memory_usage }}%
Disk:
{%- for name, usage in disk_usage.items() %}
{{ name }}: {{ '%02d' % usage.percent }}%
{%- endfor %}
Uptime: {{ uptime }}
"
# nonebot-plugin-sentry
# leave sentry_dsn empty to disable sentry bug trace

1038
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -8,10 +8,10 @@ license = "MIT"
[tool.poetry.dependencies]
python = "^3.8"
psutil = "^5.7.2"
httpx = "^0.21.0"
httpx = "^0.22.0"
Jinja2 = "^3.0.0"
unidiff = "^0.7.0"
humanize = "^3.5.0"
humanize = "^4.0.0"
pydantic = "^1.9.0"
Markdown = "^3.3.4"
sentry-sdk = "^1.0.0"
@ -23,8 +23,7 @@ redis = { version = "^4.0.0", extras = ["hiredis"] }
[tool.poetry.dev-dependencies]
isort = "^5.9.3"
black = "^21.8b0"
nb-cli = "^0.6.0"
black = "^22.1.0"
nonebug = "^0.2.0"
# [[tool.poetry.source]]
@ -33,13 +32,13 @@ nonebug = "^0.2.0"
# default = true
[tool.black]
line-length = 80
line-length = 88
extend-exclude = '''
'''
[tool.isort]
profile = "black"
line_length = 80
line_length = 88
length_sort = true
skip_gitignore = true
force_sort_within_sections = true

View File

@ -53,20 +53,14 @@ class Github:
return await self._requester.close()
@overload
async def get_repo(
self, full_name: str, lazy: Literal[True]
) -> LazyRepository:
async def get_repo(self, full_name: str, lazy: Literal[True]) -> LazyRepository:
...
@overload
async def get_repo(
self, full_name: str, lazy: Literal[False]
) -> Repository:
async def get_repo(self, full_name: str, lazy: Literal[False]) -> Repository:
...
async def get_repo(
self, full_name: str, lazy: bool = False
) -> LazyRepository:
async def get_repo(self, full_name: str, lazy: bool = False) -> LazyRepository:
"""
GET /repos/:owner/:repo
@ -74,17 +68,11 @@ class Github:
"""
url = f"/repos/{full_name}"
if lazy:
return LazyRepository(
full_name=full_name, requester=self._requester
)
return LazyRepository(full_name=full_name, requester=self._requester)
response = await self._requester.request_json("GET", url)
return Repository.parse_obj(
{"requester": self._requester, **response.json()}
)
return Repository.parse_obj({"requester": self._requester, **response.json()})
async def render_markdown(
self, text: str, context: Optional[Repository] = None
):
async def render_markdown(self, text: str, context: Optional[Repository] = None):
"""
POST /markdown
@ -95,9 +83,7 @@ class Github:
# if context:
# data["mode"] = "gfm"
# data["context"] = context._identity
response = await self._requester.request_json(
"POST", "/markdown", json=data
)
response = await self._requester.request_json("POST", "/markdown", json=data)
return response.text
# def get_repos(

View File

@ -58,12 +58,7 @@ C = TypeVar("C", bound=BaseModel)
class PaginatedList(AsyncIterator, Generic[C]):
def __init__(
self,
cls: Type[C],
requester: Requester,
*args,
per_page: int = 30,
**kwargs
self, cls: Type[C], requester: Requester, *args, per_page: int = 30, **kwargs
):
self.cls = cls
self.requester = requester
@ -85,11 +80,7 @@ class PaginatedList(AsyncIterator, Generic[C]):
def __aiter__(self) -> "PaginatedList[C]":
return PaginatedList(
self.cls,
self.requester,
*self.args,
per_page=self.per_page,
**self.kwargs
self.cls, self.requester, *self.args, per_page=self.per_page, **self.kwargs
)
async def _get_next_page(self) -> List[C]:

View File

@ -159,9 +159,7 @@ class Issue(BaseModel):
response = await self.requester.request(
"GET", self.pull_request.url, headers=headers
)
return PullRequest.parse_obj(
{"requester": self.requester, **response.json()}
)
return PullRequest.parse_obj({"requester": self.requester, **response.json()})
async def get_diff(self) -> str:
"""
@ -170,7 +168,5 @@ class Issue(BaseModel):
if not self.pull_request:
raise RuntimeError(f"Issue {self.number} is not a pull request")
response = await self.requester.request(
"GET", self.pull_request.diff_url
)
response = await self.requester.request("GET", self.pull_request.diff_url)
return response.text

View File

@ -30,9 +30,7 @@ class LazyRepository(BaseModel):
https://docs.github.com/en/rest/reference/issues#get-an-issue
"""
headers = {
"Accept": "application/vnd.github.mockingbird-preview.full+json"
}
headers = {"Accept": "application/vnd.github.mockingbird-preview.full+json"}
response = await self.requester.request(
"GET", f"/repos/{self.full_name}/issues/{number}", headers=headers
)
@ -109,9 +107,7 @@ class LazyRepository(BaseModel):
"""
data = {}
data["config"] = config.dict(exclude_unset=True)
data["config"]["insecure_ssl"] = (
"1" if data["config"]["insecure_ssl"] else "0"
)
data["config"]["insecure_ssl"] = "1" if data["config"]["insecure_ssl"] else "0"
if events is not None:
data["events"] = events
if active is not None:

View File

@ -216,6 +216,4 @@ async def config(
:param meta_tag_prefix: the prefix for ``pdfkit`` specific meta tags
"""
return await Config(
wkhtmltoimage=wkhtmltoimage, meta_tag_prefix=meta_tag_prefix
)
return await Config(wkhtmltoimage=wkhtmltoimage, meta_tag_prefix=meta_tag_prefix)

View File

@ -86,9 +86,7 @@ class Config(object):
@property
def xvfb(self) -> Union[str, bytes]:
if not self._xvfb:
raise RuntimeError(
f"xvfb not installed or Config {self} is never awaited!"
)
raise RuntimeError(f"xvfb not installed or Config {self} is never awaited!")
return self._xvfb
@xvfb.setter

View File

@ -96,9 +96,7 @@ class IMGKit(object):
def xvfb(self):
return self.config.xvfb
def _gegetate_args(
self, options: OPTION_TYPE
) -> Generator[str, None, None]:
def _gegetate_args(self, options: OPTION_TYPE) -> Generator[str, None, None]:
"""
Generator of args parts based on options specification.
"""
@ -157,9 +155,7 @@ class IMGKit(object):
if "</head>" in source:
self.source = StringSource(
inp.replace(
"</head>", self._style_tag(css_data) + "</head>"
)
inp.replace("</head>", self._style_tag(css_data) + "</head>")
)
else:
self.source = StringSource(self._style_tag(css_data) + inp)

View File

@ -21,9 +21,7 @@ from src.libs import html2img
async def from_string(
string: str,
extensions: Optional[
Sequence[Union[str, markdown.extensions.Extension]]
] = None,
extensions: Optional[Sequence[Union[str, markdown.extensions.Extension]]] = None,
extension_configs: Optional[Mapping[str, Mapping[str, Any]]] = None,
output_format: Optional[Literal["xhtml", "html"]] = None,
tab_length: Optional[int] = None,
@ -41,9 +39,7 @@ async def from_string(
async def from_file(
input: Optional[Union[str, BinaryIO]] = None,
encoding: Optional[str] = None,
extensions: Optional[
Sequence[Union[str, markdown.extensions.Extension]]
] = None,
extensions: Optional[Sequence[Union[str, markdown.extensions.Extension]]] = None,
extension_configs: Optional[Mapping[str, Mapping[str, Any]]] = None,
output_format: Optional[Literal["xhtml", "html"]] = None,
tab_length: Optional[int] = None,

View File

@ -26,8 +26,6 @@ _sub_plugins = set()
# load all github plugin config from global config
github_config = Config(**nonebot.get_driver().config.dict())
_sub_plugins |= nonebot.load_plugins(
str((Path(__file__).parent / "plugins").resolve())
)
_sub_plugins |= nonebot.load_plugins(str((Path(__file__).parent / "plugins").resolve()))
from . import apis

View File

@ -51,9 +51,7 @@ async def create_hook(
) -> Hook:
async with Github(token) as g:
repo = await g.get_repo(repo, True) if isinstance(repo, str) else repo
config = (
HookConfig.parse_obj(config) if isinstance(config, dict) else config
)
config = HookConfig.parse_obj(config) if isinstance(config, dict) else config
return await repo.create_hook(config, events, active)

View File

@ -48,9 +48,7 @@ async def _gen_image(
html: str, width: int, height: int, wkhtmltoimage: bool = False
) -> Optional[bytes]:
if not wkhtmltoimage:
async with get_new_page(
viewport={"width": width, "height": height}
) as page:
async with get_new_page(viewport={"width": width, "height": height}) as page:
await page.set_content(html)
img = await page.screenshot(full_page=True)
return img

View File

@ -12,18 +12,8 @@ __author__ = "yanyongyu"
from ... import redis
from .state import get_state_bind_user, set_state_bind_user
from .hook import (
get_repo_hook,
set_repo_hook,
delete_repo_hook,
exists_repo_hook,
)
from .token import (
get_user_token,
set_user_token,
delete_user_token,
exists_user_token,
)
from .hook import get_repo_hook, set_repo_hook, delete_repo_hook, exists_repo_hook
from .token import get_user_token, set_user_token, delete_user_token, exists_user_token
from .bind import (
get_group_bind_repo,
set_group_bind_repo,

View File

@ -35,34 +35,26 @@ class SubscribeConfig:
def set_subscribe(group_id: str, repo_name: str, **kwargs) -> Optional[bool]:
return redis.set(
SUBSCRIBE_GROUP_REPO_FORMAT.format(
group_id=group_id, repo_name=repo_name
),
SUBSCRIBE_GROUP_REPO_FORMAT.format(group_id=group_id, repo_name=repo_name),
json.dumps(asdict(SubscribeConfig(**kwargs))),
)
def delete_subscribe(group_id: str, repo_name: str) -> int:
return redis.delete(
SUBSCRIBE_GROUP_REPO_FORMAT.format(
group_id=group_id, repo_name=repo_name
)
SUBSCRIBE_GROUP_REPO_FORMAT.format(group_id=group_id, repo_name=repo_name)
)
def exists_subscribe(group_id: str, repo_name: str) -> int:
return redis.exists(
SUBSCRIBE_GROUP_REPO_FORMAT.format(
group_id=group_id, repo_name=repo_name
)
SUBSCRIBE_GROUP_REPO_FORMAT.format(group_id=group_id, repo_name=repo_name)
)
def get_subscribe(group_id: str, repo_name: str) -> Optional[SubscribeConfig]:
value = redis.get(
SUBSCRIBE_GROUP_REPO_FORMAT.format(
group_id=group_id, repo_name=repo_name
)
SUBSCRIBE_GROUP_REPO_FORMAT.format(group_id=group_id, repo_name=repo_name)
)
return value if value is None else SubscribeConfig(**json.loads(value))

View File

@ -42,9 +42,7 @@ try:
except ImportError:
get_user_token = None
REPO_REGEX: str = (
r"^(?P<owner>[a-zA-Z0-9][a-zA-Z0-9\-]*)/(?P<repo>[a-zA-Z0-9_\-\.]+)$"
)
REPO_REGEX: str = r"^(?P<owner>[a-zA-Z0-9][a-zA-Z0-9\-]*)/(?P<repo>[a-zA-Z0-9_\-\.]+)$"
bind = on_command(
"bind",
@ -83,9 +81,7 @@ async def check_exists(event: GroupMessageEvent, matcher: Matcher):
prompt="绑定仓库的全名?(e.g. owner/repo)",
parameterless=[Depends(allow_cancel)],
)
async def process_repo(
event: GroupMessageEvent, full_name: str = ArgPlainText()
):
async def process_repo(event: GroupMessageEvent, full_name: str = ArgPlainText()):
matched = re.match(REPO_REGEX, full_name)
if not matched:
await bind.reject(f"仓库名 {full_name} 不合法!请重新发送或取消")

View File

@ -96,9 +96,7 @@ async def handle(bot: Bot, event: MessageEvent, state: T_State):
owner,
repo,
number,
MessageSegment.image(
f"base64://{base64.b64encode(img).decode()}"
),
MessageSegment.image(f"base64://{base64.b64encode(img).decode()}"),
)
@ -147,7 +145,5 @@ async def handle_short(bot: Bot, event: GroupMessageEvent, state: T_State):
owner,
repo,
number,
MessageSegment.image(
f"base64://{base64.b64encode(img).decode()}"
),
MessageSegment.image(f"base64://{base64.b64encode(img).decode()}"),
)

View File

@ -57,9 +57,7 @@ async def handle_content(bot: Bot, event: MessageEvent, state: T_State):
)
try:
img = await issue_to_image(
message_info.owner, message_info.repo, issue_
)
img = await issue_to_image(message_info.owner, message_info.repo, issue_)
except TimeoutException:
await content.finish(f"获取issue数据超时请尝试重试")
except Error:
@ -71,7 +69,5 @@ async def handle_content(bot: Bot, event: MessageEvent, state: T_State):
message_info.owner,
message_info.repo,
message_info.number,
MessageSegment.image(
f"base64://{base64.b64encode(img).decode()}"
),
MessageSegment.image(f"base64://{base64.b64encode(img).decode()}"),
)

View File

@ -29,9 +29,7 @@ try:
except ImportError:
get_user_token = None
diff = on_command(
"diff", is_github_reply, priority=config.github_command_priority
)
diff = on_command("diff", is_github_reply, priority=config.github_command_priority)
diff.__doc__ = """
/diff
回复机器人一条github pr信息给出pr diff
@ -57,9 +55,7 @@ async def handle_diff(bot: Bot, event: MessageEvent, state: T_State):
)
try:
img = await issue_diff_to_image(
message_info.owner, message_info.repo, issue_
)
img = await issue_diff_to_image(message_info.owner, message_info.repo, issue_)
except TimeoutException:
await diff.finish(f"获取diff数据超时请尝试重试")
except Error:
@ -71,7 +67,5 @@ async def handle_diff(bot: Bot, event: MessageEvent, state: T_State):
message_info.owner,
message_info.repo,
message_info.number,
MessageSegment.image(
f"base64://{base64.b64encode(img).decode()}"
),
MessageSegment.image(f"base64://{base64.b64encode(img).decode()}"),
)

View File

@ -18,9 +18,7 @@ from ...libs.redis import MessageInfo
from ...utils import send_github_message
from . import KEY_GITHUB_REPLY, config, is_github_reply
link = on_command(
"link", is_github_reply, priority=config.github_command_priority
)
link = on_command("link", is_github_reply, priority=config.github_command_priority)
link.__doc__ = """
/link
回复机器人一条github信息给出对应链接

View File

@ -97,9 +97,7 @@ else:
elif e.response.status_code == 404:
await subscribe.reject(f"仓库名 {owner}/{repo_name} 不存在!请重新发送或取消")
return
logger.opt(colors=True, exception=e).error(
f"github_subscribe: create_hook"
)
logger.opt(colors=True, exception=e).error(f"github_subscribe: create_hook")
await subscribe.finish("订阅仓库时发生错误,请联系开发者或重试")
return

View File

@ -2,7 +2,7 @@
* @Author : yanyongyu
* @Date : 2020-11-15 14:40:25
* @LastEditors : yanyongyu
* @LastEditTime : 2022-01-13 16:32:41
* @LastEditTime : 2022-05-23 05:58:29
* @Description : None
* @GitHub : https://github.com/yanyongyu
-->
@ -58,26 +58,26 @@ OneBot:
> SUPERUSERS=["your qq id"]
> ```
### server_status_cpu
### server_status_template
- 类型: `bool`
- 默认: `True`
- 说明: 是否显示 CPU 占用百分比
- 类型: `str`
- 默认: 请参考示例
- 说明:发送的消息模板,支持的变量以及类型如下:
- cpu_usage (`float`): CPU 使用率
- memory_usage (`float`): 内存使用率
- disk_usage (`Dict[str, psutil._common.sdiskusage]`): 磁盘使用率,包含 total, used, free, percent 属性
- uptime (`timedelta`): 服务器运行时间
### server_status_per_cpu
配置文件示例(默认模板)
- 类型: `bool`
- 默认: `False`
- 说明: 是否显示每个 CPU 核心占用百分比
### server_status_memory
- 类型: `bool`
- 默认: `True`
- 说明: 是否显示 Memory 占用百分比
### server_status_disk
- 类型: `bool`
- 默认: `True`
- 说明: 是否显示磁盘占用百分比
```dotenv
SERVER_STATUS_TEMPLATE="
CPU: {{ '%02d' % cpu_usage }}%
Memory: {{ '%02d' % memory_usage }}%
Disk:
{%- for name, usage in disk_usage.items() %}
{{ name }}: {{ '%02d' % usage.percent }}%
{%- endfor %}
Uptime: {{ uptime }}
"
```

View File

@ -4,22 +4,26 @@
@Author : yanyongyu
@Date : 2020-09-18 00:00:13
@LastEditors : yanyongyu
@LastEditTime : 2022-01-13 21:01:33
@LastEditTime : 2022-05-23 05:41:46
@Description : None
@GitHub : https://github.com/yanyongyu
"""
__author__ = "yanyongyu"
from jinja2 import Environment
from nonebot.matcher import Matcher
from nonebot.permission import SUPERUSER
from nonebot import on_notice, get_driver, on_command, on_message
from .config import Config
from .data_source import cpu_status, disk_usage, memory_status, per_cpu_status
from .data_source import uptime, cpu_status, disk_usage, memory_status, per_cpu_status
global_config = get_driver().config
status_config = Config(**global_config.dict())
_ev = Environment(autoescape=False, enable_async=True)
_t = _ev.from_string(status_config.server_status_template)
command = on_command(
"状态",
permission=(status_config.server_status_only_superusers or None) and SUPERUSER,
@ -29,25 +33,14 @@ command = on_command(
@command.handle()
async def server_status(matcher: Matcher):
data = []
if status_config.server_status_cpu:
if status_config.server_status_per_cpu:
data.append("CPU:")
for index, per_cpu in enumerate(per_cpu_status()):
data.append(f" core{index + 1}: {int(per_cpu):02d}%")
else:
data.append(f"CPU: {int(cpu_status()):02d}%")
if status_config.server_status_memory:
data.append(f"Memory: {int(memory_status()):02d}%")
if status_config.server_status_disk:
data.append("Disk:")
for k, v in disk_usage().items():
data.append(f" {k}: {int(v.percent):02d}%")
await matcher.send(message="\n".join(data))
message = await _t.render_async(
cpu_usage=cpu_status(),
per_cpu_usage=per_cpu_status(),
memory_usage=memory_status(),
disk_usage=disk_usage(),
uptime=uptime(),
)
await matcher.send(message=message.strip("\n"))
try:

View File

@ -4,21 +4,73 @@
@Author : yanyongyu
@Date : 2020-10-04 16:32:00
@LastEditors : yanyongyu
@LastEditTime : 2021-09-10 12:42:16
@LastEditTime : 2022-05-23 05:46:55
@Description : None
@GitHub : https://github.com/yanyongyu
"""
__author__ = "yanyongyu"
from pydantic import BaseSettings
import warnings
from typing import Any, Dict
from pydantic import BaseSettings, root_validator
CPU_TEMPLATE = "CPU: {{ '%02d' % cpu_usage }}%"
PER_CPU_TEMPLATE = (
"CPU:\n"
"{%- for core in per_cpu_usage %}\n"
" core{{ loop.index }}: {{ '%02d' % core }}%\n"
"{%- endfor %}"
)
MEMORY_TEMPLATE = "Memory: {{ '%02d' % memory_usage }}%"
DISK_TEMPLATE = (
"Disk:\n"
"{%- for name, usage in disk_usage.items() %}\n"
" {{ name }}: {{ '%02d' % usage.percent }}%\n"
"{%- endfor %}"
)
UPTIME_TEMPLATE = "Uptime: {{ uptime }}"
class Config(BaseSettings):
server_status_only_superusers: bool = True
# Deprecated: legacy settings
server_status_cpu: bool = True
server_status_per_cpu: bool = False
server_status_memory: bool = True
server_status_disk: bool = True
# template
server_status_template: str = "\n".join(
(CPU_TEMPLATE, MEMORY_TEMPLATE, DISK_TEMPLATE, UPTIME_TEMPLATE)
)
class Config:
extra = "ignore"
@root_validator(pre=True)
def transform_legacy_settings(cls, value: Dict[str, Any]) -> Dict[str, Any]:
if "server_status_template" not in value and (
"server_status_cpu" in value
or "server_status_per_cpu" in value
or "server_status_memory" in value
or "server_status_disk" in value
):
warnings.warn(
"Settings for status plugin is deprecated, "
"please use `server_status_template` instead.",
DeprecationWarning,
)
templates = []
if value.get("server_status_cpu"):
templates.append(CPU_TEMPLATE)
if value.get("server_status_per_cpu"):
templates.append(PER_CPU_TEMPLATE)
if value.get("server_status_memory"):
templates.append(MEMORY_TEMPLATE)
if value.get("server_status_disk"):
templates.append(DISK_TEMPLATE)
value.setdefault("server_status_template", "\n".join(templates))
return value

View File

@ -4,13 +4,15 @@
@Author : yanyongyu
@Date : 2020-09-18 00:15:21
@LastEditors : yanyongyu
@LastEditTime : 2021-03-16 16:59:22
@LastEditTime : 2022-05-23 04:38:37
@Description : None
@GitHub : https://github.com/yanyongyu
"""
__author__ = "yanyongyu"
import time
from typing import Dict, List
from datetime import timedelta
import psutil
@ -29,10 +31,12 @@ def memory_status() -> float:
def disk_usage() -> Dict[str, psutil._common.sdiskusage]:
disk_parts = psutil.disk_partitions()
disk_usages = {
d.mountpoint: psutil.disk_usage(d.mountpoint) for d in disk_parts
}
return disk_usages
return {d.mountpoint: psutil.disk_usage(d.mountpoint) for d in disk_parts}
def uptime() -> timedelta:
diff = time.time() - psutil.boot_time()
return timedelta(seconds=diff)
if __name__ == "__main__":

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "nonebot-plugin-status"
version = "0.3.0"
version = "0.4.0"
description = "Check your server status (CPU, Memory, Disk Usage) via nonebot"
license = "MIT"
authors = ["yanyongyu <yanyongyu_1@126.com>"]
@ -16,6 +16,7 @@ packages = [
[tool.poetry.dependencies]
python = "^3.7.3"
psutil = "^5.7.2"
Jinja2 = "^3.0.0"
nonebot2 = "^2.0.0-beta.1"
nonebot-adapter-onebot = { version = "^2.0.0-beta.1", optional = true }

View File

@ -52,9 +52,7 @@ def get_cache(sign: str) -> Any:
def save_cache(sign: str, cache: Any, ex: Optional[timedelta] = None) -> None:
redis_client.set(
CACHE_KEY_FORMAT.format(signature=sign), pickle.dumps(cache), ex
)
redis_client.set(CACHE_KEY_FORMAT.format(signature=sign), pickle.dumps(cache), ex)
# Export something for other plugin

View File

@ -1,4 +1,5 @@
from typing import Dict, List
from datetime import timedelta
import psutil
import pytest
@ -15,11 +16,7 @@ async def test_status(app: App, monkeypatch: pytest.MonkeyPatch):
PrivateMessageEvent,
)
from src.plugins.nonebot_plugin_status import (
command,
group_poke,
status_config,
)
from src.plugins.nonebot_plugin_status import command, group_poke
driver = nonebot.get_driver()
@ -36,18 +33,18 @@ async def test_status(app: App, monkeypatch: pytest.MonkeyPatch):
def _per_cpu_status() -> List[float]:
return [10.0, 11.0]
monkeypatch.setattr(
"src.plugins.nonebot_plugin_status.cpu_status", _cpu_status
)
monkeypatch.setattr(
"src.plugins.nonebot_plugin_status.disk_usage", _disk_usage
)
def _uptime() -> timedelta:
return timedelta(days=1, seconds=1)
monkeypatch.setattr("src.plugins.nonebot_plugin_status.cpu_status", _cpu_status)
monkeypatch.setattr("src.plugins.nonebot_plugin_status.disk_usage", _disk_usage)
monkeypatch.setattr(
"src.plugins.nonebot_plugin_status.memory_status", _memory_status
)
monkeypatch.setattr(
"src.plugins.nonebot_plugin_status.per_cpu_status", _per_cpu_status
)
monkeypatch.setattr("src.plugins.nonebot_plugin_status.uptime", _uptime)
with monkeypatch.context() as m:
m.setattr(driver.config, "superusers", {"123"})
@ -78,6 +75,7 @@ async def test_status(app: App, monkeypatch: pytest.MonkeyPatch):
"Memory: 10%",
"Disk:",
" test: 00%",
"Uptime: 1 day, 0:00:01",
]
),
True,
@ -105,41 +103,7 @@ async def test_status(app: App, monkeypatch: pytest.MonkeyPatch):
"Memory: 10%",
"Disk:",
" test: 00%",
]
),
True,
)
m.setattr(status_config, "server_status_per_cpu", True)
async with app.test_matcher(command) as ctx:
adapter = ctx.create_adapter(base=Adapter)
bot = ctx.create_bot(base=Bot, adapter=adapter)
event = PrivateMessageEvent(
time=0,
self_id=0,
post_type="message",
sub_type="friend",
user_id=123,
message_type="private",
message_id=0,
message=Message("/状态"),
raw_message="/状态",
font=0,
sender=Sender(),
)
ctx.receive_event(bot, event)
ctx.should_call_send(
event,
"\n".join(
[
"CPU:",
" core1: 10%",
" core2: 11%",
"Memory: 10%",
"Disk:",
" test: 00%",
"Uptime: 1 day, 0:00:01",
]
),
True,