mirror of
https://github.com/PaiGramTeam/GramCore.git
synced 2024-11-24 15:19:21 +00:00
38 lines
1.0 KiB
Python
38 lines
1.0 KiB
Python
import contextlib
|
|
from typing import Optional
|
|
|
|
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
|
|
from typing_extensions import Self
|
|
|
|
from gram_core.base_service import BaseService
|
|
from gram_core.config import ApplicationConfig
|
|
|
|
__all__ = ("InfluxDatabase",)
|
|
|
|
|
|
class InfluxDatabase(BaseService.Dependence):
|
|
@classmethod
|
|
def from_config(cls, config: ApplicationConfig) -> Self:
|
|
return cls(**config.influxdb.dict())
|
|
|
|
def __init__(
|
|
self,
|
|
host: Optional[str] = None,
|
|
port: Optional[int] = None,
|
|
token: Optional[str] = None,
|
|
org: Optional[str] = None,
|
|
):
|
|
self.host = host
|
|
self.port = port
|
|
self.token = token
|
|
self.org = org
|
|
self.url = f"http://{host}:{port}"
|
|
self.db_client = InfluxDBClientAsync(url=self.url, token=self.token, org=self.org)
|
|
|
|
@contextlib.asynccontextmanager
|
|
async def client(self) -> InfluxDBClientAsync:
|
|
yield self.db_client
|
|
|
|
async def shutdown(self):
|
|
await self.db_client.close()
|