Add Influxdb as core dep

This commit is contained in:
xtaodada 2024-06-16 20:48:35 +08:00
parent c47d04c350
commit 22d31659f5
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
2 changed files with 48 additions and 0 deletions

View File

@ -33,6 +33,16 @@ class DatabaseConfig(Settings):
env_prefix = "db_" env_prefix = "db_"
class InfluxDBConfig(Settings):
host: Optional[str] = None
port: Optional[int] = None
token: Optional[str] = None
org: Optional[str] = None
class Config(Settings.Config):
env_prefix = "influxdb_"
class RedisConfig(Settings): class RedisConfig(Settings):
host: str = "127.0.0.1" host: str = "127.0.0.1"
port: int = 6379 port: int = 6379
@ -161,6 +171,7 @@ class ApplicationConfig(Settings):
reload: ReloadConfig = ReloadConfig() reload: ReloadConfig = ReloadConfig()
database: DatabaseConfig = DatabaseConfig() database: DatabaseConfig = DatabaseConfig()
influxdb: InfluxDBConfig = InfluxDBConfig()
logger: LoggerConfig = LoggerConfig() logger: LoggerConfig = LoggerConfig()
webserver: WebServerConfig = WebServerConfig() webserver: WebServerConfig = WebServerConfig()
redis: RedisConfig = RedisConfig() redis: RedisConfig = RedisConfig()

37
dependence/influxdb.py Normal file
View File

@ -0,0 +1,37 @@
import contextlib
from typing import Optional
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from typing_extensions import Self
from gram_core.base_service import BaseService
from gram_core.config import ApplicationConfig
__all__ = ("InfluxDatabase",)
class InfluxDatabase(BaseService.Dependence):
@classmethod
def from_config(cls, config: ApplicationConfig) -> Self:
return cls(**config.influxdb.dict())
def __init__(
self,
host: Optional[str] = None,
port: Optional[int] = None,
token: Optional[str] = None,
org: Optional[str] = None,
):
self.host = host
self.port = port
self.token = token
self.org = org
self.url = f"http://{host}:{port}"
self.db_client = InfluxDBClientAsync(url=self.url, token=self.token, org=self.org)
@contextlib.asynccontextmanager
async def client(self) -> InfluxDBClientAsync:
yield self.db_client
async def shutdown(self):
await self.db_client.close()