mirror of
https://github.com/TeamPGM/PagerMaid_Plugins_Pyro.git
synced 2024-11-25 11:58:06 +00:00
trace 根据用户或者关键词自动点赞消息 (Sourcery refactored) (#84)
Co-authored-by: Jiaqi Huang <jiaqihuang33@outlook.com> Co-authored-by: Sourcery AI <> Co-authored-by: omg-xtao <100690902+omg-xtao@users.noreply.github.com>
This commit is contained in:
parent
4fc6c1521d
commit
17a36b4c20
30
trace/README.md
Normal file
30
trace/README.md
Normal file
@ -0,0 +1,30 @@
|
||||
# Trace
|
||||
|
||||
### 用法
|
||||
|
||||
```
|
||||
Reply to a message:
|
||||
Trace : .trace 👍👎🥰
|
||||
Untrace : .trace
|
||||
Trace keyword: .trace kw add 👍👎🥰
|
||||
Del keyword : .trace kw del
|
||||
|
||||
List all : .trace status
|
||||
Untrace all: .trace clean
|
||||
Keep log : .trace log [true|false]
|
||||
```
|
||||
|
||||
#### 踩的坑
|
||||
|
||||
+ 对于一些emoji的字符长度为1,在SPECIAL_EMOJI中定义,不知道有没有写完,欢迎添加。
|
||||
|
||||
#### 接下来可能加的东西(更有可能摆)
|
||||
|
||||
+ 在某一段时间内随机react
|
||||
+ 优先级(keyword和user)
|
||||
+ 通过id/username添加
|
||||
+ 安全词(对方说了就停)
|
||||
+ 历史记录react
|
||||
+ big/sleep长度参数的设置
|
||||
+ 定时器
|
||||
+ 还有什么好玩的么?
|
395
trace/trace.py
Normal file
395
trace/trace.py
Normal file
@ -0,0 +1,395 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import contextlib
|
||||
from typing import Optional, List, Dict, Tuple, Union
|
||||
from functools import reduce
|
||||
|
||||
from pyrogram.enums import MessageEntityType, ParseMode
|
||||
from pyrogram.raw.functions.messages import SendReaction
|
||||
from pyrogram.raw.types import ReactionEmoji, ReactionCustomEmoji, User
|
||||
from pyrogram.types import MessageEntity
|
||||
|
||||
from pagermaid.listener import listener
|
||||
from pagermaid.enums import Client, Message
|
||||
from pagermaid.utils import sleep, pip_install
|
||||
from pagermaid.single_utils import sqlite
|
||||
|
||||
pip_install("emoji")
|
||||
import emoji
|
||||
|
||||
NATIVE_EMOJI = b'\xf0\x9f\x91\x8d\xf0\x9f\x91\x8e\xe2\x9d\xa4\xef\xb8\x8f\xf0\x9f\x94\xa5\xf0\x9f\xa5\xb0\xf0\x9f\x91\x8f\xf0\x9f\x98\x81\xf0\x9f\xa4\x94\xf0\x9f\xa4\xaf\xf0\x9f\x98\xb1\xf0\x9f\xa4\xac\xf0\x9f\x98\xa2\xf0\x9f\x8e\x89\xf0\x9f\xa4\xa9\xf0\x9f\xa4\xae\xf0\x9f\x92\xa9\xf0\x9f\x99\x8f\xf0\x9f\x91\x8c\xf0\x9f\x95\x8a\xf0\x9f\xa4\xa1\xf0\x9f\xa5\xb1\xf0\x9f\xa5\xb4\xf0\x9f\x98\x8d\xf0\x9f\x90\xb3\xe2\x9d\xa4\xef\xb8\x8f\xe2\x80\x8d\xf0\x9f\x94\xa5\xf0\x9f\x8c\x9a\xf0\x9f\x8c\xad\xf0\x9f\x92\xaf\xf0\x9f\xa4\xa3\xe2\x9a\xa1\xef\xb8\x8f\xf0\x9f\x8d\x8c\xf0\x9f\x8f\x86\xf0\x9f\x92\x94\xf0\x9f\xa4\xa8\xf0\x9f\x98\x90\xf0\x9f\x8d\x93\xf0\x9f\x8d\xbe\xf0\x9f\x92\x8b\xf0\x9f\x96\x95\xf0\x9f\x98\x88\xf0\x9f\x98\x82\xf0\x9f\x98\xad'.decode()
|
||||
SPECIAL_EMOJI = "❤⬅↔➡⬆↕⬇" # TO BE ADDED
|
||||
USAGE = f"""```Usage:
|
||||
Reply to a message:
|
||||
Trace : .trace 👍👎🥰
|
||||
Untrace : .trace
|
||||
Trace keyword: .trace kw add 👍👎🥰
|
||||
Del keyword : .trace kw del
|
||||
|
||||
List all : .trace status
|
||||
Untrace all: .trace clean
|
||||
Keep log : .trace log [true|false]
|
||||
```
|
||||
**Available native emojis:** {NATIVE_EMOJI}
|
||||
"""
|
||||
|
||||
keep_log = True
|
||||
cached_sqlite = {
|
||||
key: value for key, value in sqlite.items() if key.startswith("trace.")
|
||||
}
|
||||
|
||||
|
||||
async def edit_and_delete(message: Message, text: str, entities: List[MessageEntity] = None,
|
||||
seconds=5, parse_mode: ParseMode = ParseMode.DEFAULT):
|
||||
if entities is None:
|
||||
entities = []
|
||||
global keep_log
|
||||
await message.edit(text, entities=entities, parse_mode=parse_mode)
|
||||
if seconds == -1 or keep_log:
|
||||
return
|
||||
await sleep(seconds)
|
||||
return await message.delete()
|
||||
|
||||
|
||||
async def print_usage(message: Message):
|
||||
return await edit_and_delete(message, USAGE, [], 15, ParseMode.MARKDOWN)
|
||||
|
||||
|
||||
async def get_users_by_userids(client: Client, uids) -> List[User]:
|
||||
return await client.get_users(uids)
|
||||
|
||||
|
||||
async def get_all_traced(client: Client) -> Dict:
|
||||
uid_reactions = {
|
||||
int(key.split("trace.user_id.")[1]): {"reactions": value}
|
||||
for key, value in cached_sqlite.items()
|
||||
if key.startswith("trace.user_id.")
|
||||
}
|
||||
|
||||
user_info = await get_users_by_userids(client, uid_reactions.keys())
|
||||
for user in user_info:
|
||||
uid_reactions[user.id]["user"] = user
|
||||
return uid_reactions
|
||||
|
||||
|
||||
def count_offset(text: str) -> int:
|
||||
return sum(
|
||||
1 if c in SPECIAL_EMOJI or c not in SPECIAL_EMOJI and not emoji.is_emoji(c) else 2 for c in text
|
||||
)
|
||||
|
||||
|
||||
def append_emoji_to_text(text: str, reaction_list: List[Union[ReactionEmoji, ReactionCustomEmoji]],
|
||||
entities: List[MessageEntity]):
|
||||
text += "["
|
||||
for reaction in reaction_list:
|
||||
if type(reaction) is ReactionEmoji:
|
||||
text += f"{reaction.emoticon}, "
|
||||
elif type(reaction) is ReactionCustomEmoji:
|
||||
entities.append(MessageEntity(
|
||||
type=MessageEntityType.CUSTOM_EMOJI,
|
||||
offset=count_offset(text),
|
||||
length=2,
|
||||
custom_emoji_id=reaction.document_id
|
||||
))
|
||||
text += "👋, "
|
||||
else: # Would it reach here?
|
||||
text += str(reaction)
|
||||
text = text[:-2] + "]\n"
|
||||
return text, entities
|
||||
|
||||
|
||||
def get_keyword_emojis_from_message(message) -> Tuple[str, List[Union[str, int]]]:
|
||||
return (
|
||||
(message.parameter[0], get_emojis_from_message(message))
|
||||
if message
|
||||
else None
|
||||
)
|
||||
|
||||
|
||||
def get_emojis_from_message(message: Message) -> Optional[List[Union[str, int]]]:
|
||||
if not message:
|
||||
return None
|
||||
|
||||
emoji_list = []
|
||||
index = 0
|
||||
entity_i = 0
|
||||
# Parse input to preserve order.
|
||||
# TODO: Can be more elegant
|
||||
for c in message.text:
|
||||
if len(emoji_list) == 3:
|
||||
break
|
||||
if emoji.is_emoji(c):
|
||||
if message.entities \
|
||||
and len(message.entities) - 1 >= entity_i \
|
||||
and message.entities[entity_i].type == MessageEntityType.CUSTOM_EMOJI \
|
||||
and message.entities[entity_i].offset == index:
|
||||
emoji_list.append(message.entities[entity_i].custom_emoji_id)
|
||||
entity_i += 1
|
||||
else:
|
||||
emoji_list.append(c)
|
||||
index += 2
|
||||
if c in SPECIAL_EMOJI:
|
||||
index -= 1
|
||||
else:
|
||||
index += 1
|
||||
return emoji_list
|
||||
|
||||
|
||||
def get_name_and_username_from_message(message: Message):
|
||||
other_name = ""
|
||||
if message.reply_to_message.from_user.first_name:
|
||||
other_name += message.reply_to_message.from_user.first_name
|
||||
if message.reply_to_message.from_user.last_name:
|
||||
other_name += message.reply_to_message.from_user.last_name
|
||||
other_username = message.reply_to_message.from_user.username
|
||||
return other_name, other_username
|
||||
|
||||
|
||||
def append_username_to_text(text: str, other_name: str, other_username: str, entities: List[MessageEntity],
|
||||
message: Message, user: Optional[User] = None):
|
||||
if other_username:
|
||||
entities.append(MessageEntity(
|
||||
type=MessageEntityType.MENTION,
|
||||
offset=count_offset(text) + 2,
|
||||
length=count_offset(other_username),
|
||||
))
|
||||
text += f" @{other_username}"
|
||||
elif other_name:
|
||||
if user:
|
||||
entities.append(MessageEntity(
|
||||
type=MessageEntityType.TEXT_MENTION,
|
||||
offset=count_offset(text) + 2,
|
||||
length=count_offset(other_name),
|
||||
user=user
|
||||
))
|
||||
else:
|
||||
entities.append(MessageEntity(
|
||||
type=MessageEntityType.TEXT_MENTION,
|
||||
offset=count_offset(text) + 2,
|
||||
length=count_offset(other_name),
|
||||
user=message.reply_to_message.from_user
|
||||
))
|
||||
text += f" {other_name}"
|
||||
else:
|
||||
text += "Some unknown ghost"
|
||||
text += ": "
|
||||
return text, entities
|
||||
|
||||
|
||||
def new_bold_string_entities(text: str) -> Tuple[str, List[MessageEntity]]:
|
||||
return append_bold_string("", text, [])
|
||||
|
||||
|
||||
def append_bold_string(text: str, append_text: str, entities: List[MessageEntity]) -> Tuple[str, List[MessageEntity]]:
|
||||
entities.append(MessageEntity(
|
||||
type=MessageEntityType.BOLD,
|
||||
offset=count_offset(text),
|
||||
length=count_offset(append_text)
|
||||
))
|
||||
text += append_text
|
||||
return text, entities
|
||||
|
||||
|
||||
async def gen_reaction_list(emojis, bot: Client):
|
||||
me = bot.me or await bot.get_me()
|
||||
reaction_list = []
|
||||
if not me.is_premium: # Remove custom emojis if not premium (will it happen?)
|
||||
emojis = [x for x in emojis if type(x) is not int]
|
||||
emojis = reduce(lambda x, y: x if y in x else x + [y], [[], ] + emojis) # Remove replicated
|
||||
for emoji in emojis:
|
||||
if type(emoji) is int:
|
||||
reaction_list.append(ReactionCustomEmoji(document_id=emoji))
|
||||
elif type(emoji) is str and emoji in NATIVE_EMOJI:
|
||||
reaction_list.append(ReactionEmoji(emoticon=emoji))
|
||||
return reaction_list
|
||||
|
||||
|
||||
def append_log_status(text: str, entities: List[MessageEntity]) -> Tuple[str, List[MessageEntity]]:
|
||||
entities.append(
|
||||
MessageEntity(
|
||||
type=MessageEntityType.BOLD,
|
||||
offset=count_offset(text),
|
||||
length=len(f"\nKeep log: \n {keep_log}")
|
||||
)
|
||||
)
|
||||
text += f"\nKeep log: \n {keep_log}"
|
||||
return text, entities
|
||||
|
||||
|
||||
@listener(command="trace",
|
||||
need_admin=True,
|
||||
diagnostics=False,
|
||||
description=USAGE)
|
||||
async def trace(bot: Client, message: Message):
|
||||
global keep_log
|
||||
'''
|
||||
# For debug use
|
||||
if len(message.parameter) and message.parameter[0] == "magicword":
|
||||
return await message.edit(str(message))
|
||||
'''
|
||||
if len(message.parameter) == 0: # Either untrace someone or throw error
|
||||
if message.reply_to_message is None or message.reply_to_message.from_user is None:
|
||||
return await print_usage(message)
|
||||
other_id = message.reply_to_message.from_user.id
|
||||
if not cached_sqlite.get(f"trace.user_id.{other_id}", None):
|
||||
return await edit_and_delete(message, "This user is not in the traced list.")
|
||||
prev_emojis = cached_sqlite.get(f"trace.user_id.{other_id}", None)
|
||||
|
||||
del sqlite[f"trace.user_id.{other_id}"]
|
||||
del cached_sqlite[f"trace.user_id.{other_id}"]
|
||||
|
||||
text, entities = new_bold_string_entities("Sucessfully untraced: \n")
|
||||
other_name, other_username = get_name_and_username_from_message(message)
|
||||
text, entities = append_username_to_text(text, other_name, other_username, entities, message)
|
||||
text, entities = append_emoji_to_text(text, prev_emojis, entities)
|
||||
return await edit_and_delete(message, text, entities=entities, seconds=5, parse_mode=ParseMode.MARKDOWN)
|
||||
elif len(message.parameter) == 1:
|
||||
if message.parameter[0] in ["status", "clean"]: # Get all traced info
|
||||
traced_uids = await get_all_traced(bot)
|
||||
text, entities = (
|
||||
new_bold_string_entities("Traced userlist:\n")
|
||||
if message.parameter[0] == "status"
|
||||
else new_bold_string_entities("Sucessfully untraced: \n")
|
||||
)
|
||||
|
||||
for traced_uid in traced_uids.keys():
|
||||
if message.parameter[0] == "clean": # Delete all trace
|
||||
del sqlite[f"trace.user_id.{traced_uid}"]
|
||||
del cached_sqlite[f"trace.user_id.{traced_uid}"]
|
||||
other_name = ""
|
||||
if traced_uids[traced_uid]["user"].first_name:
|
||||
other_name += traced_uids[traced_uid]["user"].first_name
|
||||
if traced_uids[traced_uid]["user"].last_name:
|
||||
other_name += traced_uids[traced_uid]["user"].last_name
|
||||
other_username = traced_uids[traced_uid]["user"].username
|
||||
|
||||
text, entities = append_username_to_text(text, other_name, other_username, entities, message,
|
||||
traced_uids[traced_uid]["user"])
|
||||
text, entities = append_emoji_to_text(text, traced_uids[traced_uid]["reactions"], entities)
|
||||
|
||||
text, entities = append_bold_string(text, "\nTraced keywords: \n", entities)
|
||||
if traced_keywords := cached_sqlite.get("trace.keywordlist", None):
|
||||
for keyword in traced_keywords:
|
||||
reaction_list = cached_sqlite.get(f"trace.keyword.{keyword.encode().hex()}", None)
|
||||
if message.parameter[0] == "clean":
|
||||
del sqlite[f"trace.keyword.{keyword.encode().hex()}"]
|
||||
del cached_sqlite[f"trace.keyword.{keyword.encode().hex()}"]
|
||||
text += f" {keyword}: "
|
||||
text, entities = append_emoji_to_text(text, reaction_list, entities)
|
||||
if message.parameter[0] == "clean":
|
||||
del sqlite["trace.keywordlist"]
|
||||
del cached_sqlite["trace.keywordlist"]
|
||||
if message.parameter[0] == "status":
|
||||
text, entities = append_log_status(text, entities)
|
||||
return await edit_and_delete(message, text, entities=entities, seconds=5, parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
if emojis := get_emojis_from_message(message):
|
||||
reaction_list = await gen_reaction_list(emojis, bot)
|
||||
if reaction_list:
|
||||
sqlite[f"trace.user_id.{message.reply_to_message.from_user.id}"] = reaction_list
|
||||
cached_sqlite[f"trace.user_id.{message.reply_to_message.from_user.id}"] = reaction_list
|
||||
await bot.invoke(
|
||||
SendReaction(
|
||||
peer=await bot.resolve_peer(int(message.chat.id)),
|
||||
msg_id=message.reply_to_message_id,
|
||||
reaction=reaction_list,
|
||||
big=True
|
||||
)
|
||||
)
|
||||
text = "Successfully traced: \n"
|
||||
# TODO: Add username
|
||||
text, entities = append_emoji_to_text(text, reaction_list, [])
|
||||
return await edit_and_delete(message, text, entities=entities, seconds=5,
|
||||
parse_mode=ParseMode.MARKDOWN)
|
||||
return await edit_and_delete(message, "No valid emojis found!")
|
||||
return await print_usage(message)
|
||||
elif len(message.parameter) == 2: # log t|f; kw del
|
||||
if message.parameter[0] == "log":
|
||||
if message.parameter[1] == "true":
|
||||
keep_log = True
|
||||
elif message.parameter[1] == "false":
|
||||
keep_log = False
|
||||
else:
|
||||
return await print_usage(message)
|
||||
return await message.edit(str(f"**Keep log: \n {keep_log}**"))
|
||||
elif message.parameter[1] == "del":
|
||||
keyword = message.parameter[0]
|
||||
keyword_encoded_hex = keyword.encode().hex()
|
||||
if not cached_sqlite.get(f"trace.keyword.{keyword_encoded_hex}"):
|
||||
return await edit_and_delete(message, f"Keyword \"{keyword}\" is not traced.")
|
||||
prev_emojis = cached_sqlite.get(f"trace.keyword.{keyword_encoded_hex}")
|
||||
|
||||
del sqlite[f"trace.keyword.{keyword_encoded_hex}"]
|
||||
del cached_sqlite[f"trace.keyword.{keyword_encoded_hex}"]
|
||||
|
||||
sqlite["trace.keywordlist"].remove(keyword)
|
||||
cached_sqlite["trace.keywordlist"].remove(keyword)
|
||||
|
||||
text, entities = new_bold_string_entities("Sucessfully untraced keyword: \n")
|
||||
text += f" {keyword}: "
|
||||
text, entities = append_emoji_to_text(text, prev_emojis, entities)
|
||||
return await edit_and_delete(message, text, entities=entities, seconds=5, parse_mode=ParseMode.MARKDOWN)
|
||||
else:
|
||||
return await print_usage(message)
|
||||
elif len(message.parameter) == 3:
|
||||
keyword, emojis = get_keyword_emojis_from_message(message)
|
||||
keyword_encoded_hex = keyword.encode().hex()
|
||||
if keyword and len(emojis) != 0:
|
||||
reaction_list = await gen_reaction_list(emojis, bot)
|
||||
if reaction_list:
|
||||
sqlite[f"trace.keyword.{keyword_encoded_hex}"] = reaction_list
|
||||
cached_sqlite[f"trace.keyword.{keyword_encoded_hex}"] = reaction_list
|
||||
|
||||
if cached_sqlite.get("trace.keywordlist", None) is None:
|
||||
sqlite["trace.keywordlist"] = [keyword]
|
||||
cached_sqlite["trace.keywordlist"] = [keyword]
|
||||
elif keyword not in cached_sqlite["trace.keywordlist"]:
|
||||
cached_sqlite["trace.keywordlist"].append(keyword)
|
||||
sqlite["trace.keywordlist"] = cached_sqlite["trace.keywordlist"]
|
||||
|
||||
text, entities = new_bold_string_entities("Successfully traced keyword: \n")
|
||||
text += f" {keyword}: "
|
||||
text, entities = append_emoji_to_text(text, reaction_list, entities)
|
||||
return await edit_and_delete(message, text, entities=entities, seconds=5, parse_mode=ParseMode.MARKDOWN)
|
||||
return await edit_and_delete(message, "No valid emojis found!")
|
||||
else:
|
||||
return await print_usage(message)
|
||||
|
||||
|
||||
@listener(incoming=True, outgoing=False, ignore_edited=True)
|
||||
async def trace_user(client: Client, message: Message):
|
||||
if message.from_user is None:
|
||||
return
|
||||
with contextlib.suppress(Exception):
|
||||
if reaction_list := cached_sqlite.get(
|
||||
f"trace.user_id.{message.from_user.id}", None
|
||||
):
|
||||
await client.invoke(
|
||||
SendReaction(
|
||||
peer=await client.resolve_peer(int(message.chat.id)),
|
||||
msg_id=message.id,
|
||||
reaction=reaction_list,
|
||||
big=True
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@listener(incoming=True, outgoing=True, ignore_edited=True)
|
||||
async def trace_keyword(client: Client, message: Message):
|
||||
if message.from_user is None:
|
||||
return
|
||||
with contextlib.suppress(Exception):
|
||||
if message.text:
|
||||
if keyword_list := cached_sqlite.get("trace.keywordlist", None):
|
||||
for keyword in keyword_list:
|
||||
if keyword in message.text:
|
||||
if reaction_list := cached_sqlite.get(
|
||||
f"trace.keyword.{keyword.encode().hex()}", None
|
||||
):
|
||||
await client.invoke(
|
||||
SendReaction(
|
||||
peer=await client.resolve_peer(int(message.chat.id)),
|
||||
msg_id=message.id,
|
||||
reaction=reaction_list,
|
||||
big=True
|
||||
)
|
||||
)
|
Loading…
Reference in New Issue
Block a user