diff --git a/pmcaptcha/main.py b/pmcaptcha/main.py
index 193903f..cac38a8 100644
--- a/pmcaptcha/main.py
+++ b/pmcaptcha/main.py
@@ -1,403 +1,1444 @@
-# pmcaptcha - a pagermaid-pyro plugin by cloudreflection
-# https://t.me/cloudreflection_channel/268
-# ver 2022/06/27
+"""
+PMCaptcha - A PagerMaid-Pyro plugin by cloudreflection
+v2 rewritten by Sam
+https://t.me/cloudreflection_channel/268
+ver 2022/07/01
+"""
-import contextlib
-from pyrogram import Client
+import re
+import time
+import html
+import asyncio
+import inspect
+import traceback
+from dataclasses import dataclass, field
+from io import BytesIO
+from typing import Optional, Callable, Union, Dict, List
+
+from pyrogram.errors import FloodWait
from pyrogram.enums.chat_type import ChatType
-from pyrogram.raw.functions.account import UpdateNotifySettings
+from pyrogram.enums.parse_mode import ParseMode
+from pyrogram.raw.functions.account import UpdateNotifySettings, ReportPeer
from pyrogram.raw.functions.messages import DeleteHistory
-from pyrogram.raw.types import InputNotifyPeer, InputPeerNotifySettings
+from pyrogram.raw.types import InputNotifyPeer, InputPeerNotifySettings, InputReportReasonSpam
+from pyrogram.types import User
-from pagermaid import log,bot
+from pagermaid import bot
+from pagermaid.config import Config
+from pagermaid.sub_utils import Sub
from pagermaid.utils import Message
from pagermaid.listener import listener
from pagermaid.single_utils import sqlite
-from pagermaid.sub_utils import Sub
-import asyncio
-import random
+cmd_name = "pmcaptcha"
+version = "2.01"
-captcha_success = Sub("pmcaptcha.success")
-pm_captcha_help_msg = '''```,pmcaptcha```
-查询当前私聊用户验证状态
-
-```,pmcaptcha check id```
-查询指定id用户验证状态
-
-```,pmcaptcha add [id]```
-将id加入已验证,如未指定为当前私聊用户id
-
-```,pmcaptcha del [id]```
-移除id验证记录,如未指定为当前私聊用户id
-
-```,pmcaptcha wel [message]```
-查看或设置验证通过时发送的消息
-使用 ```,pmcaptcha wel -clear``` 可恢复默认规则
-
-```,pmcaptcha bl [list]```
-查看或设置关键词黑名单列表(英文逗号分隔)
-使用 ```,pmcaptcha bl -clear``` 可清空列表
-
-```,pmcaptcha wait [int]```
-查看或设置超时时间,默认为 30 秒
-使用```,pmcaptcha wait off```可关闭验证时间限制
-
-```,pmcaptcha disablepm [true/false]```
-启用/禁止陌生人私聊
-此功能会放行联系人和白名单(已通过验证)用户
-您可以使用 ,pmcaptcha add 将用户加入白名单
-
-```,pmcaptcha stats```
-查看验证计数器
-使用 ```,pmcaptcha stats -clear``` 可重置
-
-```,pmcaptcha action [ban/delete/archive/none]```
-选择验证失败的处理方式,默认为 archive
-
-```,pmcaptcha premium [allow/ban/only/none]```
-选择对premium用户的操作,默认为 none
-
-```,pmcaptcha wl [list]```
-查看或设置关键词白名单列表(英文逗号分隔)
-使用 ```,pmcaptcha wl -clear``` 可清空列表
-
-```,pmcaptcha collect [y/n]```
-查看或设置是否允许 Pmcaptcha 收集验证错误相关信息以帮助改进
-默认为 N ,收集的信息包括验证的首条消息,被验证者的id和用户名
-
-设置优先级: disablepm>premium>wl>bl
-遇到任何问题请先 ```,apt update``` 更新后复现再反馈
-捐赠: cloudreflection.eu.org/donate'''
+# Log Collect
+log_collect_bot = "CloudreflectionPmcaptchabot"
+img_captcha_bot = "PagerMaid_Sam_Bot"
-@listener(is_plugin=False, incoming=False, outgoing=True, ignore_edited=True, privates_only=True)
-async def process_pm_captcha_self(_: Client, message: Message):
- if message.chat.is_verified or message.chat.type == ChatType.BOT:
+async def log(message: str, remove_prefix: bool = False):
+ if not Config.LOG:
return
- cid = message.chat.id
- if message.text:
- with contextlib.suppress(UnicodeDecodeError):
- if message.text[0] == ",": # 忽略命令
- return
- if captcha_success.check_id(cid):
+ message = message if remove_prefix else " ".join(("[PMCaptcha]", message))
+ try:
+ await bot.send_message(Config.LOG_ID, message, ParseMode.HTML)
+ except Exception as e: # noqa
+ print(f"Err: {e}\n{traceback.format_exc()}")
+
+
+def lang(lang_id: str, lang_code: str = Config.LANGUAGE) -> str:
+ lang_code = lang_code or "en"
+ return lang_dict.get(lang_id)[1 if lang_code.startswith("zh") else 0]
+
+
+def code(text: str) -> str:
+ return f"{text}
"
+
+
+def italic(text: str) -> str:
+ return f"{text}"
+
+
+def bold(text: str) -> str:
+ return f"{text}"
+
+
+def gen_link(text: str, url: str) -> str:
+ return f"{text}"
+
+
+async def punishment_worker(q: asyncio.Queue):
+ data = None
+ flood_text = "Flood Triggered: %is, command: %s, target: %s"
+ while True:
+ data = data or sqlite.get("pmcaptcha", {})
+ target = None
+ try:
+ (target,) = await q.get()
+ action = data.get("action", "archive")
+ if action in ("ban", "delete", "archive"):
+ for _ in range(3):
+ try:
+ await bot.block_user(user_id=target)
+ break
+ except FloodWait as e:
+ await log(flood_text % (e.value, "Block", target))
+ await asyncio.sleep(e.value)
+ if action == "delete":
+ for _ in range(3):
+ try:
+ await bot.invoke(DeleteHistory(peer=await bot.resolve_peer(target), max_id=0))
+ break
+ except FloodWait as e:
+ await log(flood_text % (e.value, "Delete Message", target))
+ await asyncio.sleep(e.value)
+ elif action == "archive":
+ for _ in range(3):
+ try:
+ await bot.archive_chats(chat_ids=target)
+ break
+ except FloodWait as e:
+ await log(flood_text % (e.value, "Archive", target))
+ await asyncio.sleep(e.value)
+ data['banned'] = data.get('banned', 0) + 1
+ sqlite['pmcaptcha'] = data
+ chat_link = gen_link(str(target), f"tg://openmessage?user_id={target}")
+ await log(("[PMCaptcha - The Order] "
+ f"{lang('verify_log_punished') % (chat_link, lang(f'action_{action}'))} (Punishment)"), True)
+ except asyncio.CancelledError:
+ break
+ except Exception as e: # noqa
+ await log(f"Error occurred when punishing user: {e}\n{traceback.format_exc()}")
+ finally:
+ target and q.task_done()
+
+
+whitelist = Sub("pmcaptcha.success")
+
+punishment_queue = asyncio.Queue()
+punishment_task: Optional[asyncio.Task] = None
+
+challenge_task: Dict[int, asyncio.Task] = {}
+curr_captcha: Dict[int, Union["MathChallenge", "ImageChallenge"]] = {}
+
+lang_dict = {
+ # region General
+ "no_cmd_given": [
+ "Please use this command in private chat, or add parameters to execute.",
+ "请在私聊时使用此命令,或添加参数执行。"
+ ],
+ "invalid_user_id": [
+ "Invalid User ID",
+ "未知用户或无效的用户 ID"
+ ],
+ "invalid_param": [
+ "Invalid Parameter",
+ "无效的参数"
+ ],
+ "enabled": [
+ "Enabled",
+ "开启"
+ ],
+ "disabled": [
+ "Disabled",
+ "关闭"
+ ],
+ "none": [
+ "None",
+ "无"
+ ],
+ "tip_edit": [
+ f"You can edit this by using {code('%s')}",
+ f"如需编辑,请使用 {code('%s')}"
+ ],
+ "tip_run_in_pm": [
+ "You can only run this command in private chat, or by adding parameters.",
+ "请在私聊使用此命令,或添加参数执行。"
+ ],
+ # endregion
+
+ # region Plugin
+ "plugin_desc": [
+ "Captcha for PM\nPlease use %s to see available commands.",
+ f"私聊人机验证插件\n请使用 %s 查看可用命令"
+ ],
+ # endregion
+
+ # region Vocabs
+ "vocab_msg": [
+ "Message",
+ "消息"
+ ],
+ "vocab_array": [
+ "List",
+ "列表"
+ ],
+ "vocab_bool": [
+ "Boolean",
+ "y / n"
+ ],
+ "vocab_int": [
+ "Integer",
+ "整数"
+ ],
+ "vocab_cmd": [
+ "Command",
+ "指令"
+ ],
+ # endregion
+
+ # region Help
+ "cmd_param": [
+ "Parameter",
+ "参数"
+ ],
+ "cmd_param_optional": [
+ "Optional",
+ "可选"
+ ],
+ "cmd_alias": [
+ "Alias",
+ "别名/快捷命令"
+ ],
+ "cmd_detail": [
+ f"Do {code(f',{cmd_name} h [command]')} for details",
+ f"详细指令请输入 {code(f',{cmd_name} h [指令名称]')}",
+ ],
+ "cmd_not_found": [
+ "Command Not Found",
+ "指令不存在"
+ ],
+ "cmd_list": [
+ "Command List",
+ "指令列表"
+ ],
+ "priority": [
+ "Priority",
+ "优先级"
+ ],
+ # endregion
+
+ # region Check
+ "user_verified": [
+ f"User {code('%i')} {italic('verified')}",
+ f"用户 {code('%i')} {italic('已验证')}"
+ ],
+ "user_unverified": [
+ f"User {code('%i')} {bold('unverified')}",
+ f"用户 {code('%i')} {bold('未验证')}"
+ ],
+ # endregion
+
+ # region Add
+ "add_whitelist_success": [
+ f"User {code('%i')} added to whitelist",
+ f"用户 {code('%i')} 已添加到白名单"
+ ],
+ "remove_verify_log_success": [
+ f"Removed User {code('%i')}'s verify record",
+ f"已删除用户 {code('%i')} 的验证记录"
+ ],
+ "verify_log_not_found": [
+ f"Verify record not found for User {code('%i')}",
+ f"未找到用户 {code('%i')} 的验证记录"
+ ],
+ # endregion
+
+ # region Unstuck
+ "unstuck_success": [
+ f"User {code('%i')} has removed from challenge mode",
+ f"用户 {code('%i')} 已解除验证状态"
+ ],
+ "not_stuck": [
+ f"User {code('%i')} is not stuck",
+ f"用户 {code('%i')} 未在验证状态"
+ ],
+ # endregion
+
+ # region Welcome
+ "welcome_curr_rule": [
+ "Current welcome rule",
+ "当前验证通过时消息规则"
+ ],
+ "welcome_set": [
+ "Welcome message set.",
+ "已设置验证通过消息"
+ ],
+ "welcome_reset": [
+ "Welcome message reset.",
+ "已重置验证通过消息"
+ ],
+ # endregion
+
+ # region Whitelist
+ "whitelist_curr_rule": [
+ "Current whitelist rule",
+ "当前白名单规则"
+ ],
+ "whitelist_set": [
+ "Keywords whitelist set.",
+ "已设置关键词白名单"
+ ],
+ "whitelist_reset": [
+ "Keywords whitelist reset.",
+ "已重置关键词白名单"
+ ],
+ # endregion
+
+ # region Blacklist
+ "blacklist_curr_rule": [
+ "Current blacklist rule",
+ "当前黑名单规则"
+ ],
+ "blacklist_set": [
+ "Keywords blacklist set.",
+ "已设置关键词黑名单"
+ ],
+ "blacklist_reset": [
+ "Keywords blacklist reset.",
+ "已重置关键词黑名单"
+ ],
+ "blacklist_triggered": [
+ "Blacklist rule triggered",
+ "您触发了黑名单规则"
+ ],
+ # endregion
+
+ # region Timeout
+ "timeout_curr_rule": [
+ "Current timeout: %i second(s)",
+ "当前超时时间: %i 秒"
+ ],
+ "timeout_set": [
+ "Verification timeout has been set to %i seconds.",
+ "已设置验证超时时间为 %i 秒"
+ ],
+ "timeout_off": [
+ "Verification timeout disabled.",
+ "已关闭验证超时时间"
+ ],
+ "timeout_exceeded": [
+ "Verification timeout.",
+ "验证超时"
+ ],
+ # endregion
+
+ # region Disable PM
+ "disable_pm_curr_rule": [
+ "Current disable PM status: %s",
+ "当前禁止私聊状态: 已%s"
+ ],
+ "disable_pm_tip_exception": [
+ "This feature will automatically allow contents and whitelist users.",
+ "此功能会自动放行联系人与白名单用户"
+ ],
+ "disable_pm_set": [
+ f"Disable private chat has been set to {bold('%s')}.",
+ f"已设置禁止私聊为{bold('%s')}"
+ ],
+ "disable_pm_enabled": [
+ "Owner has private chat disabled.",
+ "对方已禁止私聊。"
+ ],
+ # endregion
+
+ # region Silent
+ "silent_curr_rule": [
+ "Current silent status: %s",
+ "当前静音状态: 已%s"
+ ],
+ "silent_set": [
+ f"Silent has been set to {bold('%s')}.",
+ f"已设置静音模式为{bold('%s')}"
+ ],
+ # endregion
+
+ # region Stats
+ "stats_display": [
+ "PMCaptcha has verified %i users in total.\n%i users has passed, %i users has been blocked.",
+ "自上次重置起,已进行验证 %i 次\n其中验证通过 %i 次,拦截 %i 次"
+ ],
+ "stats_reset": [
+ "Statistics has been reset.",
+ "已重置统计"
+ ],
+ # endregion
+
+ # region Action
+ "action_param_name": [
+ "Action",
+ "操作"
+ ],
+ "action_curr_rule": [
+ "Current action rule",
+ "当前验证失败规则"
+ ],
+ "action_set": [
+ f"Action has been set to {bold('%s')}.",
+ f"验证失败后将执行{bold('%s')}操作"
+ ],
+ "action_set_none": [
+ "Action has been set to none.",
+ "验证失败后将不执行任何操作"
+ ],
+ "action_ban": [
+ "Ban",
+ "封禁"
+ ],
+ "action_delete": [
+ "Ban and delete",
+ "封禁并删除对话"
+ ],
+ "action_archive": [
+ "Ban and archive",
+ "封禁并归档"
+ ],
+ # endregion
+
+ # region Report
+ "report_curr_rule": [
+ "Current report state: %s",
+ "当前举报状态为: %s"
+ ],
+ "report_set": [
+ f"Report has been set to {bold('%s')}.",
+ f"已设置举报状态为{bold('%s')}"
+ ],
+ # endregion
+
+ # region Premium Set
+ "premium_curr_rule": [
+ "Current premium user rule",
+ "当前 Premium 用户规则"
+ ],
+ "premium_set_allow": [
+ f"Telegram Premium users will be allowed to {bold('bypass')} the captcha.",
+ f"将{bold('不对')} Telegram Premium 用户{bold('发起验证')}"
+ ],
+ "premium_set_ban": [
+ f"Telegram Premium users will be {bold('banned')} from private chat.",
+ f"将{bold('禁止')} Telegram Premium 用户私聊"
+ ],
+ "premium_set_only": [
+ f"{bold('Only allowed')} Telegram Premium users to private chat.",
+ f"将{bold('仅允许')} Telegram Premium 用户私聊"
+ ],
+ "premium_set_none": [
+ "Nothing will do to Telegram Premium",
+ "将不对 Telegram Premium 用户执行额外操作"
+ ],
+ "premium_only": [
+ "Owner only allows Telegram Premium users to private chat.",
+ "对方只允许 Telegram Premium 用户私聊"
+ ],
+ "premium_ban": [
+ "Owner bans Telegram Premium users from private chat.",
+ "对方禁止 Telegram Premium 用户私聊"
+ ],
+ # endregion
+
+ # region Collect Logs
+ "collect_logs_curr_rule": [
+ "Current collect logs status: %s",
+ "当前收集日志状态: 已%s"
+ ],
+ "collect_logs_note": [
+ ("This feature will only collect user information and chat logs of non-verifiers "
+ f"via @{log_collect_bot} , and is not provided to third parties (except @LivegramBot ).\n"
+ "Information collected will be used for PMCaptcha improvements, "
+ "toggling this feature does not affect the use of PMCaptcha."),
+ (f"此功能仅会通过 @{log_collect_bot} 收集未通过验证者的用户信息以及验证未通过的聊天记录;"
+ "且不会提供给第三方(@LivegramBot 除外)。\n收集的信息将用于 PMCaptcha 改进,开启或关闭此功能不影响 PMCaptcha 的使用。")
+ ],
+ "collect_logs_set": [
+ "Collect logs has been set to %s.",
+ "已设置收集日志为 %s"
+ ],
+ # endregion
+
+ # region Captcha Type
+ "type_curr_rule": [
+ "Current captcha type: %s",
+ "当前验证码类型: %s"
+ ],
+ "type_set": [
+ f"Captcha type has been set to {bold('%s')}.",
+ f"已设置验证码类型为 {bold('%s')}"
+ ],
+ "type_param_name": [
+ "Type",
+ "类型"
+ ],
+ "type_captcha_img": [
+ "Image",
+ "图像辨识"
+ ],
+ "type_captcha_math": [
+ "Math",
+ "计算"
+ ],
+ # endregion
+
+ # region Image Captcha Type
+ "img_captcha_type_func": [
+ "funCaptcha",
+ "funCaptcha",
+ ],
+ "img_captcha_type_github": [
+ "GitHub",
+ "GitHub",
+ ],
+ "img_captcha_type_rec": [
+ "reCaptcha",
+ "reCaptcha"
+ ],
+ "img_captcha_retry_curr_rule": [
+ "Current max retry for image captcha: %s",
+ "当前图像验证码最大重试次数: %s"
+ ],
+ "img_captcha_retry_set": [
+ "Max retry for image captcha has been set to %s.",
+ "已设置图像验证码最大重试次数为 %s"
+ ],
+ # endregion
+
+ # Functional
+
+ # region Verify
+ "verify_verified": [
+ "Verified user",
+ "已验证用户"
+ ],
+ "verify_unverified": [
+ "Unverified user",
+ "未验证用户"
+ ],
+ "verify_blocked": [
+ "You were blocked.",
+ "您已被封禁"
+ ],
+ "verify_log_punished": [
+ "User %s has been %s.",
+ "已对用户 %s 执行`%s`操作"
+ ],
+ "verify_challenge": [
+ "Please answer this question to prove you are human (1 chance)",
+ "请回答这个问题证明您不是机器人 (一次机会)"
+ ],
+ "verify_challenge_timed": [
+ "You have %i seconds.",
+ "您有 %i 秒来回答这个问题"
+ ],
+ "verify_passed": [
+ "Verification passed.",
+ "验证通过"
+ ],
+ "verify_failed": [
+ "Verification failed.",
+ "验证失败"
+ ]
+ # endregion
+}
+
+
+# noinspection DuplicatedCode
+@dataclass
+class SubCommand:
+ msg: Message
+
+ # Regex
+ alias_rgx = r":alias: (.+)"
+ param_rgx = r":param (opt)?\s?(\w+):\s?(.+)"
+
+ def _extract_docs(self, subcmd_name: str, text: str) -> str:
+ extras = []
+ if result := re.search(self.param_rgx, text):
+ is_optional = f"({italic(lang('cmd_param_optional'))} ) " if result[1] else ""
+ extras.extend(
+ (
+ f"{lang('cmd_param')}:",
+ f"{is_optional}{code(result[2].lstrip('_'))} - {result[3]}",
+ )
+ )
+
+ text = re.sub(self.param_rgx, "", text)
+ if result := re.search(self.alias_rgx, text):
+ alias = result[1].replace(" ", "").split(",")
+ alia_text = ", ".join(code(a) for a in alias)
+ extras.append(f"{lang('cmd_alias')}: {alia_text}")
+ text = re.sub(self.alias_rgx, "", text)
+ len(extras) and extras.insert(0, "")
+ subcmd_name = "" if cmd_name == subcmd_name else self._get_cmd_with_param(subcmd_name)
+ return "\n".join([
+ code(f",{cmd_name} {subcmd_name}".strip()),
+ re.sub(r" {4,}", "", text).replace("{cmd_name}", cmd_name).strip()
+ ] + extras)
+
+ def _get_cmd_with_param(self, subcmd_name: str) -> str:
+ msg = subcmd_name
+ if result := re.search(self.param_rgx, getattr(self, msg).__doc__):
+ param = result[2].lstrip("_")
+ msg += f" [{param}]" if result[1] else html.escape(f" <{param}>")
+ return msg
+
+ def _get_mapped_alias(self, alias_name: str, ret_type: str):
+ # Get alias function
+ for name, func in inspect.getmembers(self, inspect.iscoroutinefunction):
+ if name.startswith("_"):
+ continue
+ if result := re.search(self.alias_rgx, func.__doc__):
+ if alias_name in result[1].replace(" ", "").split(","):
+ return func if ret_type == "func" else name
+
+ def __getitem__(self, cmd: str) -> Optional[Callable]:
+ # Get subcommand function
+ if func := getattr(self, cmd, None):
+ return func
+ # Check for alias
+ if func := self._get_mapped_alias(cmd, "func"):
+ return func
+ return # Not found
+
+ async def pmcaptcha(self):
+ """查询当前私聊用户验证状态"""
+ if self.msg.chat.type != ChatType.PRIVATE:
+ await self.msg.edit(lang('tip_run_in_pm'), parse_mode=ParseMode.HTML)
+ else:
+ await self.msg.edit(lang(f'verify_{"" if whitelist.check_id(self.msg.chat.id) else "un"}verified'))
+ await asyncio.sleep(5)
+ await self.msg.safe_delete()
+
+ async def help(self, command: Optional[str] = None):
+ """显示指令帮助信息
+
+ :param opt command: 命令名称
+ :alias: h
+ """
+ help_msg = [f"{code('PMCaptcha')} {lang('cmd_list')} (v{version}):", ""]
+ footer = [
+ italic(lang('cmd_detail')),
+ "",
+ f"{lang('priority')}: disable_pm > premium > whitelist > blacklist",
+ f"遇到任何问题请先 {code(',apt update')} 更新后复现再反馈",
+ "捐赠: cloudreflection.eu.org/donate"
+ ]
+ if command: # Single command help
+ func = getattr(self, command, self._get_mapped_alias(command, "func"))
+ return await (
+ self.msg.edit_text(self._extract_docs(func.__name__, func.__doc__), parse_mode=ParseMode.HTML)
+ if func else self.msg.edit_text(f"{lang('cmd_not_found')}: {code(command)}", parse_mode=ParseMode.HTML))
+ for name, func in inspect.getmembers(self, inspect.iscoroutinefunction):
+ if name.startswith("_"):
+ continue
+ help_msg.append(
+ (
+ code(f",{cmd_name} {self._get_cmd_with_param(name)}")
+ + f"\n> {re.search(r'(.+)', func.__doc__)[1].strip()}\n"
+ )
+ )
+
+ if self.msg.chat.type != ChatType.PRIVATE:
+ await self.msg.edit_text(lang('tip_run_in_pm'), parse_mode=ParseMode.HTML)
+ await asyncio.sleep(5)
+ return await self.msg.safe_delete()
+ await self.msg.edit_text("\n".join(help_msg + footer), parse_mode=ParseMode.HTML)
+
+ async def check(self, _id: int):
+ """查询指定用户验证状态
+
+ :param _id: 用户 ID
+ """
+ try:
+ _id = _id or self.msg.chat.id
+ verified = whitelist.check_id(int(_id))
+ await self.msg.edit(lang(f"user_{'' if verified else 'un'}verified") % _id, parse_mode=ParseMode.HTML)
+ except ValueError:
+ await self.msg.edit(lang('invalid_user_id'), parse_mode=ParseMode.HTML)
+
+ async def add(self, _id: Optional[int] = None):
+ """将 ID 加入已验证,如未指定为当前私聊用户 ID
+
+ :param opt _id: 用户 ID
+ """
+ try:
+ if not _id and self.msg.chat.type != ChatType.PRIVATE:
+ return await self.msg.edit(lang('tip_run_in_pm'), parse_mode=ParseMode.HTML)
+ _id = _id or self.msg.chat.id
+ whitelist.add_id(int(_id))
+ await bot.unarchive_chats(chat_ids=int(_id))
+ await self.msg.edit(lang('add_whitelist_success') % _id, parse_mode=ParseMode.HTML)
+ except ValueError:
+ await self.msg.edit(lang('invalid_user_id'), parse_mode=ParseMode.HTML)
+
+ async def delete(self, _id: Optional[int] = None):
+ """移除 ID 验证记录,如未指定为当前私聊用户 ID
+
+ :param opt _id: 用户 ID
+ :alias: del
+ """
+ try:
+ if not _id and self.msg.chat.type != ChatType.PRIVATE:
+ return await self.msg.edit(lang('tip_run_in_pm'), parse_mode=ParseMode.HTML)
+ _id = _id or self.msg.chat.id
+ text = lang('remove_verify_log_success' if whitelist.del_id(int(_id)) else 'verify_log_not_found')
+ await self.msg.edit(text % _id, parse_mode=ParseMode.HTML)
+ except ValueError:
+ await self.msg.edit(lang('invalid_user_id'), parse_mode=ParseMode.HTML)
+
+ async def unstuck(self, _id: Optional[int] = None):
+ """解除一个用户的验证状态,通常用于解除卡死的验证状态
+
+ :param _id: 用户 ID
+ """
+ try:
+ if not _id and self.msg.chat.type != ChatType.PRIVATE:
+ return await self.msg.edit(lang('tip_run_in_pm'), parse_mode=ParseMode.HTML)
+ _id = _id or self.msg.chat.id
+ if sqlite.get(f"pmcaptcha.challenge.{_id}"):
+ del sqlite[f"pmcaptcha.challenge.{_id}"]
+ return await self.msg.edit(lang('unstuck_success') % _id, parse_mode=ParseMode.HTML)
+ await self.msg.edit(lang('not_stuck') % _id, parse_mode=ParseMode.HTML)
+ except ValueError:
+ await self.msg.edit(lang('invalid_user_id'), parse_mode=ParseMode.HTML)
+
+ async def welcome(self, *message: str):
+ """查看或设置验证通过时发送的消息
+ 使用 ,{cmd_name} welcome -clear
可恢复默认规则
+
+ :param message: 消息内容
+ :alias: wel
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not message:
+ return await self.msg.edit_text("\n".join((
+ lang('welcome_curr_rule') + ":",
+ code(data.get('welcome', lang('none'))),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} wel <{lang('vocab_msg')}>")
+ )), parse_mode=ParseMode.HTML)
+ message = " ".join(message)
+ if message == "-clear":
+ if data.get("welcome"):
+ del data["welcome"]
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('welcome_reset'), parse_mode=ParseMode.HTML)
+ return
+ data["welcome"] = message
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('welcome_set'), parse_mode=ParseMode.HTML)
+
+ async def whitelist(self, array: str):
+ """查看或设置关键词白名单列表(英文逗号分隔)
+ 使用 ,{cmd_name} whitelist -clear
可清空列表
+
+ :param array: 白名单列表 (英文逗号分隔)
+ :alias: wl, whl
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not array:
+ return await self.msg.edit_text("\n".join((
+ lang('whitelist_curr_rule') + ":",
+ code(data.get('whitelist', lang('none'))),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} wl <{lang('vocab_array')}>")
+ )), parse_mode=ParseMode.HTML)
+ if array == "-clear":
+ if data.get("whitelist"):
+ del data["whitelist"]
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('whitelist_reset'), parse_mode=ParseMode.HTML)
+ return
+ data["whitelist"] = array.replace(" ", "").split(",")
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('whitelist_set'), parse_mode=ParseMode.HTML)
+
+ async def blacklist(self, array: str):
+ """查看或设置关键词黑名单列表 (英文逗号分隔)
+ 使用 ,{cmd_name} blacklist -clear
可清空列表
+
+ :param array: 黑名单列表 (英文逗号分隔)
+ :alias: bl
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not array:
+ return await self.msg.edit_text("\n".join((
+ lang('blacklist_curr_rule') + ":",
+ code(data.get('blacklist', lang('none'))),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} bl <{lang('vocab_array')}>")
+ )), parse_mode=ParseMode.HTML)
+ if array == "-clear":
+ if data.get("blacklist"):
+ del data["blacklist"]
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('blacklist_reset'), parse_mode=ParseMode.HTML)
+ return
+ data["blacklist"] = array.replace(" ", "").split(",")
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('blacklist_set'), parse_mode=ParseMode.HTML)
+
+ async def timeout(self, seconds: Union[str, int]):
+ """查看或设置超时时间,默认为 30 秒 (不适用于图像模式)
+ 使用 ,{cmd_name} wait off
可关闭验证时间限制
+
+ :param seconds: 超时时间,单位秒
+ :alias: wait
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not seconds:
+ return await self.msg.edit_text("\n".join((
+ lang('timeout_curr_rule') % int(data.get('timeout', 30)),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} wait <{lang('vocab_int')}>")
+ )), parse_mode=ParseMode.HTML)
+ if seconds == "off":
+ if data.get("timeout"):
+ data["timeout"] = 0
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('timeout_off'), parse_mode=ParseMode.HTML)
+ return
+ try:
+ data["timeout"] = int(seconds)
+ except ValueError:
+ return await self.msg.edit(lang('invalid_param'), parse_mode=ParseMode.HTML)
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('timeout_set') % seconds, parse_mode=ParseMode.HTML)
+
+ async def disable_pm(self, toggle: str):
+ """启用 / 禁止陌生人私聊
+ 此功能会放行联系人和白名单(已通过验证)用户
+ 您可以使用 ,{cmd_name} add
将用户加入白名单
+
+ :param toggle: 开关 (y / n)
+ :alias: disablepm
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not toggle:
+ return await self.msg.edit_text("\n".join((
+ lang('disable_pm_curr_rule') % lang('enabled' if data.get('disable') else 'disabled'),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} disablepm <{lang('vocab_bool')}>")
+ )), parse_mode=ParseMode.HTML)
+ toggle = toggle.lower()[0]
+ if toggle not in ("y", "n", "t", "f", "1", "0"):
+ return await self.msg.edit(lang('invalid_param'), parse_mode=ParseMode.HTML)
+ data["disable"] = toggle in ("y", "t", "1")
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('disable_pm_set') % lang("enabled" if data["disable"] else "disabled"),
+ parse_mode=ParseMode.HTML)
+
+ async def stats(self, arg: str):
+ """查看验证统计
+ 可以使用 ,{cmd_name} stats -clear
重置
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not arg:
+ data = (data.get('pass', 0) + data.get('banned', 0), data.get('pass', 0), data.get('banned', 0))
+ return await self.msg.edit_text(code("PMCaptcha ") + lang('stats_display') % data,
+ parse_mode=ParseMode.HTML)
+ if arg == "-clear":
+ data["pass"] = 0
+ data["banned"] = 0
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('stats_reset'), parse_mode=ParseMode.HTML)
+ return
+
+ async def action(self, action: str):
+ """选择验证失败的处理方式,默认为 archive
+
+ :param action: 处理方式 (ban
/ delete
/ archive
/ none
)
+ :alias: act
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not action:
+ action = data.get("action", "archive")
+ return await self.msg.edit_text("\n".join((
+ lang('action_curr_rule') + ":",
+ lang('action_set_none') if action == "none" else lang('action_set') % lang(f'action_{action}'),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} act <{lang('action_param_name')}>")
+ )), parse_mode=ParseMode.HTML)
+ if action not in ("ban", "delete", "archive", "none"):
+ return await self.msg.edit(lang('invalid_param'), parse_mode=ParseMode.HTML)
+ if action in ("ban", "delete", "archive"):
+ await self.msg.edit(lang('action_set') % lang(f'action_{action}'), parse_mode=ParseMode.HTML)
+ elif action == "none":
+ await self.msg.edit(lang('action_set_none'), parse_mode=ParseMode.HTML)
+ data["action"] = action
+ sqlite["pmcaptcha"] = data
+
+ async def report(self, toggle: str):
+ """选择验证失败后是否举报该用户,默认为 N
+
+ :param toggle: 开关 (y / n)
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not toggle:
+ return await self.msg.edit_text("\n".join((
+ lang('report_curr_rule') % lang('enabled' if data.get('report') else 'disabled'),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} report <{lang('vocab_bool')}>")
+ )), parse_mode=ParseMode.HTML)
+ toggle = toggle.lower()[0]
+ if toggle not in ("y", "n", "t", "f", "1", "0"):
+ return await self.msg.edit(lang('invalid_param'), parse_mode=ParseMode.HTML)
+ data["report"] = toggle in ("y", "t", "1")
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('report_set') % lang("enabled" if data["report"] else "disabled"),
+ parse_mode=ParseMode.HTML)
+
+ async def premium(self, action: str):
+ """选择对 Premium 用户的操作,默认为 archive
+
+ :param action: 操作方式 (allow
/ ban
/ only
/ none
)
+ :alias: vip, prem
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not action:
+ return await self.msg.edit_text("\n".join((
+ lang('premium_curr_rule') + ":",
+ lang(f'premium_set_{data.get("premium", "none")}'),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} prem <{lang('action_param_name')}>")
+ )), parse_mode=ParseMode.HTML)
+ if action not in ("allow", "ban", "only", "none"):
+ return await self.msg.edit(lang('invalid_param'), parse_mode=ParseMode.HTML)
+ if action == "none":
+ del data["premium"]
+ else:
+ data["premium"] = action
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang(f'premium_set_{action}'), parse_mode=ParseMode.HTML)
+
+ async def silent(self, toggle: Optional[str] = None):
+ """减少信息发送,默认为 no
+ 开启后,将不会发送封禁提示 (不影响 log 发送)
+
+ :param toggle: 开关 (yes / no)
+ :alias: quiet
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not toggle:
+ return await self.msg.edit_text("\n".join((
+ lang('silent_curr_rule') % lang('enabled' if data.get('silent', False) else 'disabled'),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} silent <{lang('vocab_bool')}>")
+ )), parse_mode=ParseMode.HTML)
+ toggle = toggle.lower()[0]
+ if toggle not in ("y", "n", "t", "f", "1", "0"):
+ return await self.msg.edit(lang('invalid_param'), parse_mode=ParseMode.HTML)
+ data["silent"] = toggle in ("y", "t", "1")
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('silent_set') % lang("enabled" if data["silent"] else "disabled"),
+ parse_mode=ParseMode.HTML)
+
+ async def collect_logs(self, toggle: str):
+ """查看或设置是否允许 PMCaptcha
收集验证错误相关信息以帮助改进
+ 默认为 N,收集的信息包括被验证者的信息以及未通过验证的信息记录
+
+ :param toggle: 开关 (y / n)
+ :alias: collect, log
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not toggle:
+ status = lang('enabled' if data.get('collect', False) else 'disabled')
+ return await self.msg.edit_text("\n".join((
+ lang('collect_logs_curr_rule') % status,
+ lang("collect_logs_note"),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} log <{lang('vocab_bool')}>")
+ )), parse_mode=ParseMode.HTML)
+ toggle = toggle.lower()[0]
+ if toggle not in ("y", "n", "t", "f", "1", "0"):
+ return await self.msg.edit(lang('invalid_param'), parse_mode=ParseMode.HTML)
+ data["collect"] = toggle in ("y", "t", "1")
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('collect_logs_set') % lang("enabled" if data["collect"] else "disabled"))
+
+ # Image Captcha
+
+ async def change_type(self, _type: str):
+ """切换验证码类型,默认为 math
+ 目前只有基础计算和图形辨识
+
+ 注意:如果图像验证不能使用将回退到计算验证
+
+ :param _type: 验证码类型 (img
/ math
)
+ :alias: type, typ
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not _type:
+ return await self.msg.edit_text("\n".join((
+ lang('type_curr_rule') % lang(f'type_captcha_{data.get("type", "math")}'),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} typ <{lang('type_param_name')}>")
+ )), parse_mode=ParseMode.HTML)
+ if _type not in ("img", "math"):
+ return await self.msg.edit(lang('invalid_param'), parse_mode=ParseMode.HTML)
+ data["type"] = _type
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('type_set') % lang(f'type_captcha_{_type}'), parse_mode=ParseMode.HTML)
+
+ async def change_img_type(self, _type: str):
+ """切换图像辨识使用接口,默认为 func
+ 目前可用的接口:
+ - func
(ArkLabs funCaptcha )
+ - github
(GitHub 螺旋星系 )
+ - rec
(Google reCAPTCHA )
+ 请注意, reCAPTCHA
难度相比前两个高出不少,
+ 因此验证码系统会在尝试过多后提供 func
接口让用户选择
+
+ :param _type: 验证码类型 (func
/ github
/ rec
)
+ :alias: img_type, img_typ
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not _type:
+ return await self.msg.edit_text("\n".join((
+ lang('type_curr_rule') % lang(f'img_captcha_type_{data.get("img_type", "func")}'),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} img_typ <{lang('type_param_name')}>")
+ )), parse_mode=ParseMode.HTML)
+ if _type not in ("func", "github", "rec"):
+ return await self.msg.edit(lang('invalid_param'), parse_mode=ParseMode.HTML)
+ data["img_type"] = _type
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(
+ lang('type_set') % lang(f'img_captcha_type_{_type}'),
+ parse_mode=ParseMode.HTML,
+ )
+
+ async def img_retry_chance(self, number: str):
+ """图形验证码最大可重试次数,默认为 3
+
+ :param number: 重试次数
+ :alias: img_re
+ """
+ data = sqlite.get("pmcaptcha", {})
+ if not number:
+ return await self.msg.edit_text("\n".join((
+ lang('img_captcha_retry_curr_rule') % data.get("img_max_retry", 3),
+ "",
+ lang('tip_edit') % html.escape(f",{cmd_name} img_re <{lang('vocab_int')}>")
+ )), parse_mode=ParseMode.HTML)
+ try:
+ data["img_max_retry"] = int(number)
+ sqlite["pmcaptcha"] = data
+ await self.msg.edit(lang('img_captcha_retry_set') % number, parse_mode=ParseMode.HTML)
+ except ValueError:
+ return await self.msg.edit(lang('invalid_param'), parse_mode=ParseMode.HTML)
+
+
+# region Captcha
+async def punish(user_id: int, reason_code: str):
+ try:
+ user = await bot.get_users(user_id)
+ not sqlite.get("pmcaptcha", {}).get("silent", False) and await bot.send_message(user_id, " ".join((
+ lang(reason_code, user.language_code),
+ lang("verify_blocked", user.language_code)
+ )))
+ except FloodWait:
+ pass # Skip waiting
+ global punishment_task
+ if not punishment_task or punishment_task.done():
+ punishment_task = asyncio.create_task(punishment_worker(punishment_queue))
+ return punishment_queue.put_nowait((user_id,))
+
+
+@dataclass
+class CaptchaChallenge:
+ type: str
+ user: User
+ input: bool
+ logs: List[str] = field(default_factory=list)
+ captcha_write_lock: asyncio.Lock = asyncio.Lock()
+
+ # Post Init Value
+ captcha_start: int = 0
+ challenge_msg_id: Optional[int] = None
+
+ # region Logging
+
+ def log_msg(self, msg: str):
+ self.logs.append(msg.strip())
+
+ async def send_log(self, ban_code: Optional[str] = None):
+ if not sqlite.get("pmcaptcha", {}).get("collect", False):
+ return
+ import json
+ user = self.user
+ log_file = BytesIO(json.dumps(self.logs, indent=4).encode())
+ log_file.name = f"{user.id}_{self.captcha_start}.json"
+ caption = [f"UID: {code(str(user.id))}" + (f" @{user.username}" if self.user.username else ""),
+ f"Mention: {gen_link(str(user.id), f'tg://openmessage?user_id={user.id}')}"]
+ if user.first_name or user.last_name:
+ user_full_name = []
+ user.first_name and user_full_name.append(user.first_name)
+ user.last_name and user_full_name.append(user.last_name)
+ caption.append(f"Name: {code(' '.join(user_full_name))}")
+ elif user.is_deleted:
+ caption.append(f"Name: {bold('Deleted Account')}")
+ if user.is_scam or user.is_fake or user.is_premium:
+ tags = []
+ user.is_scam and tags.append(code("Scam"))
+ user.is_fake and tags.append(code("Fake"))
+ user.is_premium and tags.append(code("Premium"))
+ caption.append(f"Tags: {', '.join(tags)}")
+ user.language_code and caption.append(f"Language: {code(user.language_code)}")
+ user.dc_id and caption.append(f"DC: {code(str(user.dc_id))}")
+ user.phone_number and caption.append(f"Phone: {code(user.phone_number)}")
+ self.type and caption.append(f"Captcha Type: {code(self.type)}")
+ ban_code and caption.append(f"Block Reason: {code(ban_code)}")
+ send = False
+ last_exp = None
+ try:
+ await bot.unblock_user(log_collect_bot)
+ except: # noqa
+ pass
+ for _ in range(3):
+ try:
+ await bot.send_document(log_collect_bot, log_file,
+ caption="\n".join(caption), parse_mode=ParseMode.HTML)
+ send = True
+ break
+ except Exception as e: # noqa
+ last_exp = f"{e}\n{traceback.format_exc()}"
+ if not send and last_exp:
+ return await log(f"Error occurred when sending log: {last_exp}")
+ elif not send:
+ return await log("Failed to send log")
+ await log(f"Log collected from user {user.id}")
+
+ # endregion
+
+ # region State
+
+ def save_state(self, extra: Optional[dict] = None):
+ self.captcha_start = self.captcha_start or int(time.time())
+ data = {
+ "type": self.type,
+ "start": self.captcha_start,
+ "logs": self.logs,
+ "msg_id": self.challenge_msg_id,
+ }
+ extra and data.update(extra)
+ sqlite[f"pmcaptcha.challenge.{self.user.id}"] = data
+
+ def update_state(self, changes: Optional[dict] = None):
+ data = sqlite.get(f"pmcaptcha.challenge.{self.user.id}", {})
+ changes and data.update(changes)
+ sqlite[f"pmcaptcha.challenge.{self.user.id}"] = data
+
+ def del_state(self):
+ key = f"pmcaptcha.challenge.{self.user.id}"
+ if sqlite.get(key):
+ del sqlite[key]
+
+ # endregion
+
+ # region Verify Result
+
+ async def _verify_success(self):
+ data = sqlite.get("pmcaptcha", {})
+ whitelist.add_id(self.user.id)
+ data['pass'] = data.get('pass', 0) + 1
+ sqlite['pmcaptcha'] = data
+ success_msg = data.get("welcome") or lang("verify_passed", self.user.language_code)
+ welcome_msg: Optional[Message] = None
+ try:
+ if self.challenge_msg_id:
+ welcome_msg = await bot.edit_message_text(self.user.id, self.challenge_msg_id, success_msg)
+ except: # noqa
+ pass
+ else:
+ try:
+ welcome_msg = await bot.send_message(self.user.id, success_msg)
+ self.challenge_msg_id = welcome_msg.id
+ except: # noqa
+ pass
+ await asyncio.sleep(3)
+ welcome_msg and await welcome_msg.safe_delete()
+ try:
+ peer = await bot.resolve_peer(self.user.id)
+ await bot.unarchive_chats(chat_ids=self.user.id)
+ await bot.invoke(UpdateNotifySettings(
+ peer=InputNotifyPeer(peer=peer),
+ settings=InputPeerNotifySettings(show_previews=True, silent=False)))
+ except: # noqa
+ pass
+
+ async def _verify_failed(self):
+ try:
+ self.challenge_msg_id and await bot.delete_messages(self.user.id, self.challenge_msg_id)
+ sqlite.get("pmcaptcha", {}).get("report", False) and await bot.invoke(ReportPeer(
+ peer=await bot.resolve_peer(self.user.id),
+ reason=InputReportReasonSpam(),
+ message=""
+ ))
+ except: # noqa
+ pass
+ await punish(self.user.id, "verify_failed")
+ await self.send_log()
+
+ async def action(self, success: bool):
+ async with self.captcha_write_lock:
+ self.del_state()
+ if task := challenge_task.get(self.user.id):
+ task.cancel()
+ del challenge_task[self.user.id]
+ await getattr(self, f"_verify_{'success' if success else 'failed'}")()
+
+ # endregion
+
+
+class MathChallenge(CaptchaChallenge):
+ answer: int
+
+ def __init__(self, user: User):
+ super().__init__("math", user, True)
+
+ @classmethod
+ async def resume(cls, msg: Message, state: dict):
+ user = msg.from_user
+ captcha = cls(user)
+ captcha.captcha_start = state['start']
+ captcha.logs = state['logs']
+ captcha.challenge_msg_id = state['msg_id']
+ now = int(time.time())
+ timeout = sqlite.get("pmcaptcha", {}).get("timeout", 30)
+ if timeout > 0 and now - state['start'] > timeout:
+ return await captcha.action(False)
+ captcha.answer = state['answer']
+ await captcha.verify(msg.text)
+
+ async def start(self):
+ if self.captcha_write_lock.locked():
+ return
+ async with self.captcha_write_lock:
+ import random
+ full_lang = self.user.language_code
+ first_value = random.randint(1, 10)
+ second_value = random.randint(1, 10)
+ timeout = sqlite.get("pmcaptcha", {}).get("timeout", 30)
+ operator = random.choice(("+", "-", "*"))
+ expression = f"{first_value} {operator} {second_value}"
+ challenge_msg = None
+ for _ in range(3):
+ try:
+ challenge_msg = await bot.send_message(self.user.id, "\n".join((
+ lang('verify_challenge', full_lang),
+ "",
+ code(f"{expression} = ?"),
+ "\n" + lang('verify_challenge_timed', full_lang) % timeout if timeout > 0 else ""
+ )), parse_mode=ParseMode.HTML)
+ break
+ except FloodWait as e:
+ await asyncio.sleep(e.value)
+ if not challenge_msg:
+ return await log(f"Failed to send math captcha challenge to {self.user.id}")
+ self.challenge_msg_id = challenge_msg.id
+ self.answer = eval(expression)
+ self.save_state({"answer": self.answer})
+ if timeout > 0:
+ challenge_task[self.user.id] = asyncio.create_task(self.challenge_timeout(timeout))
+
+ async def challenge_timeout(self, timeout: int):
+ try:
+ await asyncio.sleep(timeout)
+ except asyncio.CancelledError:
+ return
+ if self.captcha_write_lock.locked():
+ return
+ async with self.captcha_write_lock:
+ await self.action(False)
+ if curr_captcha.get(self.user.id):
+ del curr_captcha[self.user.id]
+
+ async def verify(self, answer: str):
+ if self.captcha_write_lock.locked():
+ return
+ async with self.captcha_write_lock:
+ try:
+ user_answer = int("".join(re.findall(r"\d+", answer)))
+ if "-" in answer:
+ user_answer = -user_answer
+ except ValueError:
+ return await punish(self.user.id, "verify_failed")
+ await self.action(user_answer == self.answer)
+ return user_answer == self.answer
+
+
+class ImageChallenge(CaptchaChallenge):
+ try_count: int
+
+ def __init__(self, user: User):
+ super().__init__("img", user, False)
+ self.try_count = 0
+
+ @classmethod
+ async def resume(cls, msg: Message, state: dict):
+ user = msg.from_user
+ captcha = cls(user)
+ captcha.captcha_start = state['start']
+ captcha.logs = state['logs']
+ captcha.challenge_msg_id = state['msg_id']
+ captcha.try_count = state['try_count']
+ if captcha.try_count >= sqlite.get("pmcaptcha", {}).get("img_max_retry", 3):
+ return await captcha.action(False)
+ curr_captcha[user.id] = captcha
+
+ async def start(self):
+ if self.captcha_write_lock.locked():
+ return
+ async with self.captcha_write_lock:
+ while True:
+ try:
+ if not (result := await bot.get_inline_bot_results(
+ img_captcha_bot, sqlite.get("pmcaptcha", {}).get("img_type", "func"))):
+ break # Fallback
+ # From now on, wait for bot result
+ updates = await bot.send_inline_bot_result(self.user.id, result.query_id, result.results[0].id)
+ self.challenge_msg_id = updates.updates[0].id
+ self.save_state({"try_count": self.try_count})
+ await bot.block_user(self.user.id)
+ return
+ except TimeoutError:
+ break # Fallback
+ except FloodWait as e:
+ await asyncio.sleep(e.value)
+ fallback_captcha = MathChallenge(self.user)
+ await fallback_captcha.start()
+ return fallback_captcha
+
+ async def verify(self, success: bool):
+ if success:
+ await bot.unblock_user(self.user.id)
+ self.challenge_msg_id = 0
+ return await self.action(success)
+ else:
+ self.try_count += 1
+ if self.try_count >= sqlite.get("pmcaptcha", {}).get("img_max_retry", 3):
+ await self.action(False)
+ return True
+ self.update_state({"try_count": self.try_count})
+
+
+# endregion
+
+# Watches every image captcha result
+@listener(is_plugin=False, incoming=True, outgoing=True, privates_only=True)
+async def image_captcha_listener(msg: Message):
+ # Ignores non-private chat, not via bot, username not equal to image bot
+ if msg.chat.type != ChatType.PRIVATE or not msg.via_bot or msg.via_bot.username != img_captcha_bot:
return
- else:
- return captcha_success.add_id(cid)
-
-
-async def do_action_and_read(client, cid, data):
- await client.unarchive_chats(chat_ids=cid)
- await asyncio.sleep(random.randint(0, 100) / 1000)
- await client.read_chat_history(cid)
- await asyncio.sleep(random.randint(0, 100) / 1000)
- action = data.get("action", "archive")
- if action in ["archive", "delete", "ban"]:
- await client.block_user(user_id=cid)
- if action in ["archive"]:
- await client.archive_chats(chat_ids=cid)
- if action in ["delete"]:
- with contextlib.suppress(Exception):
- await client.invoke(DeleteHistory(max_id=0, peer=await client.resolve_peer(cid)))
- # log
- await log(f"(pmcaptcha) 已对 [{cid}](tg://openmessage?user_id={cid}) 执行 {action} 操作")
- data['banned'] = data.get('banned', 0) + 1
- sqlite['pmcaptcha'] = data
-
-
-async def collect_imformation(client, message):
- with contextlib.suppress(Exception):
- await client.unblock_user("CloudreflectionPmcaptchabot")
- with contextlib.suppress(Exception):
- if message.text:
- async with message.bot.conversation("CloudreflectionPmcaptchabot") as conv:
- await conv.send_message(message.text)
- await conv.send_message(f"{message.from_user.id} @{message.from_user.username if message.from_user.username else ''}")
- await conv.mark_as_read()
+ user_id = msg.chat.id
+ if (last_captcha := sqlite.get(f"pmcaptcha.challenge.{user_id}")) and not curr_captcha.get(user_id):
+ # Resume last captcha challenge
+ if last_captcha['type'] != "img":
+ return await log("Failed to resume last captcha challenge: "
+ f"Unknown challenge type {last_captcha['type']}")
+ await ImageChallenge.resume(msg, last_captcha)
+ if not curr_captcha.get(user_id): # User not in verify state
+ return
+ if "CAPTCHA_SOLVED" in msg.caption:
+ await msg.safe_delete()
+ await curr_captcha[user_id].verify(True)
+ del curr_captcha[user_id]
+ elif "CAPTCHA_FAILED" in msg.caption:
+ if "forced" in msg.caption:
+ await curr_captcha[user_id].action(False)
+ del curr_captcha[user_id]
+ return
+ if await curr_captcha[user_id].verify(False):
+ del curr_captcha[user_id]
+ await msg.safe_delete()
+ elif "CAPTCHA_FALLBACK" in msg.caption:
+ await msg.safe_delete()
+ # Fallback to selected captcha type
+ captcha_type = msg.caption.replace("CAPTCHA_FALLBACK", "").strip()
+ if captcha_type == "math":
+ captcha = MathChallenge(msg.from_user)
+ await captcha.start()
+ curr_captcha[user_id] = captcha
+ return
@listener(is_plugin=False, incoming=True, outgoing=False, ignore_edited=True, privates_only=True)
-async def process_pm_captcha(client: Client, message: Message):
- # 忽略联系人、认证消息、机器人消息
- if message.from_user.is_contact or message.from_user.is_verified or message.chat.type == ChatType.BOT:
+async def chat_listener(msg: Message):
+ user_id = msg.chat.id
+ # 忽略联系人、认证消息、机器人消息、已验证用户
+ if (msg.from_user.is_contact or msg.from_user.is_verified or
+ msg.chat.type == ChatType.BOT or whitelist.check_id(user_id)):
return
- cid = message.chat.id
data = sqlite.get("pmcaptcha", {})
- if data.get('disable', False) and not captcha_success.check_id(cid):
- await message.reply('对方已设置禁止私聊,您已被封禁\n\nThe recipient is blocking all private messages. You are now blocked.')
- await do_action_and_read(client, cid, data)
- return
- if premium := data.get("premium", False):
- if premium=="only" and message.from_user.is_premium==False:
- await message.reply('对方已设置仅Telegram Premium用户可私聊,您已被封禁\n\nThe recipient is that only Telegram Premium user can send private messages. You are now blocked.')
- await do_action_and_read(client, cid, data)
+ # Disable PM
+ if data.get('disable', False):
+ return await punish(user_id, "disable_pm_enabled")
+ # Premium
+ if premium := data.get("premium"):
+ if premium == "only" and not msg.from_user.is_premium:
+ return await punish(user_id, "premium_only")
+ elif not msg.from_user.is_premium:
+ pass
+ elif premium == "ban":
+ return await punish(user_id, "premium_ban")
+ elif premium == "allow":
return
- if premium=="ban" and message.from_user.is_premium==True:
- await message.reply('对方已设置禁止Telegram Premium用户私聊,您已被封禁\n\nThe recipient is blocking all private messages from Telegram Premium users. You are now blocked.')
- await do_action_and_read(client, cid, data)
- return
- if premium=="allow" and message.from_user.is_premium==True:
- return
- if (
- not captcha_success.check_id(cid)
- and sqlite.get(f"pmcaptcha.{str(cid)}") is None
- ):
- if data.get("whitelist", False) and message.text is not None: #白名单
- for i in data.get("whitelist", "").split(","):
- if i in message.text:
- return captcha_success.add_id(cid)
- if data.get("blacklist", False) and message.text is not None: #黑名单
- for i in data.get("blacklist", "").split(","):
- if i in message.text:
- await message.reply('您触犯了黑名单规则,已被封禁\n\nYou are blocked because of a blacklist violation')
- await do_action_and_read(client, cid, data)
- if data.get("collect",False):
- await collect_imformation(client,message)
- return
+ # Whitelist / Blacklist
+ if msg.text is not None:
+ if array := data.get("whitelist"):
+ for word in array.split(","):
+ if word in msg.text:
+ return whitelist.add_id(user_id)
+ if array := data.get("blacklist"):
+ for word in array.split(","):
+ if word in msg.text:
+ reason_code = "blacklist_triggered"
+ await punish(user_id, reason_code)
+ # Collect logs
+ return await CaptchaChallenge("", msg.from_user, False, [msg.text]).send_log(reason_code)
+ # Captcha
+ captcha_challenges = {
+ "math": MathChallenge,
+ "img": ImageChallenge
+ }
+ if sqlite.get(f"pmcaptcha.challenge.{user_id}") and not curr_captcha.get(user_id) or not curr_captcha.get(user_id):
+ if (last_captcha := sqlite.get(f"pmcaptcha.challenge.{user_id}")) and not curr_captcha.get(user_id):
+ # Resume last captcha challenge
+ if last_captcha["type"] not in captcha_challenges:
+ return await log("Failed to resume last captcha challenge: "
+ f"Unknown challenge type {last_captcha['type']}")
+ return await captcha_challenges[last_captcha["type"]].resume(msg, last_captcha)
+ # Start a captcha challenge
+ try:
+ await bot.invoke(UpdateNotifySettings(
+ peer=InputNotifyPeer(peer=await bot.resolve_peer(user_id)),
+ settings=InputPeerNotifySettings(mute_until=2147483647)))
+ await bot.archive_chats(user_id)
+ except: # noqa
+ pass
+ # Send captcha
+ captcha_type = data.get("type", "math")
+ captcha = captcha_challenges.get(captcha_type, MathChallenge)(msg.from_user)
+ captcha.log_msg(msg.text)
+ captcha = await captcha.start() or captcha
+ curr_captcha[user_id] = captcha
+ elif (captcha := curr_captcha.get(user_id)) and captcha.input: # Verify answer
+ captcha.log_msg(msg.text)
+ if await captcha.verify(msg.text):
+ await msg.safe_delete()
+ del curr_captcha[user_id]
- with contextlib.suppress(Exception):
- await client.invoke(UpdateNotifySettings(peer=InputNotifyPeer(peer=await client.resolve_peer(cid)),
- settings=InputPeerNotifySettings(silent=True)))
- await asyncio.sleep(random.randint(0, 100) / 1000)
- await client.read_chat_history(cid)
- await asyncio.sleep(random.randint(0, 100) / 1000)
- await client.archive_chats(chat_ids=cid)
- wait = data.get("wait", 30)
- key1 = random.randint(1, 10)
- key2 = random.randint(1, 10)
- sqlite[f'pmcaptcha.{str(cid)}'] = str(key1 + key2)
- if wait!="已关闭":
- msg = await message.reply(
- '已启用私聊验证。请发送 \"' + str(key1) + '+' + str(key2) + '\" 的答案(阿拉伯数字)来与我私聊\n请在' + str(wait) +
- '秒内完成验证。您只有一次验证机会\n\nPlease answer the following question to prove you are human: \"' +
- str(key1) + '+' + str(key2) + '\"\nYou have ' + str(wait) +
- ' seconds and only one chance to answer.')
- await asyncio.sleep(wait)
- await msg.safe_delete() # noqa
- if sqlite.get(f'pmcaptcha.{str(cid)}') is not None:
- del sqlite[f'pmcaptcha.{str(cid)}']
- await message.reply('验证超时,您已被封禁\n\nYou failed provide an answer in time. You are now blocked.')
- await do_action_and_read(client, cid, data)
- if data.get("collect",False):
- await collect_imformation(client,message)
- else:
- await message.reply(
- '已启用私聊验证。请发送 \"' + str(key1) + '+' + str(key2) + '\" 的答案(阿拉伯数字)来与我私聊。\
- 您只有一次验证机会\n\nPlease answer the following question to prove you are human: \"' +
- str(key1) + '+' + str(key2) + '\"\nYou have only one chance to answer.')
- elif sqlite.get(f"pmcaptcha.{str(cid)}"):
- if message.text == sqlite.get(f"pmcaptcha.{str(cid)}"):
- await message.safe_delete()
- del sqlite[f'pmcaptcha.{str(cid)}']
- captcha_success.add_id(cid)
- with contextlib.suppress(Exception):
- await client.invoke(UpdateNotifySettings(peer=InputNotifyPeer(peer=await client.resolve_peer(cid)),
- settings=InputPeerNotifySettings(silent=False)))
- await asyncio.sleep(random.randint(0, 100) / 1000)
- await message.reply(data.get("Welcome", "验证通过\n\nYou have passed the captcha."))
- await asyncio.sleep(random.randint(0, 100) / 1000)
- await client.unarchive_chats(chat_ids=cid)
- data['pass'] = data.get('pass', 0) + 1
- sqlite['pmcaptcha'] = data
- else:
- del sqlite[f'pmcaptcha.{str(cid)}']
- await message.reply('验证错误,您已被封禁\n\nYou provided an incorrect answer. You are now blocked.')
- await do_action_and_read(client, cid, data)
- if data.get("collect",False):
- await collect_imformation(client,message)
-@listener(is_plugin=True, outgoing=True, command="pmcaptcha",
+
+@listener(is_plugin=True, outgoing=True,
+ command=cmd_name, parameters=f"<{lang('vocab_cmd')}> [{lang('cmd_param')}]",
need_admin=True,
- description='一个简单的私聊人机验证 请使用 ```,pmcaptcha h``` 查看可用命令')
-async def pm_captcha(client: Client, message: Message):
- cid_ = str(message.chat.id)
- data = sqlite.get("pmcaptcha", {})
- if len(message.parameter) == 0:
- if message.chat.type != ChatType.PRIVATE:
- await message.edit('请在私聊时使用此命令,或添加参数执行')
- await asyncio.sleep(3)
- await message.safe_delete()
- text = "已验证用户" if captcha_success.check_id(message.chat.id) else "未验证/验证中用户"
- await message.edit(text)
- elif len(message.parameter) == 1:
- if message.parameter[0] == "bl":
- await message.edit(
- '当前黑名单规则:\n' + str(data.get('blacklist', '无')) + '\n如需编辑,请使用 ,pmcaptcha bl +关键词(英文逗号分隔)')
- if message.parameter[0] == "wl":
- await message.edit(
- '当前白名单规则:\n' + str(data.get('whitelist', '无')) + '\n如需编辑,请使用 ,pmcaptcha wl +关键词(英文逗号分隔)')
- elif message.parameter[0] == 'wel':
- await message.edit(
- '当前通过时消息规则:\n' + str(data.get('welcome', '无')) + '\n如需编辑,请使用 ,pmcaptcha wel +要发送的消息')
- elif message.parameter[0] == 'wait':
- await message.edit(
- '当前验证等待时间(秒): ' + str(data.get('wait', '30')) + '\n如需编辑,请使用 ,pmcaptcha wait +等待秒数(整数)或使用 ,pmcaptcha wait off 关闭该功能')
- elif message.parameter[0] == 'h':
- await message.edit(pm_captcha_help_msg)
- if message.chat.type != ChatType.PRIVATE:
- await asyncio.sleep(5)
- return await message.safe_delete()
- elif message.parameter[0] == 'disablepm':
- status = '开启' if data.get('disable', False) else '关闭'
- await message.edit(
- (
- (
- f'当前禁止私聊状态: 已{status}'
- + '\n如需修改 请使用 ,pmcaptcha disablepm true/false'
- )
- + '\n此功能会放行联系人和白名单(已通过验证)用户'
- )
- )
- elif message.parameter[0]=="collect":
- status = '开启' if data.get('collect', False) else '关闭'
- await message.edit(
- (
- (
- f'当前收集验证错误信息状态: {status}'
- )
- + '\n此功能仅会通过 @CloudreflectionPmcaptchabot 收集未通过验证者的首条消息,id和用户名且不会提供给第三方(@LivegramBot 除外)。收集的信息将用于pmcaptcha改进,开启或关闭此功能不影响 pmcaptcha 的使用'
- )
- )
- elif message.parameter[0] == 'stats':
- await message.edit('自上次重置起,已进行验证 ' + str(data.get('pass', 0) + data.get('banned', 0)) +
- ' 次\n其中,通过验证 ' + str(data.get('pass', 0)) + ' 次,拦截 ' + str(data.get('banned', 0)) + ' 次')
- elif message.parameter[0] == 'premium':
- premium_action={"allow":"不验证Premium用户私聊","ban":"禁止Premium用户私聊","only":"仅允许Premium用户私聊","none":"无操作"}
- await message.edit(
- '当前对Premium用户的操作为: '+ premium_action.get(data.get("premium","none"))+'\n如需编辑,请使用 ,pmcaptcha premium [allow/ban/only/none] 修改')
- elif message.chat.type != ChatType.PRIVATE:
- await message.edit('请在私聊时使用此命令,或添加id参数执行')
- await asyncio.sleep(3)
- await message.safe_delete()
- elif message.parameter[0] == 'add':
- await message.edit(f'已将id {cid_} 添加至白名单')
- captcha_success.add_id(message.chat.id)
- elif message.parameter[0] == 'del':
- if captcha_success.del_id(message.chat.id):
- await message.edit(f'已删除id {cid_} 的验证记录')
- else:
- await message.edit('记录不存在')
- else:
- await message.edit('参数错误')
- elif message.parameter[0] == 'add':
- if message.parameter[1].isnumeric():
- await message.edit(f'已将id {message.parameter[1]} 添加至白名单')
- captcha_success.add_id(int(message.parameter[1]))
- await client.unarchive_chats(chat_ids=int(message.parameter[1]))
- else:
- await message.edit('参数错误')
- elif message.parameter[0] == 'del':
- if message.parameter[1].isnumeric():
- if captcha_success.del_id(int(message.parameter[1])):
- await message.edit(f'已删除id {message.parameter[1]} 的验证记录')
- else:
- await message.edit('记录不存在')
- else:
- await message.edit('参数错误')
- elif message.parameter[0] == 'wel':
- if message.parameter[1] == '-clear':
- if data.get("welcome", False):
- del data["welcome"]
- sqlite["pmcaptcha"] = data
- await message.edit('已恢复至默认规则')
- return
- data["welcome"] = " ".join(message.parameter[1:])
- sqlite["pmcaptcha"] = data
- await message.edit('规则已更新')
- elif message.parameter[0] == 'wait':
- if message.parameter[1]=="off":
- data["wait"] = "已关闭"
- sqlite["pmcaptcha"] = data
- await message.edit('已关闭验证时间限制')
- elif message.parameter[1].isnumeric():
- data["wait"] = int(message.parameter[1])
- sqlite["pmcaptcha"] = data
- await message.edit('等待时间已更新')
- else:
- await message.edit('错误:不是整数')
- elif message.parameter[0] == 'bl':
- if message.parameter[1] == '-clear':
- if data.get("blacklist", False):
- del data["blacklist"]
- sqlite["pmcaptcha"] = data
- await message.edit('规则列表已清空')
- return
- data["blacklist"] = " ".join(message.parameter[1:])
- sqlite["pmcaptcha"] = data
- await message.edit('规则已更新')
- elif message.parameter[0] == 'wl':
- if message.parameter[1] == '-clear':
- if data.get("whitelist", False):
- del data["whitelist"]
- sqlite["pmcaptcha"] = data
- await message.edit('规则列表已清空')
- return
- data["whitelist"] = " ".join(message.parameter[1:])
- sqlite["pmcaptcha"] = data
- await message.edit('规则已更新')
- elif message.parameter[0] == 'check':
- if message.parameter[1].isnumeric():
- if captcha_success.check_id(int(message.parameter[1])):
- await message.edit(f'id {message.parameter[1]} 已验证')
- else:
- await message.edit(f'id {message.parameter[1]} 未验证')
- else:
- await message.edit('未知用户/无效id')
- elif message.parameter[0] == 'disablepm':
- if message.parameter[1] == 'true':
- data["disable"] = True
- sqlite["pmcaptcha"] = data
- await message.edit('已禁止非白名单和联系人私聊\n您可以使用 ,pmcaptcha disablepm false 重新启用私聊')
- elif message.parameter[1] == 'false':
- data["disable"] = False
- sqlite["pmcaptcha"] = data
- await message.edit('已关闭禁止私聊,人机验证仍会工作')
- elif message.parameter[0] == 'stats' and message.parameter[1] == '-clear':
- data["pass"] = 0
- data["banned"] = 0
- sqlite["pmcaptcha"] = data
- await message.edit('已重置计数器')
- elif message.parameter[0] == 'action':
- if message.parameter[1] == 'ban':
- data["action"] = 'ban'
- sqlite["pmcaptcha"] = data
- await message.edit('验证失败后将执行**封禁**操作')
- elif message.parameter[1] == 'delete':
- data["action"] = 'delete'
- sqlite["pmcaptcha"] = data
- await message.edit('验证失败后将执行**封禁和删除**会话操作')
- elif message.parameter[1] == 'archive':
- data["action"] = 'archive'
- sqlite["pmcaptcha"] = data
- await message.edit('验证失败后将执行**封禁和归档**会话操作')
- elif message.parameter[1] == 'none':
- data["action"] = 'none'
- sqlite["pmcaptcha"] = data
- await message.edit('验证失败后将不执行任何操作')
- else:
- await message.edit('参数错误。')
- elif message.parameter[0]=="premium":
- if message.parameter[1] == "allow":
- data["premium"] = 'allow'
- sqlite["pmcaptcha"] = data
- await message.edit('将不对 Telegram Premium 用户发起验证')
- elif message.parameter[1] == "ban":
- data["premium"] = 'ban'
- sqlite["pmcaptcha"] = data
- await message.edit('将禁止 Telegram Premium 用户私聊')
- elif message.parameter[1] == "only":
- data["premium"] = 'only'
- sqlite["pmcaptcha"] = data
- await message.edit('将**仅允许** Telegram Premium 用户私聊')
- elif message.parameter[1] == "none":
- del data["premium"]
- sqlite["pmcaptcha"] = data
- await message.edit('将不对 Telegram Premium 用户执行额外操作')
- elif message.parameter[0]=="collect":
- if message.parameter[1] == "y":
- data['collect']=True
- sqlite["pmcaptcha"] = data
- await message.edit('已开启验证错误信息收集,感谢您的支持')
- elif message.parameter[1] == "n":
- del data['collect']
- sqlite["pmcaptcha"] = data
- await message.edit('已关闭验证错误信息收集')
+ description=lang("plugin_desc") % code(f',{cmd_name} h'))
+async def cmd_entry(msg: Message):
+ cmd = len(msg.parameter) > 0 and msg.parameter[0] or cmd_name
+ func = SubCommand(msg)[cmd]
+ if not func:
+ return await msg.edit_text(f"{lang('cmd_not_found')}: {code(cmd)}", parse_mode=ParseMode.HTML)
+ args_len = None if inspect.getfullargspec(func).varargs else len(inspect.getfullargspec(func).args)
+ await func(*(len(msg.parameter) > 1 and msg.parameter[1:args_len] or [None] * ((args_len or -1) - 1)))