PagerMaid_Plugins_Pyro/pmcaptcha/main.py
2022-07-04 22:12:59 +08:00

2678 lines
105 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""PMCaptcha v2 - A PagerMaid-Pyro plugin
v1 by xtaodata and cloudreflection
v2 by Sam
"""
import re
import time
import html
import asyncio
import inspect
import traceback
from dataclasses import dataclass, field
from typing import Optional, Callable, Union, List, Any, Dict, Coroutine
from pyrogram.errors import FloodWait, AutoarchiveNotAvailable, ChannelsAdminPublicTooMuch
from pyrogram.raw.functions.channels import UpdateUsername
from pyrogram.raw.types import GlobalPrivacySettings
from pyrogram.raw.functions.account import SetGlobalPrivacySettings, GetGlobalPrivacySettings
from pyrogram.enums.chat_type import ChatType
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.raw.functions import messages
from pyrogram.types import User, Sticker
from pagermaid import bot, logs
from pagermaid.config import Config
from pagermaid.sub_utils import Sub
from pagermaid.utils import Message, alias_command
from pagermaid.listener import listener
from pagermaid.single_utils import sqlite
cmd_name = "pmcaptcha"
log_collect_bot = "CloudreflectionPmcaptchabot"
img_captcha_bot = "PagerMaid_Sam_Bot"
def sort_line_number(m):
try:
func = getattr(m[1], "__func__", m[1])
return func.__code__.co_firstlineno
except AttributeError:
return -1
async def log(message: str, remove_prefix: bool = False):
console.info(message.replace('`', '\"'))
Config.LOG and logging.send_log(message, remove_prefix)
def get_version():
from pagermaid import working_dir
from os import sep
from json import load
with open(f"{working_dir}{sep}plugins{sep}version.json", 'r', encoding="utf-8") as f:
version_json = load(f)
return version_json.get(cmd_name, lang('unknown_version'))
# region Text Formatting
def code(text: str) -> str:
return f"<code>{text}</code>"
def italic(text: str) -> str:
return f"<i>{text}</i>"
def bold(text: str) -> str:
return f"<b>{text}</b>"
def gen_link(text: str, url: str) -> str:
return f"<a href=\"{url}\">{text}</a>"
def str_timestamp(unix_ts: int) -> str:
import datetime
date_time = datetime.datetime.fromtimestamp(unix_ts, datetime.timezone(datetime.timedelta(hours=8)))
return date_time.strftime("%Y-%m-%dT%XZ%z")
# endregion
lang_dict = {
# region General
"settings_lists": [
"All Settings (Please refer to docs to learn more about):\n%s",
"所有设置(具体数值含义请自行查看文档):\n%s"
],
"cmd_err_run": [
f"Error occurred when running command: {code('%s')}: {code('%s')}\n{code('%s')}",
f"运行指令 {code('%s')} 时发生错误: {code('%s')}\n{code('%s')}",
],
"no_cmd_given": [
"Please use this command in private chat, or add parameters to execute.",
"请在私聊时使用此命令,或添加参数执行。"
],
"invalid_user_id": [
"Invalid User ID",
"未知用户或无效的用户 ID"
],
"invalid_param": [
"Invalid Parameter",
"无效的参数"
],
"enabled": [
"Enabled",
"开启"
],
"disabled": [
"Disabled",
"关闭"
],
"none": [
"None",
""
],
"tip_edit": [
f"You can edit this by using {code('%s')}",
f"如需编辑,请使用 {code('%s')}"
],
"tip_run_in_pm": [
"You can only run this command in private chat, or by adding parameters.",
"请在私聊使用此命令,或添加参数执行。"
],
# endregion
# region Plugin
"plugin_desc": [
"Captcha for PM",
"私聊人机验证插件"
],
"check_usage": [
"Please use %s to see available commands.",
"请使用 %s 查看可用命令"
],
"curr_version": [
f"Current {code('PMCaptcha')} Version: %s",
f"{code('PMCaptcha')} 当前版本:%s"
],
"unknown_version": [
italic("Unknown"),
italic("未知")
],
# endregion
# region Vocabs
"vocab_msg": [
"Message",
"消息"
],
"vocab_array": [
"List",
"列表"
],
"vocab_bool": [
"Boolean",
"y / n"
],
"vocab_int": [
"Integer",
"整数"
],
"vocab_cmd": [
"Command",
"指令"
],
"vocab_rule": [
"Rule",
"规则"
],
"vocab_action": [
"Action",
"操作"
],
# endregion
# region Captcha Challenge
"verify_verified": [
"Verified user",
"已验证用户"
],
"verify_unverified": [
"Unverified user",
"未验证用户"
],
"verify_blocked": [
"You were blocked.",
"您已被封禁"
],
"verify_log_punished": [
"User %s has been %s.",
"已对用户 %s 执行`%s`操作"
],
"verify_challenge": [
"Please answer this question to prove you are human (1 chance)",
"请回答这个问题证明您不是机器人 (一次机会)"
],
"verify_challenge_timed": [
"You have %i seconds.",
"您有 %i 秒来回答这个问题"
],
"verify_passed": [
"Verification passed.",
"验证通过"
],
"verify_failed": [
"Verification failed.",
"验证失败"
],
# Sticker
"verify_send_sticker": [
"Please send a sticker to me.",
"请发送一个贴纸给我"
],
# endregion
# region Help
"cmd_param": [
"Parameter",
"参数"
],
"cmd_param_optional": [
"Optional",
"可选"
],
"cmd_alias": [
"Alias",
"别名/快捷命令"
],
"cmd_detail": [
f"Do {code(f',{user_cmd_name} h ')}[command ] for details",
f"详细指令请输入 {code(f',{user_cmd_name} h ')}[指令名称 ]",
],
"cmd_not_found": [
"Command Not Found",
"指令不存在"
],
"cmd_list": [
"Command List",
"指令列表"
],
"priority": [
"Priority",
"优先级"
],
"cmd_search_result": [
f"Search Result for `%s`",
f"`%s` 的搜索结果"
],
"cmd_search_docs": [
"Documentation",
"文档"
],
"cmd_search_cmds": [
"Commands",
"指令"
],
"cmd_search_none": [
"No result found.",
"未找到结果"
],
# endregion
# region Check
"user_verified": [
f"User {code('%i')} {italic('verified')}",
f"用户 {code('%i')} {italic('已验证')}"
],
"user_unverified": [
f"User {code('%i')} {bold('unverified')}",
f"用户 {code('%i')} {bold('未验证')}"
],
# endregion
# region Add / Delete
"add_whitelist_success": [
f"User {code('%i')} added to whitelist",
f"用户 {code('%i')} 已添加到白名单"
],
"add_whitelist_failed": [
f"Failed to add iser {code('%i')} to whitelist",
f"无法添加用户 {code('%i')} 到白名单"
],
"remove_verify_log_success": [
f"Removed User {code('%i')}'s verify record",
f"已删除用户 {code('%i')} 的验证记录"
],
"remove_verify_log_failed": [
f"Failed to remove User {code('%i')}'s verify record.",
f"删除用户 {code('%i')} 的验证记录失败"
],
"remove_verify_log_not_found": [
f"Verify record not found for User {code('%i')}",
f"未找到用户 {code('%i')} 的验证记录"
],
# endregion
# region Unstuck
"unstuck_success": [
f"User {code('%i')} has removed from challenge mode",
f"用户 {code('%i')} 已解除验证状态"
],
"not_stuck": [
f"User {code('%i')} is not stuck",
f"用户 {code('%i')} 未在验证状态"
],
# endregion
# region Welcome
"welcome_curr_rule": [
"Current welcome rule",
"当前验证通过时消息规则"
],
"welcome_set": [
"Welcome message set.",
"已设置验证通过消息"
],
"welcome_reset": [
"Welcome message reset.",
"已重置验证通过消息"
],
# endregion
# region Whitelist
"whitelist_curr_rule": [
"Current whitelist rule",
"当前白名单规则"
],
"whitelist_set": [
"Keywords whitelist set.",
"已设置关键词白名单"
],
"whitelist_reset": [
"Keywords whitelist reset.",
"已重置关键词白名单"
],
# endregion
# region Blacklist
"blacklist_curr_rule": [
"Current blacklist rule",
"当前黑名单规则"
],
"blacklist_set": [
"Keywords blacklist set.",
"已设置关键词黑名单"
],
"blacklist_reset": [
"Keywords blacklist reset.",
"已重置关键词黑名单"
],
"blacklist_triggered": [
"Blacklist rule triggered",
"您触发了黑名单规则"
],
# endregion
# region Timeout
"timeout_curr_rule": [
"Current timeout: %i second(s)",
"当前超时时间: %i"
],
"timeout_set": [
"Verification timeout has been set to %i seconds.",
"已设置验证超时时间为 %i"
],
"timeout_off": [
"Verification timeout disabled.",
"已关闭验证超时时间"
],
"timeout_exceeded": [
"Verification timeout.",
"验证超时"
],
# endregion
# region Disable PM
"disable_pm_curr_rule": [
"Current disable PM status: %s",
"当前禁止私聊状态: 已%s"
],
"disable_pm_tip_exception": [
"This feature will automatically allow contents and whitelist users.",
"此功能会自动放行联系人与白名单用户"
],
"disable_set": [
f"Disable private chat has been set to {bold('%s')}.",
f"已设置禁止私聊为{bold('%s')}"
],
"disable_pm_enabled": [
"Owner has private chat disabled.",
"对方已禁止私聊。"
],
# endregion
# region Stats
"stats_display": [
"has verified %i users in total.\nSuccess: %i\nBlocked: %i",
"已进行验证 %i\n验证通过: %i\n拦截: %i"
],
"stats_reset": [
"Statistics has been reset.",
"已重置统计"
],
# endregion
# region Action
"action_curr_rule": [
"Current action rule",
"当前验证失败规则"
],
"action_set": [
f"Action has been set to {bold('%s')}.",
f"验证失败后将执行{bold('%s')}操作"
],
"action_set_none": [
"Action has been set to none.",
"验证失败后将不执行任何操作"
],
"action_ban": [
"Ban",
"封禁"
],
"action_delete": [
"Ban and delete",
"封禁并删除对话"
],
"action_archive": [
"Ban and archive",
"封禁并归档"
],
# endregion
# region Report
"report_curr_rule": [
"Current report state: %s",
"当前举报状态为: %s"
],
"report_set": [
f"Report has been set to {bold('%s')}.",
f"已设置举报状态为{bold('%s')}"
],
# endregion
# region Premium
"premium_curr_rule": [
"Current premium user rule",
"当前 Premium 用户规则"
],
"premium_set_allow": [
f"Telegram Premium users will be allowed to {bold('bypass')} the captcha.",
f"{bold('不对')} Telegram Premium 用户{bold('发起验证')}"
],
"premium_set_ban": [
f"Telegram Premium users will be {bold('banned')} from private chat.",
f"{bold('禁止')} Telegram Premium 用户私聊"
],
"premium_set_only": [
f"{bold('Only allowed')} Telegram Premium users to private chat.",
f"{bold('仅允许')} Telegram Premium 用户私聊"
],
"premium_set_none": [
"Nothing will do to Telegram Premium",
"将不对 Telegram Premium 用户执行额外操作"
],
"premium_only": [
"Owner only allows Telegram Premium users to private chat.",
"对方只允许 Telegram Premium 用户私聊"
],
"premium_ban": [
"Owner bans Telegram Premium users from private chat.",
"对方禁止 Telegram Premium 用户私聊"
],
# endregion
# region Groups In Common
"groups_in_common_curr_rule": [
"Current groups in common rule",
"当前共同群规则"
],
"groups_in_common_set": [
f"Groups in common larger than {bold('%i')} will be whitelisted.",
f"共同群数量大于 {bold('%i')} 时将自动添加到白名单"
],
"groups_in_common_disabled": [
"Group in command is not enabled",
"未开启共同群数量检测"
],
"groups_in_common_disable": [
"Groups in common disabled.",
"已关闭共同群检查"
],
# endregion
# region Chat History
"chat_history_curr_rule": [
f"Chat history equal or larger than {bold('%i')} will be whitelisted.",
f"聊天记录数量大于 {bold('%i')} 时将自动添加到白名单"
],
"chat_history_disabled": [
"Chat history check is not enabled",
"未开启聊天记录数量检测"
],
# endregion
# region Initiative
"initiative_curr_rule": [
"Current initiative status: %s",
"当前对主动进行对话的用户添加白名单状态为: %s"
],
"initiative_set": [
f"Initiative has been set to {bold('%s')}.",
f"已设置对主动进行对话的用户添加白名单状态为{bold('%s')}"
],
# endregion
# region Silent
"silent_curr_rule": [
"Current silent status: %s",
"当前静音状态: 已%s"
],
"silent_set": [
f"Silent has been set to {bold('%s')}.",
f"已设置静音模式为{bold('%s')}"
],
# endregion
# region Flood
"flood_curr_rule": [
"Current flood detect limit was set to %i user(s)",
"当前轰炸人数已设置为 %i"
],
# Username
"flood_username_curr_rule": [
"Current flood username option was set to %s",
"当前轰炸时切换用户名选项已设置为 %s"
],
"flood_username_set_confirm": [
(f"The feature may lose your username, are you sure you want to enable this feature?\n"
f"Please enter {code(f',{cmd_name} flood_username y')} again to confirm."),
f"此功能有可能会导致您的用户名丢失,您是否确定要开启此功能?\n请再次输入 {code(f',{cmd_name} flood_username y')} 来确认"
],
"flood_username_set": [
f"Change username in flood preiod has been %s.",
f"轰炸时切换用户名已%s"
],
"flood_channel_desc": [
("This channel is a placeholder of username, which the owner is being flooded.\n"
"Please content him later after this channel is gone."),
"这是一个用于临时设置用户名的频道,该群主正在被私聊轰炸\n请在此频道消失后再联系他。"
],
# Action
"flood_act_curr_rule": [
"Current flood action was set to %s",
"当前轰炸操作已设置为 %s"
],
"flood_act_set_asis": [
f"All users in flood period will be {bold('treat as verify failed')}.",
f"所有在轰炸期间的用户将会{bold('与验证失败的处理方式一致')}"
],
"flood_act_set_captcha": [
f"All users in flood period will be {bold('asked for captcha')}.",
f"所有在轰炸期间的用户将会{bold('进行验证码挑战')}"
],
"flood_act_set_none": [
"Nothing will do to users in flood period.",
"所有在轰炸期间的用户将不会被进行任何处理"
],
# endregion
# region Custom Rule
"custom_rule_curr_rule": [
"Current custom rule",
"当前自定义规则"
],
"custom_rule_set": [
f"Custom rule has been set to\n{code('%s')}.",
f"已设置自定义规则为\n{code('%s')}"
],
"custom_rule_reset": [
"Custom rule has been deleted.",
"已删除自定义规则"
],
"custom_rule_exec_err": [
"Error occurred when executing custom rule",
"执行自定义规则时发生错误"
],
# endregion
# region Collect Logs
"collect_logs_curr_rule": [
"Current collect logs status: %s",
"当前收集日志状态: 已%s"
],
"collect_logs_note": [
("This feature will only collect user information and chat logs of non-verifiers "
f"via @{log_collect_bot} , and is not provided to third parties (except @LivegramBot ).\n"
"Information collected will be used for PMCaptcha improvements, "
"toggling this feature does not affect the use of PMCaptcha."),
(f"此功能仅会通过 @{log_collect_bot} 收集未通过验证者的用户信息以及验证未通过的聊天记录;"
"且不会提供给第三方(@LivegramBot 除外)。\n收集的信息将用于 PMCaptcha 改进,开启或关闭此功能不影响 PMCaptcha 的使用。")
],
"collect_logs_set": [
"Collect logs has been set to %s.",
"已设置收集日志为 %s"
],
# endregion
# region Captcha Type
"type_curr_rule": [
"Current captcha type: %s",
"当前验证码类型: %s"
],
"type_set": [
f"Captcha type has been set to {bold('%s')}.",
f"已设置验证码类型为 {bold('%s')}"
],
"type_param_name": [
"Type",
"类型"
],
"type_captcha_img": [
"Image",
"图像辨识"
],
"type_captcha_math": [
"Math",
"计算"
],
"type_captcha_sticker": [
"Sticker",
"贴纸"
],
# endregion
# region Image Captcha Type
"img_captcha_curr_rule": [
"Current image captcha type: %s",
"当前图像验证类型: %s"
],
"img_captcha_type_func": [
"funCaptcha",
"funCaptcha",
],
"img_captcha_type_github": [
"GitHub",
"GitHub",
],
"img_captcha_type_rec": [
"reCaptcha",
"reCaptcha"
],
"img_captcha_retry_curr_rule": [
"Current max retry for image captcha: %s",
"当前图像验证码最大重试次数: %s"
],
"img_captcha_retry_set": [
"Max retry for image captcha has been set to %s.",
"已设置图像验证码最大重试次数为 %s"
],
# endregion
}
def lang(lang_id: str, lang_code: str = Config.LANGUAGE or "en") -> str:
lang_code = lang_code or "en"
return lang_dict.get(lang_id)[1 if lang_code.startswith("zh") else 0]
def lang_full(lang_id: str, *format_args):
return "\n".join(lang_str % format_args for lang_str in lang_dict[lang_id])
async def exec_api(coro_func: Coroutine):
for _ in range(3):
try:
return await coro_func
except FloodWait as e:
console.debug(f"Flood triggered, waiting for {e.value} seconds")
await asyncio.sleep(e.value)
except Exception as e:
console.error(f"API Call Failed: {e}\n{traceback.format_exc()}")
return None
@dataclass
class Log:
task: Optional[asyncio.Task] = None
queue: asyncio.Queue = field(default_factory=asyncio.Queue)
last_send_time: int = 0
async def worker(self):
while True:
text = None
try:
if int(time.time()) - self.last_send_time < 5:
await asyncio.sleep(5 - (int(time.time()) - self.last_send_time))
continue
(text,) = await self.queue.get()
await bot.send_message(Config.LOG_ID, text, ParseMode.HTML)
self.last_send_time = int(time.time())
except asyncio.CancelledError:
break
except FloodWait as e:
console.debug(f"Flood triggered when sending log, wait for {e.value}s")
await asyncio.sleep(e.value)
except Exception as e:
console.error(f"Error when sending log: {e}\n{traceback.format_exc()}")
finally:
text and self.queue.task_done()
def send_log(self, message: str, remove_prefix: bool):
if not self.task or self.task.done():
self.task = asyncio.create_task(self.worker())
message = message if remove_prefix else " ".join(("[PMCaptcha]", message))
self.queue.put_nowait((message,))
@dataclass
class Setting:
key_name: str
whitelist: Sub = field(init=False)
pending_ban_list: Sub = field(init=False)
pending_challenge_list: Sub = field(init=False)
def __post_init__(self):
self.whitelist = Sub("pmcaptcha.success")
self.pending_ban_list = Sub("pmcaptcha.pending_ban")
self.pending_challenge_list = Sub("pmcaptcha.pending_challenge")
def get(self, key: str, default=None):
if sqlite.get(self.key_name) is None:
return default
return sqlite[self.key_name].get(key, default)
def set(self, key: str, value: Any):
"""Set the value of a key in the database, return value"""
if (data := sqlite.get(self.key_name)) is None:
sqlite[self.key_name] = data = {}
data.update({key: value})
sqlite[self.key_name] = data
return value
def delete(self, key: str):
"""Delete a key in the database, if key exists"""
if self.get(key) is not None:
data = sqlite[self.key_name]
del data[key]
sqlite[self.key_name] = data
return self
def is_verified(self, user_id: int) -> bool:
return self.whitelist.check_id(user_id) or user_id == bot.me.id
# region Captcha Challenge
def get_challenge_state(self, user_id: int) -> dict:
return sqlite.get(f"{self.key_name}.challenge.{user_id}", {})
def set_challenge_state(self, user_id: int, state: dict):
sqlite[f"{self.key_name}.challenge.{user_id}"] = state
return state
def del_challenge_state(self, user_id: int):
key = f"{self.key_name}.challenge.{user_id}"
if sqlite.get(key):
del sqlite[key]
# endregion
# region Flood State
def get_flood_state(self) -> dict:
return sqlite.get(f"{self.key_name}.flood_state", {})
def set_flood_state(self, state: dict) -> dict:
sqlite[f"{self.key_name}.flood_state"] = state
return state
def del_flood_state(self):
key = f"{self.key_name}.flood_state"
if sqlite.get(key):
del sqlite[key]
# endregion
@dataclass
class Command:
user: User
msg: Message
# Regex
alias_rgx = r":alias: (.+)"
param_rgx = r":param (opt)?\s?(\w+):\s?(.+)"
# region Helpers (Formatting, User ID)
def _generate_markdown(self):
result = []
members = inspect.getmembers(self, inspect.iscoroutinefunction)
members.sort(key=sort_line_number)
for name, func in members:
if name.startswith("_"):
continue
result.append(self._extract_docs(func.__name__, func.__doc__ or '', True))
return "\n".join(result)
async def _run_command(self):
command = len(self.msg.parameter) > 0 and self.msg.parameter[0] or cmd_name
if not (func := self[command]):
return False, "NOT_FOUND", command
full_arg_spec = inspect.getfullargspec(func)
args_len = None if full_arg_spec.varargs else len(full_arg_spec.args)
cmd_args = self.msg.parameter[1:args_len]
func_args = []
for index, arg_type in enumerate(tuple(full_arg_spec.annotations.values())): # Check arg type
if args_len is None:
func_args = cmd_args
break
try:
if getattr(arg_type, "__origin__", None) == Union:
NoneType = type(None)
if (
len(arg_type.__args__) != 2
or arg_type.__args__[1] is not NoneType
):
continue
if len(cmd_args) - 1 > index and not cmd_args[index] or len(cmd_args) - 1 < index:
func_args.append(None)
continue
arg_type = arg_type.__args__[0]
func_args.append(arg_type(cmd_args[index]))
except ValueError:
return False, "INVALID_PARAM", tuple(full_arg_spec.annotations.keys())[index]
except IndexError: # No more args
await self.help(command)
return True, None, None
try:
await func(*func_args)
except Exception as e:
console.error(f"Error when running command {command}: {e}\n{traceback.format_exc()}")
await self.msg.edit(lang("cmd_err_run") % (self._get_user_cmd_input(), str(e), traceback.format_exc()))
return True, None, None
def _get_user_cmd_input(self) -> str:
return f",{user_cmd_name} {' '.join(self.msg.parameter)}"
def _extract_docs(self, subcmd_name: str, text: str, markdown: bool = False) -> str:
extras = []
if result := re.search(self.param_rgx, text):
is_optional = f"({italic(lang('cmd_param_optional'))} ) " if result[1] else ""
extras.extend(
(
f"{lang('cmd_param')}:",
f"{is_optional}{code(result[2].lstrip('_'))} - {result[3]}",
)
)
text = re.sub(self.param_rgx, "", text)
if result := re.search(self.alias_rgx, text):
alias = result[1].replace(" ", "").split(",")
alia_text = ", ".join(code(a) for a in alias)
extras.append(f"{lang('cmd_alias')}: {alia_text}")
text = re.sub(self.alias_rgx, "", text)
len(extras) and extras.insert(0, "")
cmd_display = code(f",{cmd_name} {self._get_cmd_with_param(subcmd_name)}".strip())
if markdown:
result = ["<details>",
f"<summary>{self._get_cmd_with_param(subcmd_name) or cmd_name} · {re.search(r'(.+)', self[subcmd_name].__doc__ or '')[1].strip()}</summary>",
"\n>\n", f"用法:{cmd_display}", "",
re.sub(r" {4,}", "", text).replace("{cmd_name}", cmd_name).strip().replace("\n", "\n\n"),
"\n\n".join(extras)]
result.extend(("", "---", "</details>"))
return "\n".join(result)
return "\n".join([cmd_display, re.sub(r" {4,}", "", text).replace("{cmd_name}", cmd_name).strip()] + extras)
def _get_cmd_with_param(self, subcmd_name: str) -> str:
if subcmd_name == cmd_name:
return ""
msg = subcmd_name
if result := re.search(self.param_rgx, getattr(self, msg).__doc__ or ''):
param = result[2].lstrip("_")
msg += f" [{param}]" if result[1] else html.escape(f" <{param}>")
return msg
def _get_mapped_alias(self, alias_name: str, ret_type: str):
# Get alias function
for name, func in inspect.getmembers(self, inspect.iscoroutinefunction):
if name.startswith("_"):
continue
if ((result := re.search(self.alias_rgx, func.__doc__ or "")) and
alias_name in result[1].replace(" ", "").split(",")):
return func if ret_type == "func" else name
async def _display_value(self, *, key: Optional[str] = None, display_text: str, sub_cmd: str, value_type: str):
text = [display_text, "", lang('tip_edit') % html.escape(f",{user_cmd_name} {sub_cmd} <{lang(value_type)}>")]
key and text.insert(0, lang(f"{key}_curr_rule") + ":")
return await self._edit("\n".join(text))
# Set On / Off Boolean
async def _set_toggle(self, key: str, toggle: str, *, reverse: bool = False):
if ((toggle := toggle.lower()[0]) not in ("y", "n", "t", "f", "1", "0") and
(toggle := toggle.lower()) not in ("on", "off")):
return await self.help(key)
toggle = toggle in ("y", "t", "1", "on")
not reverse and (toggle and setting.set(key, True) or setting.delete(key))
reverse and (toggle and setting.delete(key) or setting.set(key, False))
await self._edit(lang(f"{key}_set") % lang("enabled" if toggle else "disabled"))
async def _get_user_id(self, user_id: Union[str, int]) -> Optional[int]:
if not user_id and not self.msg.reply_to_message_id and self.msg.chat.type != ChatType.PRIVATE:
return
try:
if int(user_id) < 0:
return
except (ValueError, TypeError):
pass
user = None
user_id = user_id or self.msg.reply_to_message_id or (
self.msg.chat.type == ChatType.PRIVATE and self.msg.chat.id or 0)
if not user_id or not (user := await bot.get_users(user_id)) or (
user.is_bot or user.is_verified or user.is_deleted):
return
return user.id
# Set Black / White List
async def _set_list(self, _type: str, array: str):
if not array:
return await self._display_value(
key=_type,
display_text=code(setting.get(_type, lang('none'))),
sub_cmd=f"{_type[0]}l",
value_type="vocab_array")
if array.startswith("-c"):
setting.delete(_type)
return await self._edit(lang(f'{_type}_reset'))
setting.set(_type, array.replace(" ", "").split(","))
await self._edit(lang(f'{_type}_set'))
# endregion
async def _edit(self, msg: str):
text = "\n\n".join((f">>> {code(self._get_user_cmd_input())}", msg))
return await self.msg.edit_text(text, parse_mode=ParseMode.HTML, disable_web_page_preview=True)
def __getitem__(self, cmd: str) -> Optional[Callable]:
# Get subcommand function
if func := getattr(self, cmd, None):
return func
# Check for alias
if func := self._get_mapped_alias(cmd, "func"):
return func
return # Not found
def get(self, cmd: str, default=None):
return self[cmd] or default
async def pmcaptcha(self):
"""查询当前用户的验证状态"""
if not (user_id := await self._get_user_id(self.msg.chat.id)):
return await self._edit(lang('invalid_user_id'))
await self._edit(lang(f'verify_{"" if setting.is_verified(user_id) else "un"}verified'))
await asyncio.sleep(5)
await self.msg.safe_delete()
async def version(self):
"""查看 PMCaptcha 当前版本
:alias: v, ver
"""
await self._edit(f"{lang('curr_version') % get_version()}")
async def help(self, command: Optional[str], search_str: Optional[str] = None):
"""显示指令帮助信息,使用 <code>,{cmd_name} search [搜索内容]</code> 进行文档、指令(和别名)搜索
:param opt command: 命令名称
:param opt search_str: 搜索的文字,只有 <code>command</code< 为 <code>search</code> 时有效
:alias: h
"""
help_msg = [f"{code('PMCaptcha')} {lang('cmd_list')}:", ""]
footer = [
italic(lang('cmd_detail')),
"",
f"{lang('priority')}:\n{' > '.join(Rule._get_rules_priority())}",
"",
f"遇到任何问题请先 {code(',apt update')}{code(',restart')} 后复现再反馈",
(f"👉 {gen_link('捐赠网址', 'https://afdian.net/@xtaodada')} "
f"{gen_link('捐赠说明', 'https://t.me/PagerMaid_Modify/121')} "
f"(v{get_version()})"),
]
if command == "search": # Search for commands or docs
if not search_str:
return await self.help("h")
search_str = search_str.lower()
search_results = [lang('cmd_search_result') % search_str]
have_doc = False
have_cmd = False
for name, func in inspect.getmembers(self, inspect.iscoroutinefunction):
if name.startswith("_"):
continue
# Search for docs
docs = func.__doc__ or ""
if docs.lower().find(search_str) != -1:
not have_doc and search_results.append(f"{lang('cmd_search_docs')}:")
have_doc = True
search_results.append(self._extract_docs(func.__name__, docs))
# Search for commands
if name.find(search_str) != -1:
not have_cmd and search_results.append(f"{lang('cmd_search_cmds')}:")
have_cmd = True
search_results.append(
(code(f"- {code(self._get_cmd_with_param(name))}".strip())
+ f"\n· {re.search(r'(.+)', docs)[1].strip()}\n"))
# Search for aliases
elif result := re.search(self.alias_rgx, docs):
if search_str not in result[1].replace(" ", "").split(","):
continue
not have_cmd and search_results.append(f"{lang('cmd_search_cmds')}:")
have_cmd = True
search_results.append(
(f"* {code(search_str)} -> {code(self._get_cmd_with_param(func.__name__))}".strip()
+ f"\n· {re.search(r'(.+)', docs)[1].strip()}\n"))
len(search_results) == 1 and search_results.append(italic(lang('cmd_search_none')))
return await self._edit("\n\n".join(search_results))
elif command: # Single command help
func = getattr(self, command, self._get_mapped_alias(command, "func"))
return await (
self._edit(self._extract_docs(func.__name__, func.__doc__ or ''))
if func else self._edit(f"{lang('cmd_not_found')}: {code(command)}"))
members = inspect.getmembers(self, inspect.iscoroutinefunction)
members.sort(key=sort_line_number)
for name, func in members:
if name.startswith("_"):
continue
help_msg.append(
(code(f",{user_cmd_name} {self._get_cmd_with_param(name)}".strip())
+ f"\n· {re.search(r'(.+)', func.__doc__ or '')[1].strip()}\n"))
await self._edit("\n".join(help_msg + footer))
# region Checking User / Manual update
async def check(self, _id: Optional[str]):
"""查询指定用户验证状态,如未指定为当前私聊用户 ID
:param opt _id: 用户 ID
"""
if not (user_id := await self._get_user_id(_id)):
return await self._edit(lang('invalid_user_id'))
await self._edit(lang(f"user_{'' if setting.is_verified(user_id) else 'un'}verified") % user_id)
async def add(self, _id: Optional[str]):
"""将 ID 加入已验证,如未指定为当前私聊用户 ID
:param opt _id: 用户 ID
"""
if not (user_id := await self._get_user_id(_id)):
return await self._edit(lang('invalid_user_id'))
if captcha := curr_captcha.get(user_id): # This user is currently in challenge state
await captcha.action(True)
if curr_captcha.get(user_id):
del curr_captcha[user_id]
result = True
else:
result = setting.whitelist.add_id(user_id)
await bot.unarchive_chats(chat_ids=user_id)
await self._edit(lang(f"add_whitelist_{'success' if result else 'failed'}") % user_id)
async def delete(self, _id: Optional[str]):
"""移除 ID 验证记录,如未指定为当前私聊用户 ID
:param opt _id: 用户 ID
:alias: del
"""
if not (user_id := await self._get_user_id(_id)):
return await self._edit(lang('invalid_user_id'))
text = lang(f"remove_verify_log_{'success' if setting.whitelist.del_id(user_id) else 'not_found'}")
await self._edit(text % user_id)
# endregion
async def unstuck(self, _id: Optional[str]):
"""解除一个用户的验证状态,通常用于解除卡死的验证状态
:param opt _id: 用户 ID
"""
if not (user_id := await self._get_user_id(_id)):
return await self._edit(lang('invalid_user_id'))
captcha = None
if (state := setting.get_challenge_state(user_id)) or (captcha := curr_captcha.get(user_id)):
await CaptchaTask.archive(user_id, un_archive=True)
try:
(captcha and captcha.type or state.get("type", "math")) == "img" and await bot.unblock_user(user_id)
except Exception as e:
console.error(f"Error when unblocking user {user_id}: {e}\n{traceback.format_exc()}")
if captcha := curr_captcha.get(user_id):
captcha.timer_task and captcha.timer_task.cancel()
del curr_captcha[user_id]
state and setting.del_challenge_state(user_id)
return await self._edit(lang('unstuck_success') % user_id)
await self._edit(lang('not_stuck') % user_id)
async def welcome(self, *message: Optional[str]):
"""查看或设置验证通过时发送的消息
使用 <code>,{cmd_name} welcome -c</code> 可恢复默认规则
:param opt message: 消息内容
:alias: wel
"""
if not message:
return await self._display_value(
key="welcome",
display_text=code(setting.get('welcome', lang('none'))),
sub_cmd="wel",
value_type="vocab_msg")
message = " ".join(message)
if message.startswith("-c"):
setting.delete("welcome")
return await self._edit(lang('welcome_reset'))
setting.set("welcome", message)
await self._edit(lang('welcome_set'))
async def whitelist(self, array: Optional[str]):
"""查看或设置关键词白名单列表(英文逗号分隔)
使用 <code>,{cmd_name} whitelist -c</code> 可清空列表
:param opt array: 白名单列表 (英文逗号分隔)
:alias: wl, whl
"""
return await self._set_list("whitelist", array)
async def blacklist(self, array: Optional[str]):
"""查看或设置关键词黑名单列表 (英文逗号分隔)
使用 <code>,{cmd_name} blacklist -c</code> 可清空列表
:param opt array: 黑名单列表 (英文逗号分隔)
:alias: bl
"""
return await self._set_list("blacklist", array)
async def timeout(self, seconds: Optional[int], _type: Optional[str]):
"""查看或设置超时时间,默认为 <code>30</code> 秒、图像模式为 <code>5</code> 分钟
使用 <code>,{cmd_name} wait off</code> 可关闭验证时间限制
有关验证超时的默认选项:
- <code>math</code> | <code>30</code> 秒
- <code>img</code> | <code>5</code> 分钟
- <code>sticker</code> | <code>30</code> 秒
在图像模式中,此超时时间会于用户最后活跃而重置,
建议数值设置大一点让机器人有一个时间可以处理后端操作
:param opt seconds: 超时时间,单位秒
:param opt _type: 验证类型,默认为当前类型
:alias: wait
"""
if _type and _type not in ("math", "img", "sticker"):
return await self.help("wait")
captcha_type: str = _type or setting.get("type", "math")
key_name: str = {
"sticker": "sticker_timeout",
"img": "img_timeout",
"math": "timeout"
}.get(captcha_type)
default_timeout_time: int = {
"sticker": 30,
"img": 300,
"math": 30
}.get(captcha_type)
if seconds is None:
return await self._display_value(
display_text=lang('timeout_curr_rule') % int(setting.get(key_name, default_timeout_time)),
sub_cmd="wait",
value_type="vocab_int")
elif seconds == "off":
setting.delete(key_name)
return await self._edit(lang('timeout_off'))
if seconds < 0:
return await self._edit(lang('invalid_param'))
setting.set(key_name, seconds)
await self._edit(lang('timeout_set') % seconds)
async def disable_pm(self, toggle: Optional[str]):
"""启用 / 禁止陌生人私聊,默认为 <code>N</code> (允许私聊)
此功能会放行联系人和白名单(<i>已通过验证</i>)用户
您可以使用 <code>,{cmd_name} add</code> 将用户加入白名单
:param opt toggle: 开关 (y / n)
:alias: disablepm, disable
"""
if not toggle:
return await self._display_value(
display_text=lang('disable_pm_curr_rule') % lang('enabled' if setting.get('disable') else 'disabled'),
sub_cmd="disable_pm",
value_type="vocab_bool")
await self._set_toggle("disable", toggle)
async def stats(self, arg: Optional[str]):
"""查看验证统计
使用 <code>,{cmd_name} stats -c</code> 重置数据
:param opt arg: 参数 (reset)
"""
if not arg:
data = (setting.get('pass', 0) + setting.get('banned', 0), setting.get('pass', 0), setting.get('banned', 0))
await self.msg.edit_text(f"{code('PMCaptcha')} {lang('stats_display') % data}", parse_mode=ParseMode.HTML)
return
if arg.startswith("-c"):
setting.delete('pass').delete('banned')
return await self.msg.edit(lang('stats_reset'), parse_mode=ParseMode.HTML)
async def action(self, action: Optional[str]):
"""选择验证失败的处理方式,默认为 <code>none</code>
处理方式如下:
- <code>ban</code> | 封禁
- <code>delete</code> | 封禁并删除对话
- <code>none</code> | 不执行任何操作
:param opt action: 处理方式
:alias: act
"""
if not action:
action = setting.get("action", "none")
return await self._display_value(
key="action",
display_text=lang(f"action_{action == 'none' and 'set_none' or action}"),
sub_cmd="act",
value_type="vocab_action")
if action not in ("ban", "delete", "none"):
return await self.help("act")
if (action == "none" and setting.delete("action") or setting.set("action", action)) == action:
return await self._edit(lang('action_set') % lang(f'action_{action}'))
await self._edit(lang('action_set_none'))
async def report(self, toggle: Optional[str]):
"""选择验证失败后是否举报该用户,默认为 <code>N</code>
:param opt toggle: 开关 (y / n)
"""
if not toggle:
return await self._display_value(
display_text=lang('report_curr_rule') % lang('enabled' if setting.get('report') else 'disabled'),
sub_cmd="report",
value_type="vocab_bool")
await self._set_toggle("report", toggle)
async def premium(self, action: Optional[str]):
"""选择对 <b>Premium</b> 用户的操作,默认为 <code>none</code>
处理方式如下:
- <code>allow</code> | 白名单
- <code>ban</code> | 封禁
- <code>only</code> | 只允许
- <code>none</code> | 不执行任何操作
:param opt action: 处理方式
:alias: vip, prem
"""
if not action:
return await self._display_value(
key="premium",
display_text=lang(f'premium_set_{setting.get("premium", "none")}'),
sub_cmd="vip",
value_type="vocab_action")
if action not in ("allow", "ban", "only", "none"):
return await self.help("vip")
action == "none" and setting.delete("action") or setting.set("action", action)
await self._edit(lang(f'premium_set_{action}'))
async def groups_in_common(self, count: Optional[int]):
"""设置是否对拥有一定数量的共同群的用户添加白名单
使用 <code>,{cmd_name} groups -1</code> 重置设置
:param opt count: 共同群数量
:alias: group, groups, common
"""
if not count:
groups = setting.get('groups_in_common')
text = lang(f"groups_in_common_{'set' if groups is not None else 'disabled'}")
if groups is not None:
text = text % groups
return await self._display_value(
display_text=text,
sub_cmd="groups",
value_type="vocab_int")
if count == -1:
setting.delete('groups_in_common')
return await self._edit(lang('groups_in_common_disable'))
elif count < 0:
return await self.help("groups_in_common")
setting.set('groups_in_common', count)
await self._edit(lang('groups_in_common_set') % count)
async def chat_history(self, count: Optional[int]):
"""设置对拥有一定数量的聊天记录的用户添加白名单(触发验证的信息不计算在内)
使用 <code>,{cmd_name} his -1</code> 重置设置
<b>请注意,由于 Telegram 内部限制,信息获取有可能会不完整,请不要设置过大的数值</b>
:param opt count: 聊天记录数量
:alias: his, history
"""
if not count:
history_count = setting.get('history_count')
text = lang("chat_history_curr_rule" if history_count is not None else "chat_history_disabled")
if history_count is not None:
text = text % history_count
return await self._display_value(
display_text=text,
sub_cmd="his",
value_type="vocab_bool")
setting.set('history_count', count)
await self._edit(lang('chat_history_curr_rule') % count)
async def initiative(self, toggle: Optional[str]):
"""设置对主动进行对话的用户添加白名单,默认为 <code>N</code>
:param opt toggle: 开关 (y / n)
"""
if not toggle:
return await self._display_value(
display_text=lang('initiative_curr_rule') % lang(
'enabled' if setting.get('initiative', False) else 'disabled'),
sub_cmd="initiative",
value_type="vocab_bool")
await self._set_toggle("initiative", toggle)
async def silent(self, toggle: Optional[str]):
"""减少信息发送,默认为 <code>N</code>
开启后,封禁、验证成功提示(包括欢迎信息)信息将不会发送
(并不会影响到 <code>log</code> 发送)
:param opt toggle: 开关 (y / n)
:alias: quiet
"""
if not toggle:
return await self._display_value(
display_text=lang('silent_curr_rule') % lang('enabled' if setting.get('silent', False) else 'disabled'),
sub_cmd="quiet",
value_type="vocab_bool")
await self._set_toggle("silent", toggle)
async def flood(self, limit: Optional[int]):
"""设置一分钟内超过 <code>n</code> 人开启轰炸检测机制,默认为 <code>50</code> 人
此机制会在用户被轰炸时启用,持续 <code>5</code> 分钟,假如有用户继续进行私聊计时将会重置
当轰炸开始时,<code>PMCaptcha</code> 将会启动以下一系列机制
- 强制开启自动归档(无论是否 <code>Telegram Premium</code> 用户都会尝试开启)
- 不向用户发送 <code>CAPTCHA</code> 挑战
- 继上面的机制,记录未发送 <code>CAPTCHA</code> 的用户 ID
- (用户可选)创建临时频道,并把用户名转移到创建的频道上 【默认关闭】
轰炸结束后,如果用户名已转移到频道上,将恢复用户名,并删除频道
并对记录收集机器人发送轰炸的<code>用户数量</code>、<code>轰炸开始时间</code>、<code>轰炸结束时间</code>、<code>轰炸时长</code>(由于不存在隐私问题,此操作为强制性)
请参阅 <code>,{cmd_name} h flood_username</code> 了解更多有关创建临时频道的机制
请参阅 <code>,{cmd_name} h flood_act</code> 查看轰炸结束后的处理方式
:param opt limit: 人数限制
:alias: boom
"""
if not limit:
return await self._display_value(
display_text=lang('flood_curr_rule') % setting.get('flood_limit', 50),
sub_cmd="flood",
value_type="vocab_int")
setting.set('flood_limit', limit)
await self._edit(lang('flood_curr_rule') % limit)
async def flood_username(self, toggle: Optional[str]):
"""设置是否在轰炸时启用“转移用户名到临时频道”机制(如有用户名)
将此机制分开出来的原因是此功能有可能会被抢注用户名<i>(虽然经测试并不会出现此问题)</i>
但为了万一依然分开出来作为一个选项了
启用后,在轰炸机制开启时,会进行以下操作
- 创建临时频道
- (如创建成功)清空用户名,设置用户名为临时频道,并在频道简介设置正在受到轰炸提示
- (如设置失败)恢复用户名,删除频道
注意:<b>请预留足够的公开群用户名设置额度,否则将不会设置成功,但同时用户名也不会被清空</b>
:param opt toggle: 开关 (y / n)
:alias: boom_username
"""
global user_want_set_flood_username
if not toggle:
return await self._display_value(
display_text=lang('flood_username_curr_rule') % lang(
'enabled' if setting.get('flood_username', False) else 'disabled'),
sub_cmd="flood_username",
value_type="vocab_bool")
if toggle in ("y", "t", "1", "on") and not user_want_set_flood_username:
user_want_set_flood_username = True
return await self._edit(lang('flood_username_set_confirm'))
user_want_set_flood_username = None
await self._set_toggle("flood_username", toggle)
async def flood_act(self, action: Optional[str]):
"""设置轰炸结束后进行的处理方式,默认为 <code>none</code>
可用的处理方式如下:
- <code>asis</code> | 与验证失败的处理方式一致但不会进行验证失败通知以及发送log记录
- <code>captcha</code> | 对每个用户进行 <code>CAPTCHA</code> 挑战
- <code>none</code> | 不进行任何操作
:param opt action: 处理方式
:alias: boom_act
"""
if not action:
return await self._display_value(
display_text=lang('flood_act_curr_rule') % lang(f"flood_act_set_{setting.get('flood_act', 'none')}"),
sub_cmd="flood_act",
value_type="vocab_action")
if action not in ("asis", "captcha", "none"):
return await self.help("flood_act")
action == "none" and setting.delete("flood_act") or setting.set("flood_act", action)
await self._edit(lang(f'flood_act_set_{action}'))
async def custom_rule(self, *rule: Optional[str]):
"""用户自定义过滤规则,返回<code>True</code>为白名单,否则继续执行下面的规则
使用 <code>,{cmd_name} custom_rule -c</code> 可删除规则
注意事项:
- 返回<code>True</code>并不代表添加到白名单,只是停止继续执行规则
- 规则发送错误默认返回<code>False</code>(继续执行规则),并透过<code>log</code>输出错误信息
- <b>由于此指令能够直接操作账号,因此请在输入他人给与的规则前先亲自确认是否安全</b>
可用参数:
- <code>msg</code> | 触发验证的信息
- <code>text</code> | 触发验证的信息的文本,永远不为<code>None</code>
- <code>user</code> | 用户
- <code>me</code> | 机器人用户(自己)
范例:
<code>text == "BYPASS"</code>
解释:
当文字为“BYPASS”时认为是白名单用户不继续执行规则
:param rule: 规则
"""
if not rule:
return await self._display_value(
key="custom_rule",
display_text=code(setting.get('custom_rule', lang('none'))),
sub_cmd="custom_rule",
value_type="vocab_rule")
rule = " ".join(rule)
if rule.startswith("-c"):
setting.delete("welcome")
return await self._edit(lang('custom_rule_reset'))
setting.set("custom_rule", rule)
await self._edit(lang('custom_rule_set') % rule)
async def collect_logs(self, toggle: Optional[str]):
"""查看或设置是否允许 <code>PMCaptcha</code> 收集验证错误相关信息以帮助改进
默认为 <code>Y</code>,收集的信息包括被验证者的信息以及未通过验证的信息记录
:param opt toggle: 开关 (y / n)
:alias: collect, log
"""
if not toggle:
status = lang('enabled' if setting.get('collect_logs', True) else 'disabled')
return await self._display_value(
display_text=f"{lang('collect_logs_curr_rule') % status}\n{lang('collect_logs_note')}",
sub_cmd="log",
value_type="vocab_bool")
await self._set_toggle("collect_logs", toggle, reverse=True)
async def change_type(self, _type: Optional[str]):
"""切换验证码类型,默认为 <code>math</code>
验证码类型如下:
- <code>math</code> | 计算验证
- <code>img</code> | 图像辨识验证
- <code>sticker</code> | 贴纸验证(发送贴纸即可)
<b>注意:如果图像验证不能使用将回退到计算验证</b>
:param opt _type: 验证码类型
:alias: type, typ
"""
if not _type:
return await self._display_value(
display_text=lang('type_curr_rule') % lang(f'type_captcha_{setting.get("type", "math")}'),
sub_cmd="typ",
value_type="type_param_name")
if _type not in ("img", "math", "sticker"):
return await self.help("typ")
_type == "math" and setting.delete("type") or setting.set("type", _type)
await self._edit(lang('type_set') % lang(f'type_captcha_{_type}'))
async def show_settings(self):
"""显示目前所有的设置
:alias: settings, setting
"""
settings_text = []
text_none = bold(lang('none'))
for key, default in (
("whitelist", text_none),
("blacklist", text_none),
("timeout", 300 if setting.get("type") == "img" else 30),
("disable_pm", bold(lang('disabled'))),
("action", bold(lang("action_set_none"))),
("report", bold(lang('disabled'))),
("premium", bold(lang('premium_set_none'))),
("groups_in_common", text_none),
("chat_history", -1),
("initiative", bold(lang("disabled"))),
("silent", bold(lang("disabled"))),
("flood", 5),
("flood_username", bold(lang("disabled"))),
("flood_act", bold(lang("flood_act_set_none"))),
("collect_logs", bold(lang("enabled"))),
("type", bold(lang("type_captcha_math"))),
("img_captcha", bold(lang("img_captcha_type_func"))),
("img_captcha_retry", 3),
("custom_rule", text_none),
("welcome", text_none)
):
lang_text = lang(f'{key}_curr_rule')
# Timeout (rule: timeout, value: [multiple])
if key == "timeout":
captcha_type = setting.get("type", "math")
key: str = {
"sticker": "sticker_timeout",
"img": "img_timeout",
"math": "timeout"
}.get(captcha_type)
default: int = {
"sticker": 30,
"img": 300,
"math": 30
}.get(captcha_type)
# Disable (rule: disable_pm, val: disable)
elif key == "disable_pm":
key = "disable"
# Chat History (rule: chat_history, val: history_count)
elif key == "chat_history":
key = "history_count"
# Flood (key: flood, val: flood_limit)
elif key == "flood":
key = "flood_limit"
# Image Type (rule: img_captcha_type, val: img_type)
elif key == "img_captcha":
key = "img_type"
# Image Retry Chance (rule: img_captcha_retry, value: img_max_retry)
elif key == "img_captcha_retry":
key = "img_max_retry"
value = setting.get(key, default)
if isinstance(value, bool):
value = bold(lang(f'enabled' if value else 'disabled'))
elif key == "premium":
value = "\n" + value
if lang_text.find("%") != -1:
lang_text = lang_text % value
else:
lang_text += f": {bold(str(value))}"
settings_text.append(lang_text)
await self._edit(lang('settings_lists') % "\n".join(settings_text))
# Image Captcha
async def change_img_type(self, _type: Optional[str]):
"""切换图像辨识使用接口,默认为 <code>func</code>
目前可用的接口:
- <code>func</code> (<i>ArkLabs funCaptcha</i> )
- <code>github</code> (<i>GitHub 螺旋星系</i> )
- <code>rec</code> (<i>Google reCAPTCHA</i> )
请注意, <code>reCAPTCHA</code> 难度相比前两个<a href="https://t.me/c/1441461877/958395">高出不少</a>
因此验证码系统会在尝试过多后提供 <code>funCaptcha</code> 接口让用户选择
:param opt _type: 验证码类型
:alias: img_type, img_typ
"""
if not _type:
return await self._display_value(
display_text=lang('type_curr_rule') % lang(f'img_captcha_type_{setting.get("img_type", "func")}'),
sub_cmd="img_typ",
value_type="type_param_name")
if _type not in ("func", "github", "rec"):
return await self.help("img_typ")
_type == "func" and setting.delete("img_type") or setting.set("img_type", _type)
await self._edit(lang('type_set') % lang(f'img_captcha_type_{_type}'))
async def img_retry_chance(self, number: Optional[int]):
"""图形验证码最大可重试次数,默认为 <code>3</code>
:param opt number: 重试次数
:alias: img_re
"""
if number is None:
return await self._display_value(
display_text=lang('img_captcha_retry_curr_rule') % setting.get("img_max_retry", 3),
sub_cmd="img_re",
value_type="vocab_int")
if number < 0:
return await self._edit(lang('invalid_param'))
setting.set("img_max_retry", number)
await self._edit(lang('img_captcha_retry_set') % number)
# region Captcha
@dataclass
class TheOrder:
"""Worker of blocking user (Punishment)"""
queue = asyncio.Queue()
task: Optional[asyncio.Task] = None
flood_text = "[The Order] Flood Triggered: wait %is, Command: %s, Target: %s"
def __post_init__(self):
if pending := setting.pending_ban_list.get_subs():
console.debug(f"Pending user(s) to ban: {len(pending)}")
if len(pending) > 0 and not self.task or self.task.done():
self.task = asyncio.create_task(self.worker())
for user_id in pending:
self.queue.put_nowait((user_id, False))
async def worker(self):
console.debug("Punishment Worker started")
while True:
target = None
try:
target, skip_log = await self.queue.get()
action = setting.get("action", "none")
if action in ("ban", "delete"):
if not await exec_api(bot.block_user(user_id=target)):
console.debug(f"Failed to block user {target}")
if action == "delete":
if not await exec_api(bot.invoke(messages.DeleteHistory(
just_clear=False,
revoke=False,
peer=await bot.resolve_peer(target),
max_id=0))):
console.debug(f"Failed to delete user chat {target}")
setting.pending_ban_list.del_id(target)
setting.get_challenge_state(target) and setting.del_challenge_state(target)
setting.set("banned", setting.get("banned", 0) + 1)
chat_link = gen_link(str(target), f"tg://user?id={target}")
text = f"[PMCaptcha - The Order] {lang('verify_log_punished')} (Punishment)"
(not skip_log and action not in ("none", "archive") and
await log(text % (chat_link, lang(f'action_{action}')), True))
skip_log and console.debug(text % (chat_link, lang(f'action_{action}')))
except asyncio.CancelledError:
break
except Exception as e:
await log(f"Error occurred when punishing user: {e}\n{traceback.format_exc()}")
finally:
target and self.queue.task_done()
async def active(self, user_id: int, reason_code: str):
if not self.task or self.task.done():
self.task = asyncio.create_task(self.worker())
try:
not setting.get("silent") and await bot.send_message(user_id, "\n\n".join((
lang_full(reason_code), lang_full("verify_blocked")
)))
except FloodWait:
pass # Skip waiting
finally:
setting.pending_ban_list.add_id(user_id)
await self.queue.put((user_id, False))
console.debug(f"User {user_id} added to ban queue")
@dataclass
class TheWorldEye:
"""Anti-Flooding System
Actual name of each functions:
- sophitia -> Watcher
- synchronize -> flood_triggered
- overload -> flood_ended
"""
queue = asyncio.Queue()
watcher: Optional[asyncio.Task] = None
timer_task: Optional[asyncio.Task] = None
# Watcher
last_challenge_time: Optional[int] = None
level: int = 0
# Post Init Value
channel_id: Optional[int] = None
username: Optional[str] = None
triggered: bool = False
start: Optional[int] = None
update: Optional[int] = None
end: Optional[int] = None
user_ids: Optional[list] = field(init=False)
auto_archive_enabled_default: Optional[bool] = None
def __post_init__(self):
self.user_ids = []
if state := setting.get_flood_state(): # PMCaptcha restarts, flood keeps going
# Resume last flood state
now = int(time.time())
self.triggered = True
self.channel_id = state.get("channel_id")
self.username = state.get("username")
self.start = state.get("start")
self.update = state.get("update")
self.user_ids = state.get("user_ids")
self.auto_archive_enabled_default = state.get("auto_archive_enabled_default")
self.reset_timer(300 - (now - self.start))
console.debug("PMCaptcha restarted, flood state resume")
self.watcher = asyncio.create_task(self.sophitia())
# region Timer
async def _flood_timer(self, interval: int):
try:
await asyncio.sleep(interval)
except asyncio.CancelledError:
return
console.debug("Flood ends")
self.triggered = False
self.end = int(time.time())
await self.overload()
def reset_timer(self, interval: int = 300):
if self.timer_task:
self.timer_task.cancel()
self.update = int(time.time())
self.timer_task = asyncio.create_task(self._flood_timer(interval))
console.debug("Flood timer reset")
return self
# endregion
async def _set_channel_username(self):
console.debug("Creating temporary channel")
try:
channel = await bot.create_channel(
"PMCaptcha Temporary Channel",
"\n\n".join((lang("flood_channel_desc", "en"), lang("flood_channel_desc", "zh"))))
console.debug("Temporary channel created")
self.channel_id = channel.id
except Exception as e:
await log(f"Failed to create temporary channel: {e}\n{traceback.format_exc()}")
return False
console.debug("Moving username to temporary channel")
try:
await bot.set_username(None)
except Exception as e:
await log(f"Failed to remove username: {e}\n{traceback.format_exc()}")
return False
result = False
try:
await bot.invoke(UpdateUsername(channel=await bot.resolve_peer(channel.id), username=self.username))
result = True
except ChannelsAdminPublicTooMuch:
await log("Failed to move username to temporary channel, too many public channels")
except Exception as e:
await log(f"Failed to set username for channel: {e}\n{traceback.format_exc()}")
if not result:
console.debug("Setting back username")
try:
await bot.set_username(self.username)
await bot.delete_supergroup(channel.id)
except Exception as e:
await log(f"Failed to set username back: {e}\n{traceback.format_exc()}")
self.username = None
return result
async def _restore_username(self):
if self.channel_id:
console.debug("Deleting temporary channel")
try:
await bot.delete_channel(self.channel_id)
except Exception as e:
console.debug(f"Failed to delete temporary channel: {e}\n{traceback.format_exc()}")
if self.username:
console.debug("Setting back username")
try:
await bot.set_username(self.username)
except Exception as e:
await log(f"Failed to set username back: {e}\n{traceback.format_exc()}")
self.username = self.channel_id = None
# region State
def save_state(self):
setting.set_flood_state({
"start": self.start,
"update": self.update,
"user_ids": self.user_ids,
"auto_archive_enabled_default": self.auto_archive_enabled_default,
"username": self.username,
"channel_id": self.channel_id
})
def update_state(self):
data = setting.get_flood_state()
data.update({
"update": self.update,
"user_ids": self.user_ids,
})
setting.set_flood_state(data)
@staticmethod
def del_state():
setting.del_flood_state()
# endregion
# noinspection SpellCheckingInspection
async def sophitia(self):
"""Watches the private message chat (World)"""
console.debug("Flood Watcher started")
while True:
user_id = None
try:
(user_id,) = await self.queue.get()
if self.triggered: # Continues flooding, add to list and reset timer
self.reset_timer()
self.user_ids.append(user_id)
console.debug(f"User {user_id} added to flood list")
self.update_state()
continue
now = int(time.time())
if self.last_challenge_time and now - self.last_challenge_time < 60:
# A user is challenged less than a min
self.level += 1
elif not self.last_challenge_time or now - self.last_challenge_time > 60:
self.level = 1
self.last_challenge_time = now
if self.level >= setting.get("flood_limit", 50):
console.warn(f"Flooding detected: {self.level} reached in 1 min")
self.triggered = True
self.start = self.update = now
self.reset_timer()
await self.synchronize()
except asyncio.CancelledError:
break
except Exception as e:
await log(f"Error occurred in flood watcher: {e}\n{traceback.format_exc()}")
finally:
user_id and self.queue.task_done()
async def add_synchronize(self, user_id: int):
await self.queue.put((user_id,))
async def synchronize(self):
"""Triggered when flood starts (Iris has started synchronizing people)"""
# Force enable auto archive to reduce api flood
settings: GlobalPrivacySettings = await bot.invoke(GetGlobalPrivacySettings())
self.auto_archive_enabled_default = settings.archive_and_mute_new_noncontact_peers
console.debug("Enabling auto archive")
try:
await bot.invoke(SetGlobalPrivacySettings(
settings=GlobalPrivacySettings(archive_and_mute_new_noncontact_peers=True)
))
console.debug("Auto archive enabled")
except AutoarchiveNotAvailable:
console.warn("Auto archive is not available, API Flooding may be larger than expected")
except Exception as e:
console.error(f"Failed to enable auto archive: {e}\n{traceback.format_exc()}")
if setting.get("flood_username") and bot.me.username:
self.username = bot.me.username
console.debug("Moving username to temporary channel")
if not await self._set_channel_username():
self.username = None
# Save state
self.save_state()
async def overload(self):
"""Executed after flood ends (Nine has performed load action)"""
console.info(f"Flood ended, {len(self.user_ids)} users were affected, duration: {self.end - self.start}s")
if self.channel_id or self.username:
console.debug("Changing back username")
await self._restore_username()
try:
await bot.send_message(log_collect_bot, "\n".join((
f"💣 检测到私聊轰炸",
f"设置限制: {code(setting.get('flood_limit', 50))}",
f"用户数量: {code(str(len(self.user_ids)))}",
f"开始时间: {code(str_timestamp(self.start))}",
f"结束时间: {code(str_timestamp(self.end))}",
f"轰炸时长: {code(str(self.update - self.start))}",
)))
except Exception as e:
console.debug(f"Failed to send flood log: {e}\n{traceback.format_exc()}")
if not self.auto_archive_enabled_default: # Restore auto archive setting
try:
await bot.invoke(SetGlobalPrivacySettings(
settings=GlobalPrivacySettings(archive_and_mute_new_noncontact_peers=False)
))
console.debug("Auto archive disabled")
except Exception as e:
console.debug(f"Failed to disable auto archive: {e}\n{traceback.format_exc()}")
flood_act = setting.get("flood_act")
if flood_act == "asis":
if not the_order.task or the_order.task.done():
the_order.task = asyncio.create_task(the_order.worker())
for user_id in self.user_ids:
setting.pending_ban_list.add_id(user_id)
for user_id in self.user_ids:
await the_order.queue.put((user_id, True))
elif flood_act == "captcha":
if not captcha_task.task or captcha_task.task.done():
captcha_task.task = asyncio.create_task(captcha_task.worker())
for user_id in self.user_ids:
if (setting.pending_challenge_list.check_id(user_id) or curr_captcha.get(user_id) or
setting.get_challenge_state(user_id)):
continue
await self.queue.put((user_id, None, None, None))
setting.pending_challenge_list.add_id(user_id)
console.debug(f"User {user_id} added to challenge queue")
self.user_ids.clear()
self.start = self.end = self.update = self.auto_archive_enabled_default = None
self.del_state()
@dataclass
class CaptchaTask:
"""A class to start, resume and verify the captcha challenge
and contains some nice function like archiving user, getting user's settings
The main function of this class is to queue & start a captcha for the user
"""
queue = asyncio.Queue()
task: Optional[asyncio.Task] = None
flood_text = "[CaptchaTask] Flood Triggered: wait %is, Command: %s, Target: %s"
def __post_init__(self):
if pending := setting.pending_challenge_list.get_subs():
console.debug(f"Pending user(s) to challenge: {len(pending)}")
if len(pending) > 0 and not self.task or self.task.done():
self.task = asyncio.create_task(self.worker())
for user_id in pending:
self.queue.put_nowait((user_id, None, None, None))
@staticmethod
async def archive(user_id: int, *, un_archive: bool = False):
from pyrogram.raw.functions.account import UpdateNotifySettings
from pyrogram.raw.types import InputNotifyPeer, InputPeerNotifySettings
notify_setting = InputPeerNotifySettings(**{
"mute_until": None if un_archive else 2147483647,
"show_previews": True if un_archive else None,
"silent": False if un_archive else None
})
peer = InputNotifyPeer(peer=await bot.resolve_peer(user_id))
await exec_api(bot.invoke(UpdateNotifySettings(peer=peer, settings=notify_setting)))
await exec_api((bot.unarchive_chats if un_archive else bot.archive_chats)(user_id))
@staticmethod
async def get_user_settings(user_id: int) -> (bool, bool):
can_report = True
auto_archived = False
if peer_settings := await exec_api(bot.invoke(messages.GetPeerSettings(peer=await bot.resolve_peer(user_id)))):
can_report = peer_settings.settings.report_spam
auto_archived = peer_settings.settings.autoarchived
else:
console.debug(f"GetPeerSettings failed for {user_id}")
return can_report, auto_archived
async def worker(self):
console.debug("Captcha Challenge Worker started")
while True:
user_id: Optional[int] = None
try:
user_id, msg, can_report, auto_archived = await self.queue.get()
user = msg and msg.from_user or await bot.get_users(user_id)
if can_report is None or auto_archived is None:
can_report, auto_archived = await self.get_user_settings(user_id)
if (last_captcha := setting.get_challenge_state(user_id)) and not curr_captcha.get(user_id):
# Resume last captcha challenge
if last_captcha["type"] not in captcha_challenges:
console.info("Failed to resume last captcha challenge: "
f"Unknown challenge type {last_captcha['type']}")
continue
await captcha_challenges[last_captcha["type"]].resume(user=user, msg=msg, state=last_captcha)
continue
# Start a captcha challenge
await self.archive(user_id)
captcha = (captcha_challenges.get(setting.get("type", "math"), MathChallenge)
(user, can_report))
captcha.log_msg(msg and (msg.text or msg.caption or "") or None)
captcha = await captcha.start() or captcha
curr_captcha[user_id] = captcha
setting.pending_challenge_list.del_id(user_id)
except asyncio.CancelledError:
break
except Exception as e:
await log(f"Error occurred when challenging user: {e}\n{traceback.format_exc()}")
finally:
user_id and self.queue.task_done()
async def add(self, user_id: int, msg: Optional[Message], can_report: Optional[bool],
auto_archived: Optional[bool]):
await the_world_eye.add_synchronize(user_id)
if not self.task or self.task.done():
self.task = asyncio.create_task(self.worker())
if not (setting.pending_challenge_list.check_id(user_id) or curr_captcha.get(user_id) or
setting.get_challenge_state(user_id)):
setting.pending_challenge_list.add_id(user_id)
await self.queue.put((user_id, msg, can_report, auto_archived))
console.debug(f"User {user_id} added to challenge queue")
@dataclass
class CaptchaChallenge:
type: str
user: User
input: bool
logs: List[str] = field(init=False, default_factory=list)
captcha_write_lock: asyncio.Lock = field(init=False, default_factory=asyncio.Lock)
# User Settings
can_report: bool = True
# Post Init Value
captcha_start: int = 0
captcha_end: int = 0
challenge_msg_id: Optional[int] = None
timer_task: Optional[asyncio.Task] = None
# region Logging
def log_msg(self, msg: Optional[str]):
if isinstance(msg, str) and not msg.strip():
return
self.logs.append(isinstance(msg, str) and msg.strip() or msg)
async def send_log(self, ban_code: Optional[str] = None):
import json
from io import BytesIO
if not setting.get("collect_logs", True):
return
user = self.user
log_file = BytesIO(json.dumps(self.logs, indent=4).encode())
log_file.name = f"{user.id}_{self.captcha_start}.json"
caption = [f"UID: {code(str(user.id))}" + (f" @{user.username}" if self.user.username else ""),
f"Mention: {gen_link(str(user.id), f'tg://user?id={user.id}')}"]
if user.first_name or user.last_name:
user_full_name = []
user.first_name and user_full_name.append(user.first_name)
user.last_name and user_full_name.append(user.last_name)
caption.append(f"Name: {code(' '.join(user_full_name))}")
elif user.is_deleted:
caption.append(f"Name: {bold('Deleted Account')}")
if user.is_scam or user.is_fake or user.is_premium:
tags = []
user.is_scam and tags.append(code("Scam"))
user.is_fake and tags.append(code("Fake"))
user.is_premium and tags.append(code("Premium"))
caption.append(f"Tags: {', '.join(tags)}")
user.language_code and caption.append(f"Language: {code(user.language_code)}")
user.dc_id and caption.append(f"DC: {code(str(user.dc_id))}")
user.phone_number and caption.append(f"Phone: {code(user.phone_number)}")
self.type and caption.append(f"Captcha Type: {code(self.type)}")
self.can_report and setting.get("report") and caption.append(f"Spam Reported: {code('Yes')}")
ban_code and caption.append(f"Block Reason: {code(ban_code)}")
self.captcha_start and caption.append(f"Start: {code(str(self.captcha_start))}")
self.captcha_end and caption.append(f"End: {code(str(self.captcha_end))}")
(self.captcha_start and self.captcha_end and
caption.append(f"Duration: {code(str(self.captcha_end - self.captcha_start))}s"))
await exec_api(bot.archive_chats(log_collect_bot))
await exec_api(bot.unblock_user(log_collect_bot))
if not await exec_api(bot.send_document(log_collect_bot, log_file, caption="\n".join(caption),
parse_mode=ParseMode.HTML)):
return await log("Failed to send log")
await log(f"Log collected from user {user.id}")
# endregion
# region State
def save_state(self, extra: Optional[dict] = None):
self.captcha_start = self.captcha_start or int(time.time())
data = {
"type": self.type,
"start": self.captcha_start,
"logs": self.logs,
"msg_id": self.challenge_msg_id,
"report": self.can_report
}
extra and data.update(extra)
setting.set_challenge_state(self.user.id, data)
def update_state(self, changes: Optional[dict] = None):
data = setting.get_challenge_state(self.user.id)
changes and data.update(changes)
setting.set_challenge_state(self.user.id, data)
def del_state(self):
setting.del_challenge_state(self.user.id)
# endregion
# region Verify Result
async def _verify_success(self):
setting.whitelist.add_id(self.user.id)
setting.set("pass", setting.get("pass", 0) + 1)
success_msg = setting.get("welcome") or lang_full("verify_passed")
welcome_msg: Optional[Message] = None
if setting.get("silent"):
try:
self.challenge_msg_id and await bot.delete_messages(self.user.id, self.challenge_msg_id)
except Exception as e:
console.error(f"Failed to delete challenge message: {e}\n{traceback.format_exc()}")
return await CaptchaTask.archive(self.user.id, un_archive=True)
try:
if self.challenge_msg_id:
welcome_msg = await bot.edit_message_text(self.user.id, self.challenge_msg_id, success_msg)
except Exception as e:
console.error(f"Failed to edit welcome message: {e}\n{traceback.format_exc()}")
try:
welcome_msg = await bot.send_message(self.user.id, success_msg)
self.challenge_msg_id = welcome_msg.id
except Exception as e:
console.error(f"Failed to send welcome message: {e}\n{traceback.format_exc()}")
await asyncio.sleep(3)
welcome_msg and await welcome_msg.safe_delete()
await CaptchaTask.archive(self.user.id, un_archive=True)
async def _verify_failed(self):
try:
self.challenge_msg_id and await bot.delete_messages(self.user.id, self.challenge_msg_id)
(self.can_report and setting.get("report") and
await bot.invoke(messages.ReportSpam(peer=await bot.resolve_peer(self.user.id))))
except Exception as e:
console.debug(f"Error occurred when executing verify failed function: {e}\n{traceback.format_exc()}")
await the_order.active(self.user.id, "verify_failed")
await self.send_log("verify_failed")
async def action(self, success: bool):
self.captcha_end = int(time.time())
self.del_state()
self.remove_timer()
await getattr(self, f"_verify_{'success' if success else 'failed'}")()
console.debug(f"User {self.user.id} verify {'success' if success else 'failed'}")
# endregion
# region Timer
async def _challenge_timer(self, timeout: int):
try:
await asyncio.sleep(timeout)
except asyncio.CancelledError:
return
except Exception as e:
console.error(f"Error occurred when running challenge timer: {e}\n{traceback.format_exc()}")
async with self.captcha_write_lock:
console.debug(f"User {self.user.id} verification timed out")
await self.action(False)
if curr_captcha.get(self.user.id):
del curr_captcha[self.user.id]
def reset_timer(self, timeout: Optional[int] = None):
self.timer_task and self.timer_task.cancel()
timeout = timeout is not None and timeout or setting.get(
f"{self.type == 'img' and 'img_' or ''}timeout", self.type == "img" and 300 or 30)
if timeout > 0:
self.timer_task = asyncio.create_task(self._challenge_timer(timeout))
console.debug(f"User {self.user.id} verification timer reset")
return self
def remove_timer(self):
if task := self.timer_task:
task.cancel()
return self
# endregion
@classmethod
async def resume(cls, *, user: User, msg: Optional[Message] = None, state: dict):
console.debug(f"User {user.id} resumed captcha challenge {state['type']}")
async def start(self):
console.debug(f"User {self.user.id} started {self.type} captcha challenge")
class MathChallenge(CaptchaChallenge):
answer: int
def __init__(self, user: User, can_report: bool):
super().__init__("math", user, True, can_report)
@classmethod
async def resume(cls, *, user: User, msg: Optional[Message] = None, state: dict):
captcha = cls(user, state.get("report", True))
captcha.captcha_start = state['start']
captcha.logs = state['logs']
captcha.challenge_msg_id = state['msg_id']
captcha.answer = state['answer']
if (timeout := setting.get("timeout", 30)) > 0:
time_passed = int(time.time()) - int(state['start'])
if time_passed > timeout:
# Timeout
return await captcha.action(False)
if msg: # Verify result
await captcha.verify(msg.text or msg.caption or "")
else: # Restore timer
captcha.reset_timer(timeout - time_passed)
await super(MathChallenge, captcha).resume(user=user, msg=msg, state=state)
async def start(self):
if self.captcha_write_lock.locked():
return
async with self.captcha_write_lock:
import random
first_value, second_value = random.randint(1, 10), random.randint(1, 10)
timeout = setting.get("timeout", 30)
operator = random.choice(("+", "-", "*"))
expression = f"{first_value} {operator} {second_value}"
challenge_msg = await exec_api(bot.send_message(self.user.id, "\n".join((
lang_full('verify_challenge'),
"", code(f"{expression} = ?"), "",
lang_full('verify_challenge_timed', timeout if timeout > 0 else "")
)), parse_mode=ParseMode.HTML))
if not challenge_msg:
return await log(f"Failed to send math captcha challenge to {self.user.id}")
self.challenge_msg_id = challenge_msg.id
self.answer = eval(expression)
self.save_state({"answer": self.answer})
self.reset_timer(timeout)
await super(MathChallenge, self).start()
async def verify(self, answer: str):
if self.captcha_write_lock.locked():
return
async with self.captcha_write_lock:
try:
user_answer = int("".join(re.findall(r"\d+", answer)))
if "-" in answer:
user_answer = -user_answer
except ValueError:
return await the_order.active(self.user.id, "verify_failed")
await self.action(user_answer == self.answer)
return user_answer == self.answer
class ImageChallenge(CaptchaChallenge):
try_count: int
def __init__(self, user: User, can_report: bool):
super().__init__("img", user, False, can_report)
self.try_count = 0
@classmethod
async def resume(cls, *, user: User, msg: Optional[Message] = None, state: dict):
captcha = cls(user, state['report'])
captcha.captcha_start = state['start']
captcha.logs = state['logs']
captcha.challenge_msg_id = state['msg_id']
captcha.try_count = state['try_count']
if captcha.try_count >= setting.get("img_max_retry", 3):
return await captcha.action(False)
if (timeout := setting.get("img_timeout", 300)) > 0: # Restore timer
time_passed = int(time.time()) - int(state['last_active'])
if time_passed > timeout:
# Timeout
return await captcha.action(False)
captcha.reset_timer(timeout - time_passed)
curr_captcha[user.id] = captcha
await super(ImageChallenge, captcha).resume(user=user, msg=msg, state=state)
async def start(self):
from pyrogram.raw.types import UpdateMessageID
if self.captcha_write_lock.locked():
return
async with self.captcha_write_lock:
while True:
try:
if (not (result := await bot.get_inline_bot_results(
img_captcha_bot, setting.get("img_type", "func"))) or
not result.results):
console.debug(f"Failed to get captcha results from {img_captcha_bot}, fallback")
break # Fallback
# From now on, wait for bot result
updates = await bot.send_inline_bot_result(self.user.id, result.query_id, result.results[0].id)
for update in updates.updates:
if isinstance(update, UpdateMessageID):
self.challenge_msg_id = update.id
self.save_state({"try_count": self.try_count, "last_active": int(time.time())})
await bot.block_user(self.user.id)
self.reset_timer()
await super(ImageChallenge, self).start()
return
console.debug(f"Failed to send image captcha challenge to {self.user.id}, fallback")
break
except TimeoutError:
console.debug(f"Image captcha bot timeout for {self.user.id}, fallback")
break # Fallback
except FloodWait as e:
console.debug(f"Image captcha triggered flood for {self.user.id}, wait {e.value}")
await asyncio.sleep(e.value)
except Exception as e:
console.error(
f"Failed to send image captcha challenge to {self.user.id}: {e}\n{traceback.format_exc()}")
await asyncio.sleep(10)
console.debug("Failed to get image captcha, fallback to math captcha.")
fallback_captcha = MathChallenge(self.user, self.can_report)
await fallback_captcha.start()
return fallback_captcha
async def verify(self, success: bool):
if success:
await bot.unblock_user(self.user.id)
self.challenge_msg_id = 0
return await self.action(success)
else:
self.try_count += 1
if self.try_count >= setting.get("img_max_retry", 3):
await self.action(False)
return True
console.debug(f"User failed to complete image captcha challenge, try count: {self.try_count}")
self.update_state({"try_count": self.try_count})
class StickerChallenge(CaptchaChallenge):
def __init__(self, user: User, can_report: bool):
super().__init__("sticker", user, True, can_report)
@classmethod
async def resume(cls, *, user: User, msg: Optional[Message] = None, state: dict):
captcha = cls(user, state['report'])
captcha.captcha_start = state['start']
captcha.logs = state['logs']
captcha.challenge_msg_id = state['msg_id']
if (timeout := setting.get("timeout", 30)) > 0:
time_passed = int(time.time()) - int(state['start'])
if time_passed > timeout:
# Timeout
return await captcha.action(False)
if msg: # Verify result
await captcha.verify(msg.text or msg.caption or "")
else: # Restore timer
captcha.reset_timer(timeout - time_passed)
await super(StickerChallenge, captcha).resume(user=user, msg=msg, state=state)
async def start(self):
if self.captcha_write_lock.locked():
return
async with self.captcha_write_lock:
timeout = setting.get("timeout", 30)
challenge_msg = await exec_api(bot.send_message(self.user.id, "\n".join((
lang_full('verify_challenge'),
"", code(lang_full("verify_send_sticker")), "",
lang_full('verify_challenge_timed', timeout if timeout > 0 else "")
)), parse_mode=ParseMode.HTML))
if not challenge_msg:
return await log(f"Failed to send sticker captcha challenge to {self.user.id}")
self.challenge_msg_id = challenge_msg.id
self.save_state()
self.reset_timer(timeout)
await super(StickerChallenge, self).start()
async def verify(self, response: Optional[Sticker]):
if self.captcha_write_lock.locked():
return
async with self.captcha_write_lock:
if not response:
return await the_order.active(self.user.id, "verify_failed")
await self.action(bool(response))
return bool(response)
# endregion
@dataclass
class Rule:
user: User
msg: Message
can_report: Optional[bool] = None
auto_archived: Optional[bool] = None
def _precondition(self) -> bool:
return (
self.user.id in (347437156, 583325201, 1148248480) or # Skip for PGM/PMC Developers
self.msg.from_user.is_contact or
self.msg.from_user.is_verified or
self.msg.chat.type == ChatType.BOT or
setting.is_verified(self.user.id))
def _get_text(self) -> str:
return self.msg.text or self.msg.caption or ""
async def _get_user_settings(self) -> (bool, bool):
if isinstance(self.can_report, bool):
return self.can_report, self.auto_archived
return await captcha_task.get_user_settings(self.user.id)
async def _run_rules(self, *, outgoing: bool = False):
if not outgoing and self._precondition():
return
members = inspect.getmembers(self, inspect.iscoroutinefunction)
members.sort(key=sort_line_number)
for name, func in members:
docs = func.__doc__ or ""
try:
if (not name.startswith("_") and (
"outgoing" in docs and outgoing and await func() or
"outgoing" not in docs and not self.user.is_self and await func()
)):
console.debug(f"Rule triggered: `{name}` (user: {self.user.id} chat: {self.msg.chat.id})")
break
except Exception as e:
console.error(f"Failed to run rule `{name}`: {e}\n{traceback.format_exc()}")
@staticmethod
def _get_rules_priority() -> tuple:
prio_list = []
members = inspect.getmembers(Rule, inspect.iscoroutinefunction)
members.sort(key=sort_line_number)
for name, func in members:
if name.startswith("_"):
continue
docs = func.__doc__ or ""
if "no_prio" not in docs:
if result := re.search(r"name:\s?(.+)", docs):
name = result[1]
prio_list.append(name)
return tuple(prio_list)
async def initiative(self) -> bool:
"""outgoing"""
initiative = setting.get("initiative", False)
initiative and not setting.whitelist.check_id(self.msg.chat.id) and setting.whitelist.add_id(self.msg.chat.id)
return initiative
async def user_defined(self) -> bool:
if custom_rule := setting.get("custom_rule"):
pass
try:
exec(f"async def _(msg, text, user, me):\n return {custom_rule}")
return bool(await locals()["_"](self.msg, self._get_text(), self.user, bot.me))
except Exception as e:
await log(f"{lang('custom_rule_exec_err')}: {e}\n{traceback.format_exc()}")
return False
async def flooding(self) -> bool:
"""name: flood"""
if the_world_eye.triggered:
_, auto_archived = await self._get_user_settings()
not auto_archived and await captcha_task.archive(self.user.id)
await the_world_eye.add_synchronize(self.user.id)
return the_world_eye.triggered
async def disable_pm(self) -> bool:
disabled = setting.get('disable', False)
disabled and await the_order.active(self.user.id, "disable_pm_enabled")
return disabled
async def chat_history(self) -> bool:
if (history_count := setting.get("history_count")) is not None:
count = 0
async for _ in bot.get_chat_history(self.user.id, offset_id=self.msg.id, offset=-history_count):
count += 1
if count >= history_count:
setting.whitelist.add_id(self.user.id)
return True
return False
async def groups_in_common(self) -> bool:
from pyrogram.raw.functions.users import GetFullUser
if (common_groups := setting.get("groups_in_common")) is not None:
if user_full := await exec_api(bot.invoke(GetFullUser(id=await bot.resolve_peer(self.user.id)))):
if user_full.common_chats_count >= common_groups:
setting.whitelist.add_id(self.user.id)
return True
else:
console.warn(f"Failed to Get Common Groups for user {self.user.id}")
return False
async def premium(self) -> bool:
if premium := setting.get("premium", False):
if premium == "only" and not self.msg.from_user.is_premium:
await the_order.active(self.user.id, "premium_only")
elif not self.msg.from_user.is_premium:
return False
elif premium == "ban":
await the_order.active(self.user.id, "premium_ban")
return premium
# Whitelist / Blacklist
async def word_filter(self) -> bool:
"""name: whitelist > blacklist"""
text = self._get_text()
if text is None:
return False
if array := setting.get("whitelist"):
for word in array.split(","):
if word not in text:
continue
setting.whitelist.add_id(self.user.id)
return True
if array := setting.get("blacklist"):
for word in array.split(","):
if word not in text:
continue
reason_code = "blacklist_triggered"
await the_order.active(self.user.id, reason_code)
# Collect logs
can_report, _ = await self._get_user_settings()
captcha = CaptchaChallenge("", self.user, False, can_report)
captcha.log_msg(text)
await captcha.send_log(reason_code)
return True
return False
async def add_captcha(self) -> bool:
"""name: captcha"""
user_id = self.user.id
if setting.get_challenge_state(user_id) and not curr_captcha.get(user_id) or not curr_captcha.get(user_id):
# Put in challenge queue
await captcha_task.add(user_id, self.msg, *(await self._get_user_settings()))
return True
return False
async def verify_challenge_answer(self) -> bool:
"""no_priority"""
user_id = self.user.id
if (captcha := curr_captcha.get(user_id)) and captcha.input and captcha.type == "math":
text = self._get_text()
captcha.log_msg(text)
await captcha.verify(text) and await self.msg.safe_delete()
del curr_captcha[user_id]
return True
return False
async def verify_sticker_response(self) -> bool:
"""no_priority"""
print("")
if (captcha := curr_captcha.get(user_id := self.user.id)) and captcha.input and captcha.type == "sticker":
captcha.log_msg(self._get_text())
await captcha.verify(self.msg.sticker) and await self.msg.safe_delete()
del curr_captcha[user_id]
return True
return False
# Watches every image captcha result
@listener(is_plugin=False, incoming=True, outgoing=True, privates_only=True)
async def image_captcha_listener(_, msg: Message):
# Ignores non-private chat, not via bot, username not equal to image bot
if msg.chat.type != ChatType.PRIVATE or not msg.via_bot or msg.via_bot.username != img_captcha_bot:
return
user_id = msg.chat.id
if (last_captcha := sqlite.get(f"pmcaptcha.challenge.{user_id}")) and not curr_captcha.get(user_id):
# Resume last captcha challenge
if last_captcha['type'] != "img":
return await log("Failed to resume last captcha challenge: "
f"Unknown challenge type {last_captcha['type']}")
await ImageChallenge.resume(user=msg.from_user, state=last_captcha)
if not curr_captcha.get(user_id): # User not in verify state
return
captcha = curr_captcha[user_id]
captcha.reset_timer().update_state({"last_active": int(time.time())})
if "CAPTCHA_SOLVED" in msg.caption:
await msg.safe_delete()
await captcha.verify(True)
del curr_captcha[user_id]
elif "CAPTCHA_FAILED" in msg.caption:
if "forced" in msg.caption:
await captcha.action(False)
del curr_captcha[user_id]
return
if await captcha.verify(False):
del curr_captcha[user_id]
await msg.safe_delete()
elif "CAPTCHA_FALLBACK" in msg.caption:
await msg.safe_delete()
# Fallback to selected captcha type
captcha_type = msg.caption.replace("CAPTCHA_FALLBACK", "").strip()
console.debug(f"Image bot return fallback request, fallback to {captcha_type}")
if captcha_type == "math":
captcha = MathChallenge(msg.from_user, captcha.can_report)
await captcha.start()
curr_captcha[user_id] = captcha
return
@listener(is_plugin=False, outgoing=True, privates_only=True)
async def initiative_listener(_, msg: Message):
await Rule(bot.me, msg)._run_rules(outgoing=True)
@listener(is_plugin=False, incoming=True, outgoing=False, ignore_edited=True, privates_only=True)
async def chat_listener(_, msg: Message):
await Rule(msg.from_user, msg)._run_rules()
@listener(is_plugin=True, outgoing=True,
command=user_cmd_name, parameters=f"<{lang('vocab_cmd')}> [{lang('cmd_param')}]",
need_admin=True,
description=f"{lang('plugin_desc')}\n{(lang('check_usage') % code(f',{user_cmd_name} h'))}")
async def cmd_entry(_, msg: Message):
cmd = Command(msg.from_user, msg)
if len(msg.parameter) > 0 and msg.parameter[0] == "markdown":
from io import BytesIO
file = BytesIO(cmd._generate_markdown().encode())
file.name = "commands.md"
await bot.send_document(msg.chat.id, file, caption="PMCaptcha commands markdown")
return await msg.delete()
result, err_code, extra = await cmd._run_command()
if not result:
if err_code == "NOT_FOUND":
return await cmd._edit(f"{lang('cmd_not_found')}: {code(extra)}\n" +
lang("check_usage") % code(f',{user_cmd_name} h'))
elif err_code == "INVALID_PARAM":
return await cmd._edit(lang('invalid_param'))
async def resume_states():
console.debug("Resuming Captcha States")
for key, value in sqlite.items(): # type: str, dict
if key.startswith("pmcaptcha.challenge"):
user_id = int(key.split(".")[2])
if user_id not in curr_captcha and (challenge := captcha_challenges.get(value.get('type'))):
# Resume challenge state
try:
user = await bot.get_users(user_id)
await challenge.resume(user=user, state=value)
except Exception as e:
console.error(f"Error occurred when resuming captcha state: {e}\n{traceback.format_exc()}")
console.debug("Captcha State Resume Completed")
if __name__ == "plugins.pmcaptcha":
import gc
# Get alias for user command
user_cmd_name = alias_command(cmd_name)
# Force disabled for old PMCaptcha
globals().get("SubCommand") and exit(0)
# Flood Username confirm
user_want_set_flood_username = None
# Logger
console = logs.getChild(cmd_name)
globals().get("console") is None and exit(0)
captcha_challenges = {
"math": MathChallenge,
"img": ImageChallenge,
"sticker": StickerChallenge
}
_cancel_task = lambda task: task and task.cancel() # noqa
gbl = globals()
# noinspection PyRedeclaration
curr_captcha: Dict[int, Union[MathChallenge, ImageChallenge, StickerChallenge]] = globals().get("curr_captcha", {})
if setting := globals().get("setting"):
del setting
# noinspection PyRedeclaration
setting = Setting("pmcaptcha")
if logging := gbl.get("logging"):
_cancel_task(logging.task)
del logging
# noinspection PyRedeclaration
logging = Log()
if the_world_eye := gbl.get("the_world_eye"):
_cancel_task(the_world_eye.watcher)
del the_world_eye
# noinspection PyRedeclaration
the_world_eye = TheWorldEye()
if the_order := gbl.get("the_order"):
_cancel_task(the_order.task)
del the_order
# noinspection PyRedeclaration
the_order = TheOrder()
if captcha_task := gbl.get("captcha_task"):
_cancel_task(captcha_task.task)
del captcha_task
# noinspection PyRedeclaration
captcha_task = CaptchaTask()
if resume_task := gbl.get("resume_task"):
_cancel_task(resume_task)
resume_task = asyncio.create_task(resume_states())
gc.collect()