PamGram/utils/mysql.py
2022-07-26 18:07:31 +08:00

72 lines
2.4 KiB
Python

import asyncio
import aiomysql
from logger import Log
class MySQL:
def __init__(self, host: str = "127.0.0.1", port: int = 3306, user: str = "root",
password: str = "", database: str = "", loop=None):
self.database = database
self.password = password
self.user = user
self.port = port
self.host = host
self._loop = loop
self._sql_pool = None
Log.debug(f'获取数据库配置 [host]: {self.host}')
Log.debug(f'获取数据库配置 [port]: {self.port}')
Log.debug(f'获取数据库配置 [user]: {self.user}')
Log.debug(f'获取数据库配置 [password][len]: {len(self.password)}')
Log.debug(f'获取数据库配置 [db]: {self.database}')
if self._loop is None:
self._loop = asyncio.get_event_loop()
try:
Log.info("正在创建数据库LOOP")
self._loop.run_until_complete(self.create_pool())
Log.info("创建数据库LOOP成功")
except (KeyboardInterrupt, SystemExit):
pass
except Exception as exc:
Log.error("创建数据库LOOP发生严重错误")
raise exc
async def wait_closed(self):
if self._sql_pool is None:
return
pool = self._sql_pool
pool.close()
await pool.wait_closed()
async def create_pool(self):
self._sql_pool = await aiomysql.create_pool(
host=self.host, port=self.port,
user=self.user, password=self.password,
db=self.database, loop=self._loop)
async def _get_pool(self):
if self._sql_pool is None:
raise RuntimeError("mysql pool is none")
return self._sql_pool
async def executemany(self, query, query_args):
pool = await self._get_pool()
async with pool.acquire() as conn:
sql_cur = await conn.cursor()
await sql_cur.executemany(query, query_args)
rowcount = sql_cur.rowcount
await sql_cur.close()
await conn.commit()
return rowcount
async def execute_and_fetchall(self, query, query_args):
pool = await self._get_pool()
async with pool.acquire() as conn:
sql_cur = await conn.cursor()
await sql_cur.execute(query, query_args)
result = await sql_cur.fetchall()
await sql_cur.close()
await conn.commit()
return result