mirror of
https://github.com/TeamPGM/PagerMaid_Plugins_Pyro.git
synced 2024-11-25 15:42:18 +00:00
111 lines
3.9 KiB
Python
111 lines
3.9 KiB
Python
import contextlib
|
|
|
|
from pagermaid.services import sqlite
|
|
from pagermaid.scheduler import add_delete_message_job
|
|
from pagermaid.enums import Message
|
|
from pagermaid.listener import listener
|
|
from pagermaid.utils import alias_command
|
|
|
|
|
|
class DelTask:
|
|
@staticmethod
|
|
def check_time(time: str, min_value: int = None, max_value: int = None) -> str:
|
|
if max_value and int(time) > max_value:
|
|
raise ValueError(f"Time value {time} is too large")
|
|
if min_value and int(time) < min_value:
|
|
raise ValueError(f"Time value {time} is too small")
|
|
if int(time) < 0:
|
|
raise ValueError(f"Time value {time} is too small")
|
|
return time
|
|
|
|
@staticmethod
|
|
def parse_time(text: str) -> int:
|
|
data = text.strip().split(" ")
|
|
second, minute, hour, day = 0, 0, 0, 0
|
|
if len(data) % 2:
|
|
raise ValueError("Invalid task format")
|
|
for i in range(1, len(data)):
|
|
if data[i] == "seconds":
|
|
second = DelTask.check_time(data[i - 1], 0, 60)
|
|
elif data[i] == "minutes":
|
|
minute = DelTask.check_time(data[i - 1], 0, 60)
|
|
elif data[i] == "hours":
|
|
hour = DelTask.check_time(data[i - 1], 0, 24)
|
|
elif data[i] == "days":
|
|
day = DelTask.check_time(data[i - 1], 0, 31)
|
|
if second := int(second) + int(minute) * 60 + int(hour) * 3600 + int(day) * 86400:
|
|
return second
|
|
else:
|
|
raise ValueError("Invalid task format")
|
|
|
|
@staticmethod
|
|
def get_del_seconds(cid: int):
|
|
return sqlite.get(f"autodel.{cid}", 0)
|
|
|
|
@staticmethod
|
|
async def parse_task(message: Message):
|
|
cid = message.chat.id
|
|
text = message.arguments
|
|
if not text:
|
|
raise ValueError("Invalid task format")
|
|
if "global" in text:
|
|
cid = 0
|
|
text = text.replace("global", "")
|
|
if "cancel" in text:
|
|
del sqlite[f"autodel.{cid}"]
|
|
return
|
|
seconds = DelTask.parse_time(text)
|
|
sqlite[f"autodel.{cid}"] = seconds
|
|
|
|
@staticmethod
|
|
def add_task(message: Message):
|
|
if seconds := DelTask.get_del_seconds(0) or DelTask.get_del_seconds(message.chat.id):
|
|
add_delete_message_job(message, seconds)
|
|
|
|
@staticmethod
|
|
def get_list(cid: int):
|
|
seconds = DelTask.get_del_seconds(cid)
|
|
global_seconds = DelTask.get_del_seconds(0)
|
|
if seconds:
|
|
return f"Current chat autodel time is {seconds} seconds"
|
|
elif global_seconds:
|
|
return f"Current chat autodel time is global {global_seconds} seconds"
|
|
return "Current chat autodel time is not set"
|
|
|
|
|
|
auto_del_help_msg = f"""
|
|
定时删除消息。
|
|
|
|
,{alias_command("autodel")} 1 seconds
|
|
,{alias_command("autodel")} 1 minutes
|
|
,{alias_command("autodel")} 1 hours
|
|
,{alias_command("autodel")} 1 days
|
|
,{alias_command("autodel")} 1 seconds global
|
|
,{alias_command("autodel")} cancel
|
|
|
|
,{alias_command("autodel")} l - 查看自动删除任务
|
|
"""
|
|
|
|
|
|
@listener(command="autodel",
|
|
need_admin=True,
|
|
description=f"定时删除消息\n请使用 ,{alias_command('autodel')} h 查看可用命令")
|
|
async def auto_del(message: Message):
|
|
if message.arguments == "h" or len(message.parameter) == 0:
|
|
return await message.edit(auto_del_help_msg)
|
|
elif message.arguments == "l":
|
|
return await message.edit(DelTask.get_list(message.chat.id))
|
|
try:
|
|
await DelTask.parse_task(message)
|
|
await message.edit("设置自动删除任务成功。" if message.arguments != "cancel" else "取消自动删除任务成功。")
|
|
except ValueError as e:
|
|
await message.edit(f"开启失败:{str(e)}")
|
|
except KeyError:
|
|
await message.edit("未开启自动删除")
|
|
|
|
|
|
@listener(incoming=False, outgoing=True)
|
|
async def auto_del_task(message: Message):
|
|
with contextlib.suppress(Exception):
|
|
DelTask.add_task(message)
|