This commit is contained in:
xtaodada 2023-04-23 14:08:36 +08:00
parent d9c59ffb59
commit cdf4f91901
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
30 changed files with 72 additions and 846 deletions

20
apis/main/get_tips.py Normal file
View File

@ -0,0 +1,20 @@
import secrets
from defs import app
from models.services.tip import TipAction
from typing import Dict, Optional
system_random = secrets.SystemRandom()
async def get_random_tips() -> Optional[Dict]:
tips = await TipAction.get_tips()
if not tips:
return None
num = system_random.randint(0, len(tips) - 1)
return tips[num].dict_tip()
@app.get("/get_tips")
async def get_tips():
return {"code": 200, "msg": "success", "data": await get_random_tips()}

View File

@ -1,22 +0,0 @@
from fastapi import Request
from fastapi.responses import JSONResponse
from defs import app, need_admin_routes
from models.services.user import UserAction
@app.middleware("http")
async def check_admin_middleware(request: Request, call_next):
if request.url.path not in need_admin_routes:
return await call_next(request)
uid = request.cookies.get("uid")
session = request.cookies.get("session")
try:
if not uid or not session:
raise ValueError
uid = int(uid)
if not await UserAction.check_admin(uid):
raise ValueError
except ValueError:
return JSONResponse(status_code=403, content={"code": 403, "msg": "此操作需要管理员权限"})
return await call_next(request)

View File

@ -1,27 +0,0 @@
from fastapi import Request
from fastapi.responses import JSONResponse
from defs import app, need_auth_routes, need_auth_uid_only_routes
from models.services.session import SessionAction
@app.middleware("http")
async def check_session_middleware(request: Request, call_next):
if request.url.path not in need_auth_routes:
return await call_next(request)
uid = request.cookies.get("uid")
session = request.cookies.get("session")
try:
if not uid or not session:
raise ValueError
uid = int(uid)
session = str(session)
auth_success = await SessionAction.check_session(uid, session)
if not auth_success:
raise ValueError
except ValueError:
if request.url.path in need_auth_uid_only_routes:
request.cookies["uid"] = ""
else:
return JSONResponse(status_code=401, content={"code": 401, "msg": "Cookie 无效"})
return await call_next(request)

View File

@ -1,47 +0,0 @@
from pydantic import BaseModel
from defs import app, need_auth_routes
from errors.post import *
from fastapi import Request
from models.services.post import PostAction
from models.services.topic import TopicAction
from models.services.user import UserAction
class CreatePost(BaseModel):
tid: int
title: str
content: str
async def create_post_func(model: CreatePost, uid: int):
topic = await TopicAction.get_topic_by_tid(model.tid)
if topic is None:
raise PostTopicNotValidException()
if len(model.title) > 100:
raise PostTitleTooLongException()
if len(model.content) > 5000:
raise PostContentTooLongException()
if topic.need_admin:
if not await UserAction.check_admin(uid):
raise PostTopicNeedAdminException()
post = PostAction.gen_new_post(
model.tid,
uid,
model.title,
model.content,
)
await PostAction.add_post(post)
@app.post("/create_post")
async def create_post(post: CreatePost, request: Request):
uid = int(request.cookies.get("uid"))
try:
await create_post_func(post, uid)
except PostException as e:
return {"code": 400, "msg": e.message}
return {"code": 200, "msg": "创建成功"}
need_auth_routes.append("/create_post")

View File

@ -1,38 +0,0 @@
from fastapi import Request
from pydantic import BaseModel
from defs import app, need_auth_routes
from errors.post import *
from models.services.post import PostAction
from models.services.user import UserAction
class DeletePost(BaseModel):
pid: int
async def delete_post_func(model: DeletePost, uid: int) -> None:
admin = await UserAction.check_admin(uid)
post = await PostAction.get_post_by_pid(model.pid)
if not post:
raise PostNotExistException()
if post.is_delete:
raise PostNotExistException()
if not admin:
if post.uid != uid:
raise PostTopicNeedAdminException()
post.is_delete = True
await PostAction.update_post(post)
@app.post("/delete_post")
async def delete_post(model: DeletePost, request: Request):
uid = int(request.cookies.get("uid"))
try:
await delete_post_func(model, uid)
except PostException as e:
return {"code": 403, "msg": e.message}
return {"code": 200, "msg": "删除成功"}
need_auth_routes.append("/delete_post")

View File

@ -1,46 +0,0 @@
from pydantic import BaseModel
from defs import app, need_auth_routes
from errors.post import *
from fastapi import Request
from models.services.post import PostAction
from models.services.user import UserAction
import time
class EditPost(BaseModel):
pid: int
title: str
content: str
async def edit_post_func(model: EditPost, uid: int):
post = await PostAction.get_post_by_pid(model.pid)
if post is None:
raise PostNotExistException()
admin = await UserAction.check_admin(uid)
if not admin:
if post.uid != uid:
raise PostTopicNeedAdminException()
if len(model.title) > 100:
raise PostTitleTooLongException()
if len(model.content) > 5000:
raise PostContentTooLongException()
post.title = model.title
post.content = model.content
post.update_time = int(time.time())
await PostAction.update_post(post)
@app.post("/edit_post")
async def edit_post(post: EditPost, request: Request):
uid = int(request.cookies.get("uid"))
try:
await edit_post_func(post, uid)
except PostException as e:
return {"code": 400, "msg": e.message}
return {"code": 200, "msg": "修改成功"}
need_auth_routes.append("/edit_post")

View File

@ -1,55 +0,0 @@
from typing import List, Dict
from pydantic import BaseModel
from defs import app, need_auth_routes, need_auth_uid_only_routes
from errors.post import *
from fastapi import Request
from models.services.post import PostAction
from models.services.topic import TopicAction
from models.services.user import UserAction
class GetPost(BaseModel):
tid: int
async def get_post_func(model: GetPost = None, uid: int = None) -> List[Dict]:
tid = model.tid if model else None
admin = await UserAction.check_admin(uid) if uid else False
if tid is not None:
topic = await TopicAction.get_topic_by_tid(tid)
if not topic:
raise PostTopicNotValidException()
if topic.need_admin:
if not admin:
raise PostTopicNeedAdminException()
posts = await PostAction.get_posts_by_tid(tid, admin)
return [post.dict_post() for post in posts]
@app.get("/get_posts")
async def get_posts_get(request: Request):
uid = request.cookies.get("uid")
if uid is not None:
uid = int(uid)
data = await get_post_func(uid=uid)
return {"code": 200, "msg": "获取成功", "data": data}
@app.post("/get_posts_post")
async def get_posts_post(model: GetPost, request: Request):
uid = request.cookies.get("uid")
if uid is not None:
uid = int(uid)
try:
data = await get_post_func(model, uid)
except PostTopicNeedAdminException:
return {"code": 200, "msg": "获取成功", "data": []}
except PostException as e:
return {"code": 403, "msg": e.message}
return {"code": 200, "msg": "获取成功", "data": data}
need_auth_routes.append("/get_posts_post")
need_auth_uid_only_routes.append("/get_posts_post")

View File

@ -1,38 +0,0 @@
from fastapi import Request
from pydantic import BaseModel
from defs import app, need_auth_routes
from errors.post import *
from models.services.post import PostAction
from models.services.user import UserAction
class RestorePost(BaseModel):
pid: int
async def restore_post_func(model: RestorePost, uid: int) -> None:
admin = await UserAction.check_admin(uid)
post = await PostAction.get_post_by_pid(model.pid)
if not post:
raise PostNotExistException()
if not post.is_delete:
raise PostNotDeleteException()
if not admin:
if post.uid != uid:
raise PostTopicNeedAdminException()
post.is_delete = False
await PostAction.update_post(post)
@app.post("/restore_post")
async def restore_post(model: RestorePost, request: Request):
uid = int(request.cookies.get("uid"))
try:
await restore_post_func(model, uid)
except PostException as e:
return {"code": 403, "msg": e.message}
return {"code": 200, "msg": "恢复成功"}
need_auth_routes.append("/delete_post")

View File

@ -1,24 +0,0 @@
from pydantic import BaseModel
from defs import app, need_auth_routes, need_admin_routes
from models.services.topic import TopicAction
class CreateTopic(BaseModel):
title: str
need_admin: bool
async def create_topic_func(title: str, need_admin: bool):
topic = TopicAction.gen_new_topic(title, need_admin=need_admin)
await TopicAction.add_topic(topic)
@app.post("/create_topic")
async def create_topic(topic: CreateTopic):
await create_topic_func(topic.title, topic.need_admin)
return {"code": 200, "msg": "创建成功"}
need_auth_routes.append("/create_topic")
need_admin_routes.append("/create_topic")

View File

@ -1,15 +0,0 @@
from typing import List, Dict, Any
from defs import app
from models.services.topic import TopicAction
async def get_topics_func() -> List[Dict[str, Any]]:
topics = await TopicAction.get_all_topic()
return [topic.dict_topic() for topic in topics]
@app.get("/get_topics")
async def create_topic():
data = await get_topics_func()
return {"code": 200, "msg": "获取成功", "data": data}

View File

@ -1,34 +0,0 @@
from errors.user import UserPasswordIncorrectError, UserNotFoundError
from pydantic import BaseModel
from defs import app
from models.services.user import UserAction
class ChangeUser(BaseModel):
username: str
old_password: str
new_password: str
async def change_user_password(change_user: ChangeUser):
user = await UserAction.get_user_by_username(change_user.username)
if user is None:
raise UserNotFoundError
if user.password != change_user.old_password:
raise UserPasswordIncorrectError
user.password = change_user.new_password
await UserAction.update_user(user)
@app.post("/change_password")
async def change_password(change_user: ChangeUser):
if change_user.old_password == change_user.new_password:
return {"code": 403, "msg": "新密码不能与旧密码相同"}
try:
await change_user_password(change_user)
except UserNotFoundError:
return {"code": 403, "msg": "用户不存在"}
except UserPasswordIncorrectError:
return {"code": 403, "msg": "用户名或密码错误"}
return {"code": 200, "msg": "密码修改成功"}

View File

@ -1,8 +0,0 @@
from defs import app, need_auth_routes
@app.get("/check_login")
async def check_login():
return {"code": 200, "msg": "登录状态有效"}
need_auth_routes.append("/check_login")

View File

@ -1,19 +0,0 @@
from typing import Dict
from defs import app, need_auth_routes
from fastapi import Request
from models.services.user import UserAction
async def get_me_func(uid: int) -> Dict:
user = await UserAction.get_user_by_id(uid)
return user.dict_user()
@app.get("/get_me")
async def get_me(request: Request):
uid = int(request.cookies.get("uid"))
user = await get_me_func(uid)
return {"code": 200, "msg": "获取成功", "data": user}
need_auth_routes.append("/check_login")

View File

@ -1,34 +0,0 @@
from errors.user import UserNotFoundError, UserPasswordIncorrectError
from fastapi import Response, Form
from defs import app
from models.services.session import SessionAction
from models.services.user import UserAction
async def authenticate_user(username: str, password: str) -> int:
user = await UserAction.get_user_by_username(username)
if user is None:
raise UserNotFoundError
if user.password != password:
raise UserPasswordIncorrectError
return user.uid
async def update_session(uid: int, session: str):
await SessionAction.update_session(uid, session)
@app.post("/login")
async def login(response: Response, username: str = Form(...), password: str = Form(...)):
try:
uid = await authenticate_user(username, password)
except UserNotFoundError:
return {"code": 403, "msg": "用户不存在"}
except UserPasswordIncorrectError:
return {"code": 403, "msg": "用户名或密码错误"}
session = SessionAction.gen_session()
await update_session(uid, session)
response.set_cookie(key="uid", value=str(uid))
response.set_cookie(key="session", value=session)
return {"code": 200, "msg": "登录成功", "data": {"uid": str(uid), "session": session}}

View File

@ -1,26 +0,0 @@
from fastapi import Form
from errors.user import UserAlreadyExistsError
from defs import app
from models.services.user import UserAction
async def reg_user(username: str, password: str) -> int:
user = await UserAction.get_user_by_username(username)
if user:
raise UserAlreadyExistsError
user = UserAction.gen_new_user(
username,
password,
)
await UserAction.add_user(user)
@app.post("/reg")
async def reg(username: str = Form(...), password: str = Form(...)):
try:
await reg_user(username, password)
except UserAlreadyExistsError:
return {"code": 409, "msg": "用户已存在"}
return {"code": 200, "msg": "注册成功"}

View File

View File

@ -1,33 +0,0 @@
class PostException(Exception):
def __init__(self, message: str = ""):
self.message = message
class PostTitleTooLongException(PostException):
def __init__(self, message: str = "标题过长"):
super().__init__(message)
class PostContentTooLongException(PostException):
def __init__(self, message: str = "内容过长"):
super().__init__(message)
class PostTopicNotValidException(PostException):
def __init__(self, message: str = "主题不存在"):
super().__init__(message)
class PostTopicNeedAdminException(PostException):
def __init__(self, message: str = "需要管理员权限"):
super().__init__(message)
class PostNotExistException(PostException):
def __init__(self, message: str = "文章不存在"):
super().__init__(message)
class PostNotDeleteException(PostException):
def __init__(self, message: str = "文章未删除"):
super().__init__(message)

View File

@ -1,18 +0,0 @@
class UserException(Exception):
def __init__(self, message):
self.message = message
class UserNotFoundError(UserException):
def __init__(self, message="User not found"):
super().__init__(message)
class UserAlreadyExistsError(UserException):
def __init__(self, message="User already exists"):
super().__init__(message)
class UserPasswordIncorrectError(UserException):
def __init__(self, message="User password incorrect"):
super().__init__(message)

View File

@ -1,30 +0,0 @@
from typing import Dict
from sqlmodel import SQLModel, Field
class Post(SQLModel, table=True):
__table_args__ = dict(mysql_charset="utf8mb4", mysql_collate="utf8mb4_general_ci")
pid: int = Field(primary_key=True, default=None)
tid: int = Field(default=None)
uid: int = Field(default=None)
content: str = Field(default="")
title: str = Field(default="")
create_time: int = Field(default="")
update_time: int = Field(default="")
is_hidden: bool = Field(default=False)
is_delete: bool = Field(default=False)
def dict_post(self) -> Dict:
return {
"pid": self.pid,
"tid": self.tid,
"uid": self.uid,
"title": self.title,
"content": self.content,
"create_time": self.create_time,
"update_time": self.update_time,
"is_hidden": self.is_hidden,
"is_delete": self.is_delete
}

16
models/models/tip.py Normal file
View File

@ -0,0 +1,16 @@
from typing import Dict
from sqlmodel import SQLModel, Field
class Tip(SQLModel, table=True):
__table_args__ = dict(mysql_charset="utf8mb4", mysql_collate="utf8mb4_general_ci")
id: int = Field(primary_key=True, default=None)
content: str = Field(default="")
def dict_tip(self) -> Dict:
return {
"id": self.id,
"content": self.content,
}

View File

@ -1,20 +0,0 @@
from typing import Dict
from sqlmodel import SQLModel, Field
class Topic(SQLModel, table=True):
__table_args__ = dict(mysql_charset="utf8mb4", mysql_collate="utf8mb4_general_ci")
tid: int = Field(primary_key=True, default=None)
title: str = Field(default="")
create_time: int = Field(default="")
need_admin: bool = Field(default=False)
def dict_topic(self) -> Dict:
return {
"tid": self.tid,
"title": self.title,
"create_time": self.create_time,
"need_admin": self.need_admin,
}

View File

@ -1,24 +0,0 @@
from typing import Dict
from sqlmodel import SQLModel, Field
class User(SQLModel, table=True):
__table_args__ = dict(mysql_charset="utf8mb4", mysql_collate="utf8mb4_general_ci")
uid: int = Field(primary_key=True, default=None)
username: str = Field(default="")
password: str = Field(default="")
is_admin: bool = Field(default=False)
register_time: int = Field(default="")
last_login_time: int = Field(default="")
session: str = Field(default="")
def dict_user(self) -> Dict:
return {
"uid": self.uid,
"username": self.username,
"is_admin": self.is_admin,
"register_time": self.register_time,
"last_login_time": self.last_login_time,
}

View File

@ -1,82 +0,0 @@
import time
from typing import cast, Optional, List
from sqlalchemy import select
from sqlmodel.ext.asyncio.session import AsyncSession
from defs import sqlite
from models.models.post import Post
class PostAction:
@staticmethod
async def add_post(post: Post):
async with sqlite.session() as session:
session = cast(AsyncSession, session)
session.add(post)
await session.commit()
@staticmethod
async def update_post(old_post: Post, new_post: Post = None):
if new_post:
old_post.tid = new_post.tid
old_post.uid = new_post.uid
old_post.title = new_post.title
old_post.content = new_post.content
old_post.create_time = new_post.create_time
old_post.update_time = new_post.update_time
old_post.is_hidden = new_post.is_hidden
old_post.is_delete = new_post.is_delete
async with sqlite.session() as session:
session = cast(AsyncSession, session)
session.add(old_post)
await session.commit()
await session.refresh(old_post)
@staticmethod
async def get_post_by_pid(pid: int) -> Post:
async with sqlite.session() as session:
session = cast(AsyncSession, session)
query = select(Post).where(Post.pid == pid)
results = await session.execute(query)
return post[0] if (post := results.first()) else None
@staticmethod
async def get_posts_by_tid(tid: int = None, admin: bool = False) -> List[Post]:
async with sqlite.session() as session:
session = cast(AsyncSession, session)
query = select(Post)
if tid:
query = query.where(Post.tid == tid)
if not admin:
query = query.where(
Post.is_delete == False
).where(
Post.is_hidden == False
)
results = await session.execute(query)
return results.scalars().all()
@staticmethod
def gen_new_post(
tid: int,
uid: int,
title: str,
content: str,
create_time: Optional[int] = None,
update_time: Optional[int] = None,
is_hidden: Optional[bool] = False,
is_delete: Optional[bool] = False,
) -> Post:
if not create_time:
create_time = int(time.time())
return Post(
tid=tid,
uid=uid,
title=title,
content=content,
create_time=create_time,
update_time=update_time,
is_hidden=is_hidden,
is_delete=is_delete,
)

View File

@ -1,45 +0,0 @@
import secrets
import string
import time
from typing import cast
from sqlalchemy import select
from sqlmodel.ext.asyncio.session import AsyncSession
from defs import sqlite
from models.models.user import User
class SessionAction:
@staticmethod
async def check_session(uid: int, session_value: str) -> bool:
async with sqlite.session() as session:
session = cast(AsyncSession, session)
statement = select(User).where(User.uid == uid)
results = await session.exec(statement)
user_: User = user[0] if (user := results.first()) else None
return False if user_ is None else user_.session == session_value
@staticmethod
async def update_session(uid: int, session_value: str) -> None:
async with sqlite.session() as session:
session = cast(AsyncSession, session)
statement = select(User).where(User.uid == uid)
results = await session.exec(statement)
user_: User = user[0] if (user := results.first()) else None
if user_ is None:
return
user_.last_login_time = int(time.time())
user_.session = session_value
await session.commit()
await session.refresh(user_)
@staticmethod
def gen_session() -> str:
return ''.join(
secrets.choice(
string.ascii_uppercase + string.ascii_lowercase + string.digits
)
for _ in range(30)
)

34
models/services/tip.py Normal file
View File

@ -0,0 +1,34 @@
from typing import cast, Optional, List
from sqlalchemy import select
from sqlmodel.ext.asyncio.session import AsyncSession
from defs import sqlite
from models.models.tip import Tip
class TipAction:
@staticmethod
async def add_tip(tip: Tip):
async with sqlite.session() as session:
session = cast(AsyncSession, session)
session.add(tip)
await session.commit()
@staticmethod
async def update_tip(old_tip: Tip, new_tip: Tip = None):
if new_tip:
old_tip.content = new_tip.content
async with sqlite.session() as session:
session = cast(AsyncSession, session)
session.add(old_tip)
await session.commit()
await session.refresh(old_tip)
@staticmethod
async def get_tips() -> List[Optional[Tip]]:
async with sqlite.session() as session:
session = cast(AsyncSession, session)
statement = select(Tip)
results = await session.exec(statement)
return results.scalars().all()

View File

@ -1,64 +0,0 @@
import time
from typing import cast, Optional, List
from sqlalchemy import select
from sqlmodel.ext.asyncio.session import AsyncSession
from defs import sqlite
from models.models.topic import Topic
class TopicAction:
@staticmethod
async def add_topic(topic: Topic):
async with sqlite.session() as session:
session = cast(AsyncSession, session)
session.add(topic)
await session.commit()
@staticmethod
async def update_topic(old_topic: Topic, new_topic: Topic = None):
if new_topic:
old_topic.title = new_topic.title
old_topic.create_time = new_topic.create_time
old_topic.need_admin = new_topic.need_admin
async with sqlite.session() as session:
session = cast(AsyncSession, session)
session.add(old_topic)
await session.commit()
await session.refresh(old_topic)
@staticmethod
async def get_topic_by_tid(tid: int) -> Optional[Topic]:
async with sqlite.session() as session:
session = cast(AsyncSession, session)
query = select(Topic).where(Topic.tid == tid)
results = await session.execute(query)
return topic[0] if (topic := results.first()) else None
@staticmethod
async def check_need_admin(tid: int) -> bool:
topic = await TopicAction.get_topic_by_tid(tid)
return topic.need_admin if topic else False
@staticmethod
async def get_all_topic() -> List[Topic]:
async with sqlite.session() as session:
session = cast(AsyncSession, session)
query = select(Topic)
results = await session.execute(query)
return results.scalars().all()
@staticmethod
def gen_new_topic(
title: str,
create_time: Optional[int] = None,
need_admin: Optional[bool] = None,
) -> Topic:
if not create_time:
create_time = int(time.time())
return Topic(
title=title,
create_time=create_time,
need_admin=need_admin,
)

View File

@ -1,89 +0,0 @@
import time
from typing import cast, Optional
from sqlalchemy import select
from sqlmodel.ext.asyncio.session import AsyncSession
from defs import sqlite
from models.models.user import User
class UserAction:
@staticmethod
async def add_user(user: User):
async with sqlite.session() as session:
session = cast(AsyncSession, session)
session.add(user)
await session.commit()
@staticmethod
async def get_user_by_username(username: str) -> Optional[User]:
async with sqlite.session() as session:
session = cast(AsyncSession, session)
statement = select(User).where(User.username == username)
results = await session.exec(statement)
return user[0] if (user := results.first()) else None
@staticmethod
async def get_user_by_id(uid: int) -> Optional[User]:
async with sqlite.session() as session:
session = cast(AsyncSession, session)
statement = select(User).where(User.uid == uid)
results = await session.exec(statement)
return user[0] if (user := results.first()) else None
@staticmethod
async def check_admin(uid: int) -> bool:
user = await UserAction.get_user_by_id(uid)
return user.is_admin if user else False
@staticmethod
async def update_user(old_user: User, new_user: User = None):
if new_user:
old_user.username = new_user.username
old_user.password = new_user.password
old_user.is_admin = new_user.is_admin
old_user.register_time = new_user.register_time
old_user.last_login_time = new_user.last_login_time
old_user.session = new_user.session
async with sqlite.session() as session:
session = cast(AsyncSession, session)
session.add(old_user)
await session.commit()
await session.refresh(old_user)
@staticmethod
async def add_or_update_user(user: User):
if old_user := await UserAction.get_user_by_username(user.username):
await UserAction.update_user(old_user, user)
else:
await UserAction.add_user(user)
@staticmethod
async def change_user_password(username: str, password: str) -> bool:
user = await UserAction.get_user_by_username(username)
if not user:
return False
user.password = password
await UserAction.update_user(user)
return True
@staticmethod
def gen_new_user(
username: str,
password: str,
is_admin: bool = False,
register_time: int = 0,
last_login_time: int = 0,
session: str = "",
) -> User:
if not register_time:
register_time = int(time.time())
return User(
username=username,
password=password,
is_admin=is_admin,
register_time=register_time,
last_login_time=last_login_time,
session=session,
)

View File

@ -1,9 +1,9 @@
from sqlmodel import SQLModel
from models.models.user import User
from models.models.tip import Tip
from pathlib import Path
__all__ = ["User", "Sqlite"]
__all__ = ["Tip", "Sqlite"]
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker

View File

View File

@ -1,6 +0,0 @@
from pydantic import BaseModel
class User(BaseModel):
username: str
password: str