add fakemysql

This commit is contained in:
BennyThink 2022-02-20 22:44:44 +08:00
parent 2ce135d549
commit ae21e0eb31
No known key found for this signature in database
GPG Key ID: 6CD0DBDA5235D481
2 changed files with 60 additions and 3 deletions

View File

@ -10,14 +10,12 @@ __author__ = "Benny <benny.think@gmail.com>"
import base64
import contextlib
import datetime
import json
import logging
import os
import re
import subprocess
import time
from io import BytesIO
from unittest.mock import MagicMock
import fakeredis
import pymysql
@ -27,6 +25,7 @@ from beautifultable import BeautifulTable
from influxdb import InfluxDBClient
from config import MYSQL_HOST, MYSQL_PASS, MYSQL_USER, QUOTA, REDIS
from fakemysql import FakeMySQL
class Redis:
@ -205,7 +204,7 @@ class MySQL:
self.con = pymysql.connect(host=MYSQL_HOST, user=MYSQL_USER, passwd=MYSQL_PASS, db="ytdl",
charset="utf8mb4")
else:
self.con = MagicMock()
self.con = FakeMySQL()
self.cur = self.con.cursor()
self.init_db()

58
ytdlbot/fakemysql.py Normal file
View File

@ -0,0 +1,58 @@
#!/usr/local/bin/python3
# coding: utf-8
# ytdlbot - fakemysql.py
# 2/20/22 20:08
#
__author__ = "Benny <benny.think@gmail.com>"
import re
import sqlite3
init_con = sqlite3.connect(":memory:", check_same_thread=False)
class FakeMySQL:
@staticmethod
def cursor() -> "Cursor":
return Cursor()
def commit(self):
pass
def close(self):
pass
class Cursor:
def __init__(self):
self.con = init_con
self.cur = self.con.cursor()
def execute(self, *args, **kwargs):
sql = self.sub(args[0])
new_args = (sql,) + args[1:]
return self.cur.execute(*new_args, **kwargs)
def fetchall(self):
return self.cur.fetchall()
def fetchone(self):
return self.cur.fetchone()
@staticmethod
def sub(sql):
sql = re.sub(r"CHARSET.*|charset.*", "", sql, re.IGNORECASE)
sql = sql.replace("%s", "?")
return sql
if __name__ == '__main__':
con = FakeMySQL()
cur = con.cursor()
cur.execute("create table user(id int, name varchar(20))")
cur.execute("insert into user values(%s,%s)", (1, "benny"))
cur.execute("select * from user")
data = cur.fetchall()
print(data)