PagerMaid_Plugins_Pyro/alpha/trace/main.py
xtaodada 0ad42876cd
Alpha: trace 根据用户或者关键词自动点赞消息
Co-authored-by: Jiaqi Huang <jiaqihuang33@outlook.com>
2023-07-01 20:35:13 +08:00

531 lines
20 KiB
Python

# -*- coding: utf-8 -*-
import contextlib
from typing import Optional, List, Dict, Tuple, Union
from functools import reduce
from pyrogram.enums import MessageEntityType, ParseMode
from pyrogram.raw.functions.messages import SendReaction
from pyrogram.raw.types import ReactionEmoji, ReactionCustomEmoji, User
from pyrogram.types import MessageEntity
from pagermaid.listener import listener
from pagermaid.enums import Client, Message
from pagermaid.utils import sleep, pip_install
from pagermaid.single_utils import sqlite
pip_install("emoji")
import emoji
NATIVE_EMOJI = b"\xf0\x9f\x91\x8d\xf0\x9f\x91\x8e\xe2\x9d\xa4\xef\xb8\x8f\xf0\x9f\x94\xa5\xf0\x9f\xa5\xb0\xf0\x9f\x91\x8f\xf0\x9f\x98\x81\xf0\x9f\xa4\x94\xf0\x9f\xa4\xaf\xf0\x9f\x98\xb1\xf0\x9f\xa4\xac\xf0\x9f\x98\xa2\xf0\x9f\x8e\x89\xf0\x9f\xa4\xa9\xf0\x9f\xa4\xae\xf0\x9f\x92\xa9\xf0\x9f\x99\x8f\xf0\x9f\x91\x8c\xf0\x9f\x95\x8a\xf0\x9f\xa4\xa1\xf0\x9f\xa5\xb1\xf0\x9f\xa5\xb4\xf0\x9f\x98\x8d\xf0\x9f\x90\xb3\xe2\x9d\xa4\xef\xb8\x8f\xe2\x80\x8d\xf0\x9f\x94\xa5\xf0\x9f\x8c\x9a\xf0\x9f\x8c\xad\xf0\x9f\x92\xaf\xf0\x9f\xa4\xa3\xe2\x9a\xa1\xef\xb8\x8f\xf0\x9f\x8d\x8c\xf0\x9f\x8f\x86\xf0\x9f\x92\x94\xf0\x9f\xa4\xa8\xf0\x9f\x98\x90\xf0\x9f\x8d\x93\xf0\x9f\x8d\xbe\xf0\x9f\x92\x8b\xf0\x9f\x96\x95\xf0\x9f\x98\x88\xf0\x9f\x98\x82\xf0\x9f\x98\xad".decode()
SPECIAL_EMOJI = "❤⬅↔➡⬆↕⬇" # TO BE ADDED
USAGE = f"""```Usage:
Reply to a message:
Trace : .trace 👍👎🥰
Untrace : .trace
Trace keyword: .trace kw add 👍👎🥰
Del keyword : .trace kw del
List all : .trace status
Untrace all: .trace clean
Keep log : .trace log [true|false]
Toggle big : .trace big [true|false]
Use with caution:
Reset all : .trace resettrace
```
**Available native emojis:** {NATIVE_EMOJI}
"""
cached_sqlite = {
key: value for key, value in sqlite.items() if key.startswith("trace.")
}
if cached_sqlite.get("trace.config.keep_log", None) is None:
sqlite["trace.config.keep_log"] = True
cached_sqlite["trace.config.keep_log"] = True
if cached_sqlite.get("trace.config.big", None) is None:
sqlite["trace.config.big"] = True
cached_sqlite["trace.config.big"] = True
async def edit_and_delete(
message: Message,
text: str,
entities: List[MessageEntity] = None,
seconds=5,
parse_mode: ParseMode = ParseMode.DEFAULT,
):
if entities is None:
entities = []
await message.edit(text, entities=entities, parse_mode=parse_mode)
if seconds == -1 or cached_sqlite["trace.config.keep_log"]:
return
await sleep(seconds)
return await message.delete()
async def print_usage(message: Message):
return await edit_and_delete(message, USAGE, [], 15, ParseMode.MARKDOWN)
async def get_users_by_userids(client: Client, uids) -> List[User]:
return await client.get_users(uids)
async def get_all_traced(client: Client) -> Dict:
uid_reactions = {
int(key.split("trace.user_id.")[1]): {"reactions": value}
for key, value in cached_sqlite.items()
if key.startswith("trace.user_id.")
}
user_info = await get_users_by_userids(client, uid_reactions.keys())
for user in user_info:
uid_reactions[user.id]["user"] = user
return uid_reactions
def count_offset(text: str) -> int:
return sum(
1
if c in SPECIAL_EMOJI or c not in SPECIAL_EMOJI and not emoji.is_emoji(c)
else 2
for c in text
)
def append_emoji_to_text(
text: str,
reaction_list: List[Union[ReactionEmoji, ReactionCustomEmoji]],
entities: List[MessageEntity],
):
if reaction_list is None:
return text, entities
text += "["
for reaction in reaction_list:
if type(reaction) is ReactionEmoji:
text += f"{reaction.emoticon}, "
elif type(reaction) is ReactionCustomEmoji:
entities.append(
MessageEntity(
type=MessageEntityType.CUSTOM_EMOJI,
offset=count_offset(text),
length=2,
custom_emoji_id=reaction.document_id,
)
)
text += "👋, "
else: # Would it reach here?
text += str(reaction)
text = text[:-2] + "]\n"
return text, entities
def get_keyword_emojis_from_message(message) -> Tuple[str, List[Union[str, int]]]:
return (message.parameter[0], get_emojis_from_message(message)) if message else None
def get_emojis_from_message(message: Message) -> Optional[List[Union[str, int]]]:
if not message:
return None
emoji_list = []
index = 0
entity_i = 0
# Parse input to preserve order.
# TODO: Can be more elegant
for c in message.text:
if len(emoji_list) == 3:
break
if emoji.is_emoji(c):
if (
message.entities
and len(message.entities) - 1 >= entity_i
and message.entities[entity_i].type == MessageEntityType.CUSTOM_EMOJI
and message.entities[entity_i].offset == index
):
emoji_list.append(message.entities[entity_i].custom_emoji_id)
entity_i += 1
else:
emoji_list.append(c)
index += 2
if c in SPECIAL_EMOJI:
index -= 1
else:
index += 1
return emoji_list
def get_name_and_username_from_message(message: Message):
other_name = ""
if message.reply_to_message.from_user.first_name:
other_name += message.reply_to_message.from_user.first_name
if message.reply_to_message.from_user.last_name:
other_name += message.reply_to_message.from_user.last_name
other_username = message.reply_to_message.from_user.username
return other_name, other_username
def append_username_to_text(
text: str,
other_name: str,
other_username: str,
entities: List[MessageEntity],
message: Message,
user: Optional[User] = None,
):
if other_username:
entities.append(
MessageEntity(
type=MessageEntityType.MENTION,
offset=count_offset(text) + 2,
length=count_offset(other_username),
)
)
text += f" @{other_username}"
elif other_name:
if user:
entities.append(
MessageEntity(
type=MessageEntityType.TEXT_MENTION,
offset=count_offset(text) + 2,
length=count_offset(other_name),
user=user,
)
)
else:
entities.append(
MessageEntity(
type=MessageEntityType.TEXT_MENTION,
offset=count_offset(text) + 2,
length=count_offset(other_name),
user=message.reply_to_message.from_user,
)
)
text += f" {other_name}"
else:
text += "Some unknown ghost"
text += ": "
return text, entities
def new_bold_string_entities(text: str) -> Tuple[str, List[MessageEntity]]:
return append_bold_string("", text, [])
def append_bold_string(
text: str, append_text: str, entities: List[MessageEntity]
) -> Tuple[str, List[MessageEntity]]:
entities.append(
MessageEntity(
type=MessageEntityType.BOLD,
offset=count_offset(text),
length=count_offset(append_text),
)
)
text += append_text
return text, entities
async def gen_reaction_list(emojis, bot: Client):
me = bot.me or await bot.get_me()
reaction_list = []
if not me.is_premium: # Remove custom emojis if not premium (will it happen?)
emojis = [x for x in emojis if type(x) is not int]
emojis = reduce(
lambda x, y: x if y in x else x + [y],
[
[],
]
+ emojis,
) # Remove replicated
for emoji in emojis:
if type(emoji) is int:
reaction_list.append(ReactionCustomEmoji(document_id=emoji))
elif type(emoji) is str and emoji in NATIVE_EMOJI:
reaction_list.append(ReactionEmoji(emoticon=emoji))
return reaction_list
def append_config(
text: str, entities: List[MessageEntity]
) -> Tuple[str, List[MessageEntity]]:
entities.append(
MessageEntity(
type=MessageEntityType.BOLD,
offset=count_offset(text),
length=len(f"\nKeep log: \n {cached_sqlite['trace.config.keep_log']}"),
)
)
text += f"\nKeep log: \n {cached_sqlite['trace.config.keep_log']}"
entities.append(
MessageEntity(
type=MessageEntityType.BOLD,
offset=count_offset(text),
length=len(f"\nUse big : \n {cached_sqlite['trace.config.keep_log']}"),
)
)
text += f"\nUse big : \n {cached_sqlite['trace.config.keep_log']}"
return text, entities
@listener(command="trace", need_admin=True, diagnostics=False, description=USAGE)
async def trace(bot: Client, message: Message):
"""
# For debug use
if len(message.parameter) and message.parameter[0] == "magicword":
return await message.edit(str(message))
"""
if len(message.parameter) == 0: # Either untrace someone or throw error
if (
message.reply_to_message is None
or message.reply_to_message.from_user is None
):
return await print_usage(message)
other_id = message.reply_to_message.from_user.id
if not cached_sqlite.get(f"trace.user_id.{other_id}", None):
return await edit_and_delete(
message, "This user is not in the traced list."
)
prev_emojis = cached_sqlite.get(f"trace.user_id.{other_id}", None)
del sqlite[f"trace.user_id.{other_id}"]
del cached_sqlite[f"trace.user_id.{other_id}"]
text, entities = new_bold_string_entities("Successfully untraced: \n")
other_name, other_username = get_name_and_username_from_message(message)
text, entities = append_username_to_text(
text, other_name, other_username, entities, message
)
text, entities = append_emoji_to_text(text, prev_emojis, entities)
return await edit_and_delete(
message, text, entities=entities, seconds=5, parse_mode=ParseMode.MARKDOWN
)
elif len(message.parameter) == 1:
if message.parameter[0] in ["status", "clean"]: # Get all traced info
traced_uids = await get_all_traced(bot)
text, entities = (
new_bold_string_entities("Traced userlist:\n")
if message.parameter[0] == "status"
else new_bold_string_entities("Successfully untraced: \n")
)
for traced_uid in traced_uids.keys():
other_name = ""
if traced_uids[traced_uid]["user"].first_name:
other_name += traced_uids[traced_uid]["user"].first_name
if traced_uids[traced_uid]["user"].last_name:
other_name += traced_uids[traced_uid]["user"].last_name
other_username = traced_uids[traced_uid]["user"].username
text, entities = append_username_to_text(
text,
other_name,
other_username,
entities,
message,
traced_uids[traced_uid]["user"],
)
text, entities = append_emoji_to_text(
text, traced_uids[traced_uid]["reactions"], entities
)
text, entities = append_bold_string(text, "\nTraced keywords: \n", entities)
if traced_keywords := cached_sqlite.get("trace.keywordlist", None):
for keyword in traced_keywords:
reaction_list = cached_sqlite.get(
f"trace.keyword.{keyword.encode().hex()}", None
)
text += f" {keyword}: "
text, entities = append_emoji_to_text(text, reaction_list, entities)
if message.parameter[0] == "status":
text, entities = append_config(text, entities)
if message.parameter[0] == "clean":
for k, v in cached_sqlite:
if k.startswith("trace."):
del cached_sqlite[k]
del sqlite[k]
return await edit_and_delete(
message,
text,
entities=entities,
seconds=5,
parse_mode=ParseMode.MARKDOWN,
)
elif message.parameter[0] == "resettrace":
for k, v in cached_sqlite:
if k.startswith("trace."):
del cached_sqlite[k]
del sqlite[k]
return await edit_and_delete(
message,
"**Database has been reset.**",
seconds=5,
parse_mode=ParseMode.MARKDOWN,
)
else:
if emojis := get_emojis_from_message(message):
reaction_list = await gen_reaction_list(emojis, bot)
if reaction_list:
sqlite[
f"trace.user_id.{message.reply_to_message.from_user.id}"
] = reaction_list
cached_sqlite[
f"trace.user_id.{message.reply_to_message.from_user.id}"
] = reaction_list
await bot.invoke(
SendReaction(
peer=await bot.resolve_peer(int(message.chat.id)),
msg_id=message.reply_to_message_id,
reaction=reaction_list,
big=cached_sqlite["trace.config.big"],
)
)
text = "Successfully traced: \n"
# TODO: Add username
text, entities = append_emoji_to_text(text, reaction_list, [])
return await edit_and_delete(
message,
text,
entities=entities,
seconds=5,
parse_mode=ParseMode.MARKDOWN,
)
return await edit_and_delete(message, "No valid emojis found!")
return await print_usage(message)
elif len(message.parameter) == 2: # log t|f; kw del
if message.parameter[0] == "log":
if message.parameter[1] == "true":
sqlite["trace.config.keep_log"] = True
cached_sqlite["trace.config.keep_log"] = True
elif message.parameter[1] == "false":
sqlite["trace.config.keep_log"] = False
cached_sqlite["trace.config.keep_log"] = False
else:
return await print_usage(message)
return await message.edit(
str(f"**Keep log: \n {cached_sqlite['trace.config.keep_log']}**")
)
if message.parameter[0] == "big":
if message.parameter[1] == "true":
sqlite["trace.config.big"] = True
cached_sqlite["trace.config.big"] = True
elif message.parameter[1] == "false":
sqlite["trace.config.big"] = False
cached_sqlite["trace.config.big"] = False
else:
return await print_usage(message)
return await message.edit(
str(f"**Use big : \n {cached_sqlite['trace.config.big']}**")
)
elif message.parameter[1] == "del":
keyword = message.parameter[0]
keyword_encoded_hex = keyword.encode().hex()
keywordlist = cached_sqlite["trace.keywordlist"]
if keyword not in keywordlist:
return await edit_and_delete(
message, f'Keyword "{keyword}" is not traced.\n{keywordlist}'
)
if not cached_sqlite.get(f"trace.keyword.{keyword_encoded_hex}"):
return await edit_and_delete(
message, f'Keyword "{keyword}" is not traced.'
)
prev_emojis = cached_sqlite.get(f"trace.keyword.{keyword_encoded_hex}")
keywordlist.remove(keyword)
sqlite["trace.keywordlist"] = keywordlist
cached_sqlite["trace.keywordlist"] = keywordlist
del sqlite[f"trace.keyword.{keyword_encoded_hex}"]
del cached_sqlite[f"trace.keyword.{keyword_encoded_hex}"]
text, entities = new_bold_string_entities(
"Successfully untraced keyword: \n"
)
text += f" {keyword}: "
text, entities = append_emoji_to_text(text, prev_emojis, entities)
return await edit_and_delete(
message,
text,
entities=entities,
seconds=5,
parse_mode=ParseMode.MARKDOWN,
)
else:
return await print_usage(message)
elif len(message.parameter) == 3:
keyword, emojis = get_keyword_emojis_from_message(message)
keyword_encoded_hex = keyword.encode().hex()
if keyword and len(emojis) != 0:
reaction_list = await gen_reaction_list(emojis, bot)
if reaction_list:
sqlite[f"trace.keyword.{keyword_encoded_hex}"] = reaction_list
cached_sqlite[f"trace.keyword.{keyword_encoded_hex}"] = reaction_list
if cached_sqlite.get("trace.keywordlist", None) is None:
sqlite["trace.keywordlist"] = [keyword]
cached_sqlite["trace.keywordlist"] = [keyword]
elif keyword not in cached_sqlite["trace.keywordlist"]:
cached_sqlite["trace.keywordlist"].append(keyword)
sqlite["trace.keywordlist"] = cached_sqlite["trace.keywordlist"]
text, entities = new_bold_string_entities(
"Successfully traced keyword: \n"
)
text += f" {keyword}: "
text, entities = append_emoji_to_text(text, reaction_list, entities)
return await edit_and_delete(
message,
text,
entities=entities,
seconds=5,
parse_mode=ParseMode.MARKDOWN,
)
return await edit_and_delete(message, "No valid emojis found!")
else:
return await print_usage(message)
@listener(incoming=True, outgoing=False, ignore_edited=True)
async def trace_user(client: Client, message: Message):
if message.from_user is None:
return
with contextlib.suppress(Exception):
if reaction_list := cached_sqlite.get(
f"trace.user_id.{message.from_user.id}", None
):
await client.invoke(
SendReaction(
peer=await client.resolve_peer(int(message.chat.id)),
msg_id=message.id,
reaction=reaction_list,
big=cached_sqlite["trace.config.big"],
)
)
@listener(incoming=True, outgoing=True, ignore_edited=True)
async def trace_keyword(client: Client, message: Message):
if message.from_user is None:
return
with contextlib.suppress(Exception):
if message.text:
if keyword_list := cached_sqlite.get("trace.keywordlist", None):
for keyword in keyword_list:
if keyword in message.text:
if reaction_list := cached_sqlite.get(
f"trace.keyword.{keyword.encode().hex()}", None
):
await client.invoke(
SendReaction(
peer=await client.resolve_peer(
int(message.chat.id)
),
msg_id=message.id,
reaction=reaction_list,
big=cached_sqlite["trace.config.big"],
)
)