# PaiGram/modules/errorpush/pb.py (Python, 53 lines, 1.5 KiB)

from typing import Optional
import httpx
2022-10-20 14:21:26 +00:00
from utils.log import logger
2022-11-21 06:57:12 +00:00
__all__ = ["PbClient", "PbClientException"]
class PbClientException(Exception):
    """Raised when uploading content through ``PbClient`` fails."""
class PbClient:
    """Async client that uploads text content to a pb endpoint.

    The client is disabled when no ``pb_url`` is configured; uploads are then
    skipped and ``None`` is returned instead of a paste location.
    """

    def __init__(self, pb_url: Optional[str] = None, pb_sunset: Optional[int] = None, pb_max_lines: int = 1000):
        """Create a client.

        :param pb_url: URL that content is POSTed to. ``None`` or an empty
            string disables uploading.
        :param pb_sunset: Auto-destroy time, in seconds. Left out of the
            request when falsy.
        :param pb_max_lines: Maximum number of trailing lines of the content
            that are uploaded. A non-positive value disables truncation.
        """
        self.client = httpx.AsyncClient()
        self.PB_API: Optional[str] = pb_url
        self.sunset: Optional[int] = pb_sunset
        self.private: bool = True
        self.max_lines: int = pb_max_lines

    @property
    def enabled(self) -> bool:
        """Whether an upload URL has been configured."""
        return bool(self.PB_API)

    async def create_pb(self, content: str) -> Optional[str]:
        """Upload ``content`` and return the value of the ``location`` header.

        :param content: Text to upload.
        :return: The ``location`` response header, or ``None`` when the
            client is disabled, the server answered with an error status, or
            the header is missing.
        :raises PbClientException: If anything goes wrong while uploading;
            the original exception is attached as ``__cause__``.
        """
        try:
            return await self._create_pb(content)
        except Exception as exc:
            # Boundary wrapper: callers only need to handle one exception type.
            raise PbClientException from exc

    def _build_payload(self, content: str) -> dict:
        """Build the form fields for an upload of ``content``."""
        lines = content.splitlines()
        # Only slice for a positive limit: ``lines[-0:]`` keeps everything by
        # accident and a negative limit would drop the *first* lines instead.
        if self.max_lines > 0:
            lines = lines[-self.max_lines:]
        payload = {"c": "\n".join(lines) + "\n"}
        if self.private:
            payload["p"] = "1"
        if self.sunset:
            payload["sunset"] = self.sunset
        return payload

    async def _create_pb(self, content: str) -> Optional[str]:
        """POST ``content`` to the configured URL without exception wrapping."""
        if not self.PB_API:
            return None
        result = await self.client.post(self.PB_API, data=self._build_payload(content))
        if result.is_error:
            logger.warning("上传日记到 pb 失败 status_code[%s]", result.status_code)
            return None
        return result.headers.get("location")