Delete some plugins

This commit is contained in:
xtaodada 2023-07-01 20:23:16 +08:00
parent d7eda15640
commit 2a75572349
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
4 changed files with 0 additions and 752 deletions

View File

@ -1,120 +0,0 @@
""" https://github.com/A-kirami/nonebot-plugin-chatgpt """
import contextlib
import threading
import uuid
from collections import defaultdict
from typing import Optional
from pagermaid.services import scheduler, sqlite
from pagermaid.enums import Message
from pagermaid.listener import listener
from pagermaid.utils import pip_install
pip_install("revChatGPT", "==0.0.33.2")
from asyncChatGPT.asyncChatGPT import Chatbot
class AsyncChatbot:
def __init__(self) -> None:
self.config = {"session_token": self.get_token()} if self.get_token() else {}
self.bot = Chatbot(config=self.config, refresh=False)
def __call__(
self, conversation_id: Optional[str] = None, parent_id: Optional[str] = None
):
self.bot.conversation_id = conversation_id
self.bot.parent_id = parent_id or self.id
return self
@property
def id(self) -> str:
return str(uuid.uuid4())
def set_token(self, token: str) -> None:
sqlite["chatgbt_token"] = token
self.config["session_token"] = token
@staticmethod
def get_token() -> str:
return sqlite.get("chatgbt_token", None)
@staticmethod
def del_token():
del sqlite["chatgbt_token"]
async def get_chat_response(self, prompt: str) -> str:
return (await self.bot.get_chat_response(prompt)).get("message", "")
async def refresh_session(self) -> None:
if not self.get_token():
return
self.bot.refresh_session()
self.set_token(self.bot.config["session_token"])
chat_bot = AsyncChatbot()
chat_bot_session = defaultdict(dict)
chat_bot_lock = threading.Lock()
chat_bot_help = (
"使用 ChatGPT 聊天\n\n"
"参数:\n\n- 无参数:进入聊天模式\n"
"- reset重置聊天状态\n"
"- set <session_token>:设置 ChatGPT 会话令牌,获取令牌: https://t.me/PagerMaid_Modify/212 \n"
"- del删除 ChatGPT 会话令牌"
)
@scheduler.scheduled_job("interval", minutes=30)
async def refresh_session() -> None:
await chat_bot.refresh_session()
@listener(
command="chatgpt",
description=chat_bot_help,
)
async def chat_bot_func(message: Message):
if not message.arguments:
return await message.edit(chat_bot_help)
from_id = message.from_user.id if message.from_user else 0
from_id = message.sender_chat.id if message.sender_chat else from_id
if not from_id:
from_id = message.chat.id
if len(message.parameter) == 2 and message.parameter[0] == "set":
token = message.parameter[1]
if not token.startswith("ey"):
return await message.edit("无效的 token。")
chat_bot.set_token(message.parameter[1])
try:
await chat_bot.refresh_session()
except Exception as e:
return await message.edit(f"设置失败:{e}")
return await message.edit("设置 Token 成功,可以开始使用了。")
elif message.arguments == "reset":
with contextlib.suppress(KeyError):
del chat_bot_session[from_id]
return await message.edit("已重置聊天状态。")
elif message.arguments == "del":
if not chat_bot.get_token():
return await message.edit("没有设置 Token。")
chat_bot.del_token()
return await message.edit("已删除 Token。")
if not chat_bot.get_token():
return await message.edit("请先通过参数 `set [session_token]` 设置 OpenAI API Token。")
with chat_bot_lock:
try:
msg = await chat_bot(**chat_bot_session[from_id]).get_chat_response(
message.arguments
)
except Exception as e:
msg = f"可能是 Session Token 过期了,请重新设置。\n{str(e)}"
if not msg:
msg = "无法获取到回复,可能是网络波动,请稍后再试。"
with contextlib.suppress(Exception):
await message.edit(msg)
chat_bot_session[from_id]["conversation_id"] = chat_bot.bot.conversation_id
chat_bot_session[from_id]["parent_id"] = chat_bot.bot.parent_id

View File

@ -1,65 +0,0 @@
""" PagerMaid module that 历史上的今天 """
from pyrogram.enums import ParseMode
from pagermaid import scheduler, bot
from pagermaid.listener import listener
from pagermaid.utils import client, Message, check_manage_subs, edit_delete
from pagermaid.sub_utils import Sub
today_in_history_sub = Sub("today_in_history")
async def get_history() -> str:
resp = await client.get("https://api.iyk0.com/lishi/")
if resp.is_error:
raise ValueError(f"历史上的今天 数据获取失败,错误码:{resp.status_code}")
content = resp.json()
text = f"历史上的今天 {list(content.keys())[0]}\n\n"
for item in list(content.values())[0]:
text += f"{item['year']} {item['title']}"
return text
@scheduler.scheduled_job("cron", hour="8", id="today_in_history.push")
async def today_in_history_subscribe() -> None:
text = await get_history()
for gid in today_in_history_sub.get_subs():
try:
await bot.send_message(gid, text)
except Exception as e: # noqa
today_in_history_sub.del_id(gid)
@listener(
command="today_in_history",
parameters="订阅/退订",
description="查看历史上的今天,支持订阅/退订每天上午八点定时发送",
)
async def today_in_history(message: Message):
if not message.arguments:
try:
text = await get_history()
except ValueError as e:
return await message.edit(e.__str__())
await message.edit(text)
elif message.arguments == "订阅":
if check_manage_subs(message):
if today_in_history_sub.check_id(message.chat.id):
return await edit_delete(
message, "❌ 你已经订阅了历史上的今天", parse_mode=ParseMode.HTML
)
today_in_history_sub.add_id(message.chat.id)
await message.edit("你已经成功订阅了历史上的今天")
else:
await edit_delete(message, "❌ 权限不足,无法订阅历史上的今天", parse_mode=ParseMode.HTML)
elif message.arguments == "退订":
if check_manage_subs(message):
if not today_in_history_sub.check_id(message.chat.id):
return await edit_delete(
message, "❌ 你还没有订阅摸历史上的今天", parse_mode=ParseMode.HTML
)
today_in_history_sub.del_id(message.chat.id)
await message.edit("你已经成功退订了历史上的今天")
else:
await edit_delete(message, "❌ 权限不足,无法退订历史上的今天", parse_mode=ParseMode.HTML)

View File

@ -1,37 +0,0 @@
# Trace
### 用法
```
Reply to a message:
Trace : .trace 👍👎🥰
Untrace : .trace
Trace keyword: .trace kw add 👍👎🥰
Del keyword : .trace kw del
List all : .trace status
Untrace all: .trace clean
Keep log : .trace log [true|false]
```
#### 踩的坑
+ 对于一些emoji的字符长度为1在SPECIAL_EMOJI中定义不知道有没有写完欢迎添加。
#### 接下来可能加的东西(更有可能摆)
+ 在某一段时间内随机react
+ 优先级keyword和user
+ 通过id/username添加
+ 安全词(对方说了就停)
+ 历史记录react
+ big/sleep长度参数的设置
+ 定时器
+ 群组权限
+ 黑白名单
+ 还有什么好玩的么?
#### Bugs
+ disabled reaction检测
+ `count_offset` 对非2/1长度emoji的计数比如国旗emoji的长度为4

View File

@ -1,530 +0,0 @@
# -*- coding: utf-8 -*-
import contextlib
from typing import Optional, List, Dict, Tuple, Union
from functools import reduce
from pyrogram.enums import MessageEntityType, ParseMode
from pyrogram.raw.functions.messages import SendReaction
from pyrogram.raw.types import ReactionEmoji, ReactionCustomEmoji, User
from pyrogram.types import MessageEntity
from pagermaid.listener import listener
from pagermaid.enums import Client, Message
from pagermaid.utils import sleep, pip_install
from pagermaid.single_utils import sqlite
pip_install("emoji")
import emoji
NATIVE_EMOJI = b"\xf0\x9f\x91\x8d\xf0\x9f\x91\x8e\xe2\x9d\xa4\xef\xb8\x8f\xf0\x9f\x94\xa5\xf0\x9f\xa5\xb0\xf0\x9f\x91\x8f\xf0\x9f\x98\x81\xf0\x9f\xa4\x94\xf0\x9f\xa4\xaf\xf0\x9f\x98\xb1\xf0\x9f\xa4\xac\xf0\x9f\x98\xa2\xf0\x9f\x8e\x89\xf0\x9f\xa4\xa9\xf0\x9f\xa4\xae\xf0\x9f\x92\xa9\xf0\x9f\x99\x8f\xf0\x9f\x91\x8c\xf0\x9f\x95\x8a\xf0\x9f\xa4\xa1\xf0\x9f\xa5\xb1\xf0\x9f\xa5\xb4\xf0\x9f\x98\x8d\xf0\x9f\x90\xb3\xe2\x9d\xa4\xef\xb8\x8f\xe2\x80\x8d\xf0\x9f\x94\xa5\xf0\x9f\x8c\x9a\xf0\x9f\x8c\xad\xf0\x9f\x92\xaf\xf0\x9f\xa4\xa3\xe2\x9a\xa1\xef\xb8\x8f\xf0\x9f\x8d\x8c\xf0\x9f\x8f\x86\xf0\x9f\x92\x94\xf0\x9f\xa4\xa8\xf0\x9f\x98\x90\xf0\x9f\x8d\x93\xf0\x9f\x8d\xbe\xf0\x9f\x92\x8b\xf0\x9f\x96\x95\xf0\x9f\x98\x88\xf0\x9f\x98\x82\xf0\x9f\x98\xad".decode()
SPECIAL_EMOJI = "❤⬅↔➡⬆↕⬇" # TO BE ADDED
USAGE = f"""```Usage:
Reply to a message:
Trace : .trace 👍👎🥰
Untrace : .trace
Trace keyword: .trace kw add 👍👎🥰
Del keyword : .trace kw del
List all : .trace status
Untrace all: .trace clean
Keep log : .trace log [true|false]
Toggle big : .trace big [true|false]
Use with caution:
Reset all : .trace resettrace
```
**Available native emojis:** {NATIVE_EMOJI}
"""
cached_sqlite = {
key: value for key, value in sqlite.items() if key.startswith("trace.")
}
if cached_sqlite.get("trace.config.keep_log", None) is None:
sqlite["trace.config.keep_log"] = True
cached_sqlite["trace.config.keep_log"] = True
if cached_sqlite.get("trace.config.big", None) is None:
sqlite["trace.config.big"] = True
cached_sqlite["trace.config.big"] = True
async def edit_and_delete(
message: Message,
text: str,
entities: List[MessageEntity] = None,
seconds=5,
parse_mode: ParseMode = ParseMode.DEFAULT,
):
if entities is None:
entities = []
await message.edit(text, entities=entities, parse_mode=parse_mode)
if seconds == -1 or cached_sqlite["trace.config.keep_log"]:
return
await sleep(seconds)
return await message.delete()
async def print_usage(message: Message):
return await edit_and_delete(message, USAGE, [], 15, ParseMode.MARKDOWN)
async def get_users_by_userids(client: Client, uids) -> List[User]:
return await client.get_users(uids)
async def get_all_traced(client: Client) -> Dict:
uid_reactions = {
int(key.split("trace.user_id.")[1]): {"reactions": value}
for key, value in cached_sqlite.items()
if key.startswith("trace.user_id.")
}
user_info = await get_users_by_userids(client, uid_reactions.keys())
for user in user_info:
uid_reactions[user.id]["user"] = user
return uid_reactions
def count_offset(text: str) -> int:
return sum(
1
if c in SPECIAL_EMOJI or c not in SPECIAL_EMOJI and not emoji.is_emoji(c)
else 2
for c in text
)
def append_emoji_to_text(
text: str,
reaction_list: List[Union[ReactionEmoji, ReactionCustomEmoji]],
entities: List[MessageEntity],
):
if reaction_list is None:
return text, entities
text += "["
for reaction in reaction_list:
if type(reaction) is ReactionEmoji:
text += f"{reaction.emoticon}, "
elif type(reaction) is ReactionCustomEmoji:
entities.append(
MessageEntity(
type=MessageEntityType.CUSTOM_EMOJI,
offset=count_offset(text),
length=2,
custom_emoji_id=reaction.document_id,
)
)
text += "👋, "
else: # Would it reach here?
text += str(reaction)
text = text[:-2] + "]\n"
return text, entities
def get_keyword_emojis_from_message(message) -> Tuple[str, List[Union[str, int]]]:
return (message.parameter[0], get_emojis_from_message(message)) if message else None
def get_emojis_from_message(message: Message) -> Optional[List[Union[str, int]]]:
if not message:
return None
emoji_list = []
index = 0
entity_i = 0
# Parse input to preserve order.
# TODO: Can be more elegant
for c in message.text:
if len(emoji_list) == 3:
break
if emoji.is_emoji(c):
if (
message.entities
and len(message.entities) - 1 >= entity_i
and message.entities[entity_i].type == MessageEntityType.CUSTOM_EMOJI
and message.entities[entity_i].offset == index
):
emoji_list.append(message.entities[entity_i].custom_emoji_id)
entity_i += 1
else:
emoji_list.append(c)
index += 2
if c in SPECIAL_EMOJI:
index -= 1
else:
index += 1
return emoji_list
def get_name_and_username_from_message(message: Message):
other_name = ""
if message.reply_to_message.from_user.first_name:
other_name += message.reply_to_message.from_user.first_name
if message.reply_to_message.from_user.last_name:
other_name += message.reply_to_message.from_user.last_name
other_username = message.reply_to_message.from_user.username
return other_name, other_username
def append_username_to_text(
text: str,
other_name: str,
other_username: str,
entities: List[MessageEntity],
message: Message,
user: Optional[User] = None,
):
if other_username:
entities.append(
MessageEntity(
type=MessageEntityType.MENTION,
offset=count_offset(text) + 2,
length=count_offset(other_username),
)
)
text += f" @{other_username}"
elif other_name:
if user:
entities.append(
MessageEntity(
type=MessageEntityType.TEXT_MENTION,
offset=count_offset(text) + 2,
length=count_offset(other_name),
user=user,
)
)
else:
entities.append(
MessageEntity(
type=MessageEntityType.TEXT_MENTION,
offset=count_offset(text) + 2,
length=count_offset(other_name),
user=message.reply_to_message.from_user,
)
)
text += f" {other_name}"
else:
text += "Some unknown ghost"
text += ": "
return text, entities
def new_bold_string_entities(text: str) -> Tuple[str, List[MessageEntity]]:
return append_bold_string("", text, [])
def append_bold_string(
text: str, append_text: str, entities: List[MessageEntity]
) -> Tuple[str, List[MessageEntity]]:
entities.append(
MessageEntity(
type=MessageEntityType.BOLD,
offset=count_offset(text),
length=count_offset(append_text),
)
)
text += append_text
return text, entities
async def gen_reaction_list(emojis, bot: Client):
me = bot.me or await bot.get_me()
reaction_list = []
if not me.is_premium: # Remove custom emojis if not premium (will it happen?)
emojis = [x for x in emojis if type(x) is not int]
emojis = reduce(
lambda x, y: x if y in x else x + [y],
[
[],
]
+ emojis,
) # Remove replicated
for emoji in emojis:
if type(emoji) is int:
reaction_list.append(ReactionCustomEmoji(document_id=emoji))
elif type(emoji) is str and emoji in NATIVE_EMOJI:
reaction_list.append(ReactionEmoji(emoticon=emoji))
return reaction_list
def append_config(
text: str, entities: List[MessageEntity]
) -> Tuple[str, List[MessageEntity]]:
entities.append(
MessageEntity(
type=MessageEntityType.BOLD,
offset=count_offset(text),
length=len(f"\nKeep log: \n {cached_sqlite['trace.config.keep_log']}"),
)
)
text += f"\nKeep log: \n {cached_sqlite['trace.config.keep_log']}"
entities.append(
MessageEntity(
type=MessageEntityType.BOLD,
offset=count_offset(text),
length=len(f"\nUse big : \n {cached_sqlite['trace.config.keep_log']}"),
)
)
text += f"\nUse big : \n {cached_sqlite['trace.config.keep_log']}"
return text, entities
@listener(command="trace", need_admin=True, diagnostics=False, description=USAGE)
async def trace(bot: Client, message: Message):
"""
# For debug use
if len(message.parameter) and message.parameter[0] == "magicword":
return await message.edit(str(message))
"""
if len(message.parameter) == 0: # Either untrace someone or throw error
if (
message.reply_to_message is None
or message.reply_to_message.from_user is None
):
return await print_usage(message)
other_id = message.reply_to_message.from_user.id
if not cached_sqlite.get(f"trace.user_id.{other_id}", None):
return await edit_and_delete(
message, "This user is not in the traced list."
)
prev_emojis = cached_sqlite.get(f"trace.user_id.{other_id}", None)
del sqlite[f"trace.user_id.{other_id}"]
del cached_sqlite[f"trace.user_id.{other_id}"]
text, entities = new_bold_string_entities("Successfully untraced: \n")
other_name, other_username = get_name_and_username_from_message(message)
text, entities = append_username_to_text(
text, other_name, other_username, entities, message
)
text, entities = append_emoji_to_text(text, prev_emojis, entities)
return await edit_and_delete(
message, text, entities=entities, seconds=5, parse_mode=ParseMode.MARKDOWN
)
elif len(message.parameter) == 1:
if message.parameter[0] in ["status", "clean"]: # Get all traced info
traced_uids = await get_all_traced(bot)
text, entities = (
new_bold_string_entities("Traced userlist:\n")
if message.parameter[0] == "status"
else new_bold_string_entities("Successfully untraced: \n")
)
for traced_uid in traced_uids.keys():
other_name = ""
if traced_uids[traced_uid]["user"].first_name:
other_name += traced_uids[traced_uid]["user"].first_name
if traced_uids[traced_uid]["user"].last_name:
other_name += traced_uids[traced_uid]["user"].last_name
other_username = traced_uids[traced_uid]["user"].username
text, entities = append_username_to_text(
text,
other_name,
other_username,
entities,
message,
traced_uids[traced_uid]["user"],
)
text, entities = append_emoji_to_text(
text, traced_uids[traced_uid]["reactions"], entities
)
text, entities = append_bold_string(text, "\nTraced keywords: \n", entities)
if traced_keywords := cached_sqlite.get("trace.keywordlist", None):
for keyword in traced_keywords:
reaction_list = cached_sqlite.get(
f"trace.keyword.{keyword.encode().hex()}", None
)
text += f" {keyword}: "
text, entities = append_emoji_to_text(text, reaction_list, entities)
if message.parameter[0] == "status":
text, entities = append_config(text, entities)
if message.parameter[0] == "clean":
for k, v in cached_sqlite:
if k.startswith("trace."):
del cached_sqlite[k]
del sqlite[k]
return await edit_and_delete(
message,
text,
entities=entities,
seconds=5,
parse_mode=ParseMode.MARKDOWN,
)
elif message.parameter[0] == "resettrace":
for k, v in cached_sqlite:
if k.startswith("trace."):
del cached_sqlite[k]
del sqlite[k]
return await edit_and_delete(
message,
"**Database has been reset.**",
seconds=5,
parse_mode=ParseMode.MARKDOWN,
)
else:
if emojis := get_emojis_from_message(message):
reaction_list = await gen_reaction_list(emojis, bot)
if reaction_list:
sqlite[
f"trace.user_id.{message.reply_to_message.from_user.id}"
] = reaction_list
cached_sqlite[
f"trace.user_id.{message.reply_to_message.from_user.id}"
] = reaction_list
await bot.invoke(
SendReaction(
peer=await bot.resolve_peer(int(message.chat.id)),
msg_id=message.reply_to_message_id,
reaction=reaction_list,
big=cached_sqlite["trace.config.big"],
)
)
text = "Successfully traced: \n"
# TODO: Add username
text, entities = append_emoji_to_text(text, reaction_list, [])
return await edit_and_delete(
message,
text,
entities=entities,
seconds=5,
parse_mode=ParseMode.MARKDOWN,
)
return await edit_and_delete(message, "No valid emojis found!")
return await print_usage(message)
elif len(message.parameter) == 2: # log t|f; kw del
if message.parameter[0] == "log":
if message.parameter[1] == "true":
sqlite["trace.config.keep_log"] = True
cached_sqlite["trace.config.keep_log"] = True
elif message.parameter[1] == "false":
sqlite["trace.config.keep_log"] = False
cached_sqlite["trace.config.keep_log"] = False
else:
return await print_usage(message)
return await message.edit(
str(f"**Keep log: \n {cached_sqlite['trace.config.keep_log']}**")
)
if message.parameter[0] == "big":
if message.parameter[1] == "true":
sqlite["trace.config.big"] = True
cached_sqlite["trace.config.big"] = True
elif message.parameter[1] == "false":
sqlite["trace.config.big"] = False
cached_sqlite["trace.config.big"] = False
else:
return await print_usage(message)
return await message.edit(
str(f"**Use big : \n {cached_sqlite['trace.config.big']}**")
)
elif message.parameter[1] == "del":
keyword = message.parameter[0]
keyword_encoded_hex = keyword.encode().hex()
keywordlist = cached_sqlite["trace.keywordlist"]
if keyword not in keywordlist:
return await edit_and_delete(
message, f'Keyword "{keyword}" is not traced.\n{keywordlist}'
)
if not cached_sqlite.get(f"trace.keyword.{keyword_encoded_hex}"):
return await edit_and_delete(
message, f'Keyword "{keyword}" is not traced.'
)
prev_emojis = cached_sqlite.get(f"trace.keyword.{keyword_encoded_hex}")
keywordlist.remove(keyword)
sqlite["trace.keywordlist"] = keywordlist
cached_sqlite["trace.keywordlist"] = keywordlist
del sqlite[f"trace.keyword.{keyword_encoded_hex}"]
del cached_sqlite[f"trace.keyword.{keyword_encoded_hex}"]
text, entities = new_bold_string_entities(
"Successfully untraced keyword: \n"
)
text += f" {keyword}: "
text, entities = append_emoji_to_text(text, prev_emojis, entities)
return await edit_and_delete(
message,
text,
entities=entities,
seconds=5,
parse_mode=ParseMode.MARKDOWN,
)
else:
return await print_usage(message)
elif len(message.parameter) == 3:
keyword, emojis = get_keyword_emojis_from_message(message)
keyword_encoded_hex = keyword.encode().hex()
if keyword and len(emojis) != 0:
reaction_list = await gen_reaction_list(emojis, bot)
if reaction_list:
sqlite[f"trace.keyword.{keyword_encoded_hex}"] = reaction_list
cached_sqlite[f"trace.keyword.{keyword_encoded_hex}"] = reaction_list
if cached_sqlite.get("trace.keywordlist", None) is None:
sqlite["trace.keywordlist"] = [keyword]
cached_sqlite["trace.keywordlist"] = [keyword]
elif keyword not in cached_sqlite["trace.keywordlist"]:
cached_sqlite["trace.keywordlist"].append(keyword)
sqlite["trace.keywordlist"] = cached_sqlite["trace.keywordlist"]
text, entities = new_bold_string_entities(
"Successfully traced keyword: \n"
)
text += f" {keyword}: "
text, entities = append_emoji_to_text(text, reaction_list, entities)
return await edit_and_delete(
message,
text,
entities=entities,
seconds=5,
parse_mode=ParseMode.MARKDOWN,
)
return await edit_and_delete(message, "No valid emojis found!")
else:
return await print_usage(message)
@listener(incoming=True, outgoing=False, ignore_edited=True)
async def trace_user(client: Client, message: Message):
if message.from_user is None:
return
with contextlib.suppress(Exception):
if reaction_list := cached_sqlite.get(
f"trace.user_id.{message.from_user.id}", None
):
await client.invoke(
SendReaction(
peer=await client.resolve_peer(int(message.chat.id)),
msg_id=message.id,
reaction=reaction_list,
big=cached_sqlite["trace.config.big"],
)
)
@listener(incoming=True, outgoing=True, ignore_edited=True)
async def trace_keyword(client: Client, message: Message):
if message.from_user is None:
return
with contextlib.suppress(Exception):
if message.text:
if keyword_list := cached_sqlite.get("trace.keywordlist", None):
for keyword in keyword_list:
if keyword in message.text:
if reaction_list := cached_sqlite.get(
f"trace.keyword.{keyword.encode().hex()}", None
):
await client.invoke(
SendReaction(
peer=await client.resolve_peer(
int(message.chat.id)
),
msg_id=message.id,
reaction=reaction_list,
big=cached_sqlite["trace.config.big"],
)
)