mirror of
https://github.com/Xtao-Labs/misskey2telegram.git
synced 2024-11-29 16:01:42 +00:00
152 lines
4.6 KiB
Python
152 lines
4.6 KiB
Python
|
from typing import Optional
|
|||
|
|
|||
|
from mipac import ChatMessage, File
|
|||
|
from mipac.models.lite import LiteUser
|
|||
|
from pyrogram.errors import MediaEmpty
|
|||
|
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
|
|||
|
|
|||
|
from glover import misskey_host
|
|||
|
from init import bot, request
|
|||
|
from scheduler import add_delete_file_job, delete_file
|
|||
|
|
|||
|
|
|||
|
def get_user_link(user: LiteUser) -> str:
|
|||
|
if user.host:
|
|||
|
return f"https://{user.host}/@{user.username}"
|
|||
|
return f"https://{misskey_host}/@{user.username}"
|
|||
|
|
|||
|
|
|||
|
def get_source_link(message: ChatMessage) -> str:
|
|||
|
return (
|
|||
|
f"https://{misskey_host}/my/messaging/{message.user.username}?cid={message.user.id}"
|
|||
|
if not message.group and message.user
|
|||
|
else f"https://{misskey_host}/my/messaging/group/{message.group.id}"
|
|||
|
)
|
|||
|
|
|||
|
|
|||
|
def gen_button(message: ChatMessage):
|
|||
|
author = get_user_link(message.user)
|
|||
|
source = get_source_link(message)
|
|||
|
first_line = [
|
|||
|
InlineKeyboardButton(text="Chat", url=source),
|
|||
|
InlineKeyboardButton(text="Author", url=author),
|
|||
|
]
|
|||
|
return InlineKeyboardMarkup([first_line])
|
|||
|
|
|||
|
|
|||
|
def get_content(message: ChatMessage) -> str:
|
|||
|
content = message.text or ""
|
|||
|
content = content[:768]
|
|||
|
user = f"<a href=\"{get_user_link(message.user)}\">{message.user.nickname}</a>"
|
|||
|
if message.group:
|
|||
|
group = f"<a href=\"{get_source_link(message)}\">{message.group.name}</a>"
|
|||
|
user += f" ( {group} )"
|
|||
|
return f"""<b>Misskey Message</b>
|
|||
|
|
|||
|
{user}: <code>{content}</code>"""
|
|||
|
|
|||
|
|
|||
|
async def send_text(cid: int, message: ChatMessage, reply_to_message_id: int):
|
|||
|
await bot.send_message(
|
|||
|
cid,
|
|||
|
get_content(message),
|
|||
|
reply_to_message_id=reply_to_message_id,
|
|||
|
reply_markup=gen_button(message),
|
|||
|
disable_web_page_preview=True,
|
|||
|
)
|
|||
|
|
|||
|
|
|||
|
def deprecated_to_text(func):
|
|||
|
async def wrapper(*args, **kwargs):
|
|||
|
try:
|
|||
|
return await func(*args, **kwargs)
|
|||
|
except MediaEmpty:
|
|||
|
return await send_text(args[0], args[2], args[3])
|
|||
|
|
|||
|
return wrapper
|
|||
|
|
|||
|
|
|||
|
@deprecated_to_text
|
|||
|
async def send_photo(cid: int, url: str, message: ChatMessage, reply_to_message_id: int):
|
|||
|
if not url:
|
|||
|
return await send_text(cid, message, reply_to_message_id)
|
|||
|
await bot.send_photo(
|
|||
|
cid,
|
|||
|
url,
|
|||
|
reply_to_message_id=reply_to_message_id,
|
|||
|
caption=get_content(message),
|
|||
|
reply_markup=gen_button(message),
|
|||
|
)
|
|||
|
|
|||
|
|
|||
|
@deprecated_to_text
|
|||
|
async def send_video(cid: int, url: str, message: ChatMessage, reply_to_message_id: int):
|
|||
|
if not url:
|
|||
|
return await send_text(cid, message, reply_to_message_id)
|
|||
|
await bot.send_video(
|
|||
|
cid,
|
|||
|
url,
|
|||
|
reply_to_message_id=reply_to_message_id,
|
|||
|
caption=get_content(message),
|
|||
|
reply_markup=gen_button(message),
|
|||
|
)
|
|||
|
|
|||
|
|
|||
|
@deprecated_to_text
|
|||
|
async def send_audio(cid: int, url: str, message: ChatMessage, reply_to_message_id: int):
|
|||
|
if not url:
|
|||
|
return await send_text(cid, message, reply_to_message_id)
|
|||
|
await bot.send_audio(
|
|||
|
cid,
|
|||
|
url,
|
|||
|
reply_to_message_id=reply_to_message_id,
|
|||
|
caption=get_content(message),
|
|||
|
reply_markup=gen_button(message),
|
|||
|
)
|
|||
|
|
|||
|
|
|||
|
async def fetch_document(file: File) -> Optional[str]:
|
|||
|
file_name = f"downloads/{file.name}"
|
|||
|
file_url = file.url
|
|||
|
if file.size > 10 * 1024 * 1024:
|
|||
|
return file_url
|
|||
|
if not file_url:
|
|||
|
return file_url
|
|||
|
req = await request.get(file_url)
|
|||
|
if req.status_code != 200:
|
|||
|
return file_url
|
|||
|
with open(file_name, "wb") as f:
|
|||
|
f.write(req.content)
|
|||
|
add_delete_file_job(file_name)
|
|||
|
return file_name
|
|||
|
|
|||
|
|
|||
|
@deprecated_to_text
|
|||
|
async def send_document(cid: int, file: File, message: ChatMessage, reply_to_message_id: int):
|
|||
|
file = await fetch_document(file)
|
|||
|
if not file:
|
|||
|
return await send_text(cid, message, reply_to_message_id)
|
|||
|
await bot.send_document(
|
|||
|
cid,
|
|||
|
file,
|
|||
|
reply_to_message_id=reply_to_message_id,
|
|||
|
caption=get_content(message),
|
|||
|
reply_markup=gen_button(message),
|
|||
|
)
|
|||
|
await delete_file(file)
|
|||
|
|
|||
|
|
|||
|
async def send_chat_message(cid: int, message: ChatMessage, topic_id: int):
|
|||
|
if not message.file:
|
|||
|
return await send_text(cid, message, topic_id)
|
|||
|
file_url = message.file.url
|
|||
|
file_type = message.file.type
|
|||
|
if file_type.startswith("image"):
|
|||
|
await send_photo(cid, file_url, message, topic_id)
|
|||
|
elif file_type.startswith("video"):
|
|||
|
await send_video(cid, file_url, message, topic_id)
|
|||
|
elif file_type.startswith("audio"):
|
|||
|
await send_audio(cid, file_url, message, topic_id)
|
|||
|
else:
|
|||
|
await send_document(cid, message.file, message, topic_id)
|