2023-08-03 06:59:12 +00:00
|
|
|
import io
|
|
|
|
import traceback
|
2023-01-27 12:02:46 +00:00
|
|
|
import uuid
|
2023-08-03 06:59:12 +00:00
|
|
|
import PIL.Image
|
|
|
|
from httpx import AsyncClient
|
2023-08-09 06:26:20 +00:00
|
|
|
from fastapi import FastAPI, Query
|
|
|
|
from fastapi.templating import Jinja2Templates
|
|
|
|
from starlette.requests import Request
|
|
|
|
from starlette.responses import RedirectResponse, StreamingResponse, HTMLResponse
|
2023-01-27 12:02:46 +00:00
|
|
|
|
|
|
|
from utils import gen_url, get_auth
|
|
|
|
|
|
|
|
app = FastAPI()
|
2023-08-03 06:59:12 +00:00
|
|
|
with open("default.jpg", "rb") as f:
|
|
|
|
default_jpg = io.BytesIO(f.read())
|
2023-08-09 06:26:20 +00:00
|
|
|
templates = Jinja2Templates(directory="public/templates")
|
2023-01-27 12:02:46 +00:00
|
|
|
|
|
|
|
|
|
|
|
@app.get('/gen')
|
|
|
|
async def gen(
|
|
|
|
*,
|
|
|
|
username: str,
|
|
|
|
host: str,
|
|
|
|
back_host: str,
|
|
|
|
):
|
|
|
|
session = str(uuid.uuid4())
|
|
|
|
return RedirectResponse(gen_url(username, host, back_host, session))
|
|
|
|
|
|
|
|
|
2023-08-03 06:59:12 +00:00
|
|
|
@app.get('/1.jpg')
|
|
|
|
async def jpg(
|
|
|
|
*,
|
|
|
|
url: str,
|
|
|
|
) -> StreamingResponse:
|
|
|
|
if "/proxy/avatar" not in url:
|
|
|
|
return StreamingResponse(default_jpg, media_type="image/jpg")
|
|
|
|
url = url.replace("/proxy/avatar.webp", "/proxy/avatar.png")
|
|
|
|
# jpg png webp gif to jpg
|
|
|
|
try:
|
|
|
|
async with AsyncClient() as client:
|
|
|
|
print(f"get jpg {url}")
|
|
|
|
r = await client.get(url)
|
|
|
|
remote = PIL.Image.open(io.BytesIO(r.content))
|
|
|
|
remote = remote.convert("RGB")
|
|
|
|
file_like = io.BytesIO()
|
|
|
|
remote.save(file_like, "jpeg")
|
|
|
|
file_like.seek(0)
|
|
|
|
return StreamingResponse(file_like, media_type="image/jpg")
|
|
|
|
except Exception:
|
|
|
|
traceback.print_exc()
|
|
|
|
return StreamingResponse(default_jpg, media_type="image/jpg")
|
|
|
|
|
|
|
|
|
2023-01-27 12:02:46 +00:00
|
|
|
@app.get("/{username}/{host}")
|
|
|
|
async def back_to_telegram(
|
|
|
|
*,
|
|
|
|
username: str,
|
|
|
|
host: str,
|
|
|
|
session: str,
|
|
|
|
):
|
|
|
|
data = await get_auth(host, session)
|
|
|
|
if data.get("ok", False):
|
|
|
|
return RedirectResponse(f"https://t.me/{username}?start={data['token']}")
|
|
|
|
else:
|
|
|
|
return data
|
2023-08-09 06:26:20 +00:00
|
|
|
|
|
|
|
|
|
|
|
@app.get("/config", response_class=HTMLResponse)
|
|
|
|
async def debug_config_page(
|
|
|
|
request: Request,
|
|
|
|
bot_data: str = Query(..., title="bot_data"),
|
|
|
|
):
|
|
|
|
user = {"command": "config", "bot_data": bot_data}
|
|
|
|
return templates.TemplateResponse(
|
|
|
|
"config.html", {"request": request, "user": user}
|
|
|
|
)
|