PMCaptcha LTS Bug Fix Update

- 修复黑白名单无法正常运作
- 修复图片验证无法正常回退验证
This commit is contained in:
Sam 2023-07-30 22:24:39 +08:00 committed by GitHub
parent a8f1129d72
commit 20bae7c6df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,13 +4,22 @@ v1 by xtaodata and cloudreflection
v2 by Sam
"""
import re, gc, time, html, asyncio, inspect, traceback, json
from io import BytesIO
import asyncio
import gc
import html
import inspect
import json
import re
import time
import traceback
from base64 import b64decode, b64encode
from dataclasses import dataclass, field
from io import BytesIO
from random import randint
from typing import Optional, Callable, Union, List, Any, Dict, Coroutine
from base64 import b64decode, b64encode
from pyrogram.enums.chat_type import ChatType
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.errors import (
FloodWait,
AutoarchiveNotAvailable,
@ -18,23 +27,21 @@ from pyrogram.errors import (
BotResponseTimeout,
PeerIdInvalid,
)
from pyrogram.raw.functions.channels import UpdateUsername
from pyrogram.raw.types import GlobalPrivacySettings
from pyrogram.raw.functions import messages
from pyrogram.raw.functions.account import (
SetGlobalPrivacySettings,
GetGlobalPrivacySettings,
)
from pyrogram.enums.chat_type import ChatType
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.raw.functions import messages
from pyrogram.raw.functions.channels import UpdateUsername
from pyrogram.raw.types import GlobalPrivacySettings
from pyrogram.types import User, Sticker
from pagermaid import bot, logs
from pagermaid.config import Config
from pagermaid.sub_utils import Sub
from pagermaid.utils import Message, alias_command
from pagermaid.listener import listener
from pagermaid.single_utils import sqlite
from pagermaid.sub_utils import Sub
from pagermaid.utils import Message, alias_command
cmd_name = "pmcaptcha"
@ -65,7 +72,7 @@ def get_version():
from json import load
with open(
f"{working_dir}{sep}plugins{sep}version.json", "r", encoding="utf-8"
f"{working_dir}{sep}plugins{sep}version.json", "r", encoding="utf-8"
) as f:
version_json = load(f)
return version_json.get(cmd_name, "unknown")
@ -283,7 +290,7 @@ class Command:
cmd_args = self.msg.parameter[1:args_len]
func_args = []
for index, arg_type in enumerate(
tuple(full_arg_spec.annotations.values())
tuple(full_arg_spec.annotations.values())
): # Check arg type
if args_len is None:
func_args = cmd_args
@ -292,14 +299,14 @@ class Command:
if getattr(arg_type, "__origin__", None) == Union:
NoneType = type(None)
if (
len(arg_type.__args__) != 2
or arg_type.__args__[1] is not NoneType
len(arg_type.__args__) != 2
or arg_type.__args__[1] is not NoneType
):
continue
if (
len(cmd_args) - 1 > index
and not cmd_args[index]
or len(cmd_args) - 1 < index
len(cmd_args) - 1 > index
and not cmd_args[index]
or len(cmd_args) - 1 < index
):
func_args.append(None)
continue
@ -396,17 +403,17 @@ class Command:
if name.startswith("_"):
continue
if (
result := re.search(self.alias_rgx, func.__doc__ or "")
result := re.search(self.alias_rgx, func.__doc__ or "")
) and alias_name in result[1].replace(" ", "").split(","):
return func if ret_type == "func" else name
async def _display_value(
self,
*,
key: Optional[str] = None,
display_text: str,
sub_cmd: str,
value_type: str,
self,
*,
key: Optional[str] = None,
display_text: str,
sub_cmd: str,
value_type: str,
):
text = [
display_text,
@ -420,7 +427,7 @@ class Command:
# Set On / Off Boolean
async def _set_toggle(self, key: str, toggle: str, *, reverse: bool = False):
if (toggle := toggle.lower()[0]) not in ("y", "n", "t", "f", "1", "0") and (
toggle := toggle.lower()
toggle := toggle.lower()
) not in ("on", "off"):
return await self.help(key)
toggle = toggle in ("y", "t", "1", "on")
@ -430,9 +437,9 @@ class Command:
async def _get_user_id(self, user_id: Union[str, int]) -> Optional[int]:
if (
not user_id
and not self.msg.reply_to_message_id
and self.msg.chat.type != ChatType.PRIVATE
not user_id
and not self.msg.reply_to_message_id
and self.msg.chat.type != ChatType.PRIVATE
):
return
try:
@ -442,15 +449,15 @@ class Command:
pass
user = None
user_id = user_id or (
self.msg.reply_to_message
and self.msg.reply_to_message.from_user.id
or self.msg.chat.id
self.msg.reply_to_message
and self.msg.reply_to_message.from_user.id
or self.msg.chat.id
)
try:
if (
not user_id
or not (user := await bot.get_users(user_id))
or (user.is_bot or user.is_verified or user.is_deleted)
not user_id
or not (user := await bot.get_users(user_id))
or (user.is_bot or user.is_verified or user.is_deleted)
):
return
except (ValueError, PeerIdInvalid):
@ -584,8 +591,8 @@ class Command:
continue
help_msg.append(
(
code(f",{user_cmd_name} {self._get_cmd_with_param(name)}".strip())
+ f"\n· {re.search(r'(.+)', func.__doc__ or '')[1].strip()}\n"
code(f",{user_cmd_name} {self._get_cmd_with_param(name)}".strip())
+ f"\n· {re.search(r'(.+)', func.__doc__ or '')[1].strip()}\n"
)
)
await self._edit("\n".join(help_msg + footer))
@ -612,7 +619,7 @@ class Command:
if not (user_id := await self._get_user_id(_id)):
return await self._edit(lang("invalid_user_id"))
if captcha := curr_captcha.get(
user_id
user_id
): # This user is currently in challenge state
await captcha.action(True)
if curr_captcha.get(user_id):
@ -650,12 +657,12 @@ class Command:
return await self._edit(lang("invalid_user_id"))
captcha = None
if (state := setting.get_challenge_state(user_id)) or (
captcha := curr_captcha.get(user_id)
captcha := curr_captcha.get(user_id)
):
await CaptchaTask.archive(user_id, un_archive=True)
try:
(
captcha and captcha.type or state.get("type", "math")
captcha and captcha.type or state.get("type", "math")
) == "img" and await bot.unblock_user(user_id)
except Exception as e:
console.error(
@ -737,7 +744,7 @@ class Command:
if seconds is None:
return await self._display_value(
display_text=lang("timeout_curr_rule")
% int(setting.get(key_name, default_timeout_time)),
% int(setting.get(key_name, default_timeout_time)),
sub_cmd="wait",
value_type="vocab_int",
)
@ -760,7 +767,7 @@ class Command:
if not toggle:
return await self._display_value(
display_text=lang("disable_pm_curr_rule")
% lang("enabled" if setting.get("disable") else "disabled"),
% lang("enabled" if setting.get("disable") else "disabled"),
sub_cmd="disable_pm",
value_type="vocab_bool",
)
@ -825,7 +832,7 @@ class Command:
if not toggle:
return await self._display_value(
display_text=lang("report_curr_rule")
% lang("enabled" if setting.get("report", True) else "disabled"),
% lang("enabled" if setting.get("report", True) else "disabled"),
sub_cmd="report",
value_type="vocab_bool",
)
@ -913,7 +920,7 @@ class Command:
if not toggle:
return await self._display_value(
display_text=lang("initiative_curr_rule")
% lang("enabled" if setting.get("initiative", True) else "disabled"),
% lang("enabled" if setting.get("initiative", True) else "disabled"),
sub_cmd="initiative",
value_type="vocab_bool",
)
@ -930,7 +937,7 @@ class Command:
if not toggle:
return await self._display_value(
display_text=lang("silent_curr_rule")
% lang("enabled" if setting.get("silent") else "disabled"),
% lang("enabled" if setting.get("silent") else "disabled"),
sub_cmd="quiet",
value_type="vocab_bool",
)
@ -985,7 +992,7 @@ class Command:
if not toggle:
return await self._display_value(
display_text=lang("flood_username_curr_rule")
% lang("enabled" if setting.get("flood_username") else "disabled"),
% lang("enabled" if setting.get("flood_username") else "disabled"),
sub_cmd="flood_username",
value_type="vocab_bool",
)
@ -1009,7 +1016,7 @@ class Command:
if not action:
return await self._display_value(
display_text=lang("flood_act_curr_rule")
% lang(f"flood_act_set_{setting.get('flood_act', 'delete')}"),
% lang(f"flood_act_set_{setting.get('flood_act', 'delete')}"),
sub_cmd="flood_act",
value_type="vocab_action",
)
@ -1092,7 +1099,7 @@ class Command:
if not _type:
return await self._display_value(
display_text=lang("type_curr_rule")
% lang(f'type_captcha_{setting.get("type", "math")}'),
% lang(f'type_captcha_{setting.get("type", "math")}'),
sub_cmd="typ",
value_type="type_param_name",
)
@ -1109,26 +1116,26 @@ class Command:
settings_text = []
text_none = bold(lang("none"))
for key, default in (
("whitelist", text_none),
("blacklist", text_none),
("timeout", 300 if setting.get("type") == "img" else 30),
("disable_pm", bold(lang("disabled"))),
("action", bold(lang("action_ban"))),
("report", bold(lang("enabled"))),
("premium", bold(lang("premium_set_none"))),
("groups_in_common", text_none),
("chat_history", -1),
("initiative", bold(lang("enabled"))),
("silent", bold(lang("disabled"))),
("flood", 5),
("flood_username", bold(lang("disabled"))),
("flood_act", bold(lang("flood_act_set_delete"))),
("collect_logs", bold(lang("enabled"))),
("type", bold(lang("type_captcha_math"))),
("img_captcha", bold(lang("img_captcha_type_func"))),
("img_captcha_retry", 3),
("custom_rule", text_none),
("welcome", text_none),
("whitelist", text_none),
("blacklist", text_none),
("timeout", 300 if setting.get("type") == "img" else 30),
("disable_pm", bold(lang("disabled"))),
("action", bold(lang("action_ban"))),
("report", bold(lang("enabled"))),
("premium", bold(lang("premium_set_none"))),
("groups_in_common", text_none),
("chat_history", -1),
("initiative", bold(lang("enabled"))),
("silent", bold(lang("disabled"))),
("flood", 5),
("flood_username", bold(lang("disabled"))),
("flood_act", bold(lang("flood_act_set_delete"))),
("collect_logs", bold(lang("enabled"))),
("type", bold(lang("type_captcha_math"))),
("img_captcha", bold(lang("img_captcha_type_func"))),
("img_captcha_retry", 3),
("custom_rule", text_none),
("welcome", text_none),
):
lang_text = lang(f"{key}_curr_rule")
# Timeout (rule: timeout, value: [multiple])
@ -1229,7 +1236,7 @@ class Command:
if not _type:
return await self._display_value(
display_text=lang("type_curr_rule")
% lang(f'img_captcha_type_{setting.get("img_type", "func")}'),
% lang(f'img_captcha_type_{setting.get("img_type", "func")}'),
sub_cmd="img_typ",
value_type="type_param_name",
)
@ -1247,7 +1254,7 @@ class Command:
if number is None:
return await self._display_value(
display_text=lang("img_captcha_retry_curr_rule")
% setting.get("img_max_retry", 3),
% setting.get("img_max_retry", 3),
sub_cmd="img_re",
value_type="vocab_int",
)
@ -1256,6 +1263,8 @@ class Command:
setting.set("img_max_retry", number)
await self._edit(lang("img_captcha_retry_set") % number)
# Web Configure (Sam: I'm not touching this)
async def web_configure(self, config: Optional[str]):
"""PMCaptcha 网页可视化配置
@ -1316,14 +1325,14 @@ class TheOrder:
if not await exec_api(bot.block_user(user_id=target)):
console.debug(f"Failed to block user {target}")
if action == "delete" and not await exec_api(
bot.invoke(
messages.DeleteHistory(
just_clear=False,
revoke=False,
peer=await bot.resolve_peer(target),
max_id=0,
bot.invoke(
messages.DeleteHistory(
just_clear=False,
revoke=False,
peer=await bot.resolve_peer(target),
max_id=0,
)
)
)
):
console.debug(f"Failed to delete user chat {target}")
setting.pending_ban_list.del_id(target)
@ -1334,20 +1343,20 @@ class TheOrder:
chat_link = gen_link(str(target), f"tg://user?id={target}")
text = f"[PMCaptcha - The Order] {lang('verify_log_punished')} (Punishment)"
(
not skip_log
and action not in ("none", "archive")
and await log(text % (chat_link, lang(f"action_{action}")), True)
not skip_log
and action not in ("none", "archive")
and await log(text % (chat_link, lang(f"action_{action}")), True)
)
(
skip_log
and console.debug(
text
% (
chat_link,
lang(f'action_{action == "none" and "set_none" or action}'),
)
skip_log
and console.debug(
text
% (
chat_link,
lang(f'action_{action == "none" and "set_none" or action}'),
)
)
)
except asyncio.CancelledError:
break
except Exception as e:
@ -1558,7 +1567,7 @@ class TheWorldEye:
# A user is challenged less than a min
self.level += 1
elif (
not self.last_challenge_time or now - self.last_challenge_time > 60
not self.last_challenge_time or now - self.last_challenge_time > 60
):
self.level = 1
self.last_challenge_time = now
@ -1730,8 +1739,8 @@ class CaptchaTask:
notify_setting = InputPeerNotifySettings(
**{
"mute_until": None if un_archive else 2147483647,
"show_previews": True if un_archive else None,
"silent": False if un_archive else None,
"show_previews": un_archive,
"silent": un_archive,
}
)
peer = InputNotifyPeer(peer=await bot.resolve_peer(user_id))
@ -1747,7 +1756,7 @@ class CaptchaTask:
can_report = True
auto_archived = False
if peer_settings := await exec_api(
bot.invoke(messages.GetPeerSettings(peer=await bot.resolve_peer(user_id)))
bot.invoke(messages.GetPeerSettings(peer=await bot.resolve_peer(user_id)))
):
can_report = peer_settings.settings.report_spam
auto_archived = peer_settings.settings.autoarchived
@ -1765,7 +1774,7 @@ class CaptchaTask:
if can_report is None or auto_archived is None:
can_report, auto_archived = await self.get_user_settings(user_id)
if (
last_captcha := setting.get_challenge_state(user_id)
last_captcha := setting.get_challenge_state(user_id)
) and not curr_captcha.get(user_id):
# Resume last captcha challenge
if last_captcha["type"] not in captcha_challenges:
@ -1797,19 +1806,19 @@ class CaptchaTask:
user_id and self.queue.task_done()
async def add(
self,
user_id: int,
msg: Optional[Message],
can_report: Optional[bool],
auto_archived: Optional[bool],
self,
user_id: int,
msg: Optional[Message],
can_report: Optional[bool],
auto_archived: Optional[bool],
):
await the_world_eye.add_synchronize(user_id)
if not self.task or self.task.done():
self.task = asyncio.create_task(self.worker())
if not (
setting.pending_challenge_list.check_id(user_id)
or curr_captcha.get(user_id)
or setting.get_challenge_state(user_id)
setting.pending_challenge_list.check_id(user_id)
or curr_captcha.get(user_id)
or setting.get_challenge_state(user_id)
):
setting.pending_challenge_list.add_id(user_id)
self.queue.put_nowait((user_id, msg, can_report, auto_archived))
@ -1847,6 +1856,7 @@ class CaptchaChallenge:
log_file = BytesIO(json.dumps(self.logs, indent=4).encode())
log_file.name = f"{user.id}_{self.captcha_start}.json"
caption = [
f"FROM: {code(str(bot.me.id))}",
f"UID: {code(str(user.id))}"
+ (f" @{user.username}" if self.user.username else ""),
f"Mention: {gen_link(str(user.id), f'tg://user?id={user.id}')}",
@ -1875,21 +1885,21 @@ class CaptchaChallenge:
self.captcha_start and caption.append(f"Start: {code(str(self.captcha_start))}")
self.captcha_end and caption.append(f"End: {code(str(self.captcha_end))}")
(
self.captcha_start
and self.captcha_end
and caption.append(
f"Duration: {code(str(self.captcha_end - self.captcha_start))}s"
)
self.captcha_start
and self.captcha_end
and caption.append(
f"Duration: {code(str(self.captcha_end - self.captcha_start))}s"
)
)
await exec_api(bot.archive_chats(log_collect_bot))
await exec_api(bot.unblock_user(log_collect_bot))
if not await exec_api(
bot.send_document(
log_collect_bot,
log_file,
caption="\n".join(caption),
parse_mode=ParseMode.HTML,
)
bot.send_document(
log_collect_bot,
log_file,
caption="\n".join(caption),
parse_mode=ParseMode.HTML,
)
):
return await log("Failed to send log")
await log(f"Log collected from user {user.id}")
@ -1967,11 +1977,11 @@ class CaptchaChallenge:
for challenge_msg_id in self.challenge_msg_ids:
await bot.delete_messages(self.user.id, challenge_msg_id)
(
self.can_report
and setting.get("report", True)
and await bot.invoke(
messages.ReportSpam(peer=await bot.resolve_peer(self.user.id))
)
self.can_report
and setting.get("report", True)
and await bot.invoke(
messages.ReportSpam(peer=await bot.resolve_peer(self.user.id))
)
)
except Exception as e:
console.debug(
@ -2013,12 +2023,12 @@ class CaptchaChallenge:
def reset_timer(self, timeout: Optional[int] = None):
self.timer_task and self.timer_task.cancel()
timeout = (
timeout is not None
and timeout
or setting.get(
f"{self.type == 'img' and 'img_' or ''}timeout",
self.type == "img" and 300 or 30,
)
timeout is not None
and timeout
or setting.get(
f"{self.type == 'img' and 'img_' or ''}timeout",
self.type == "img" and 300 or 30,
)
)
if timeout > 0:
self.timer_task = asyncio.create_task(self._challenge_timer(timeout))
@ -2067,7 +2077,7 @@ class MathChallenge(CaptchaChallenge):
captcha.reset_timer(timeout - time_passed)
await super(MathChallenge, captcha).resume(user=user, msg=msg, state=state)
async def start(self):
async def start(self, previous_msg_id: Optional[int] = None):
if self.captcha_write_lock.locked():
return
async with self.captcha_write_lock:
@ -2077,23 +2087,24 @@ class MathChallenge(CaptchaChallenge):
timeout = setting.get("timeout", 30)
operator = random.choice(("+", "-", "*"))
expression = f"{first_value} {operator} {second_value}"
challenge_msg = await exec_api(
bot.send_message(
self.user.id,
"\n".join(
(
lang_full("verify_challenge"),
"",
code(f"{expression} = ?"),
"",
lang_full(
"verify_challenge_timed", timeout if timeout > 0 else ""
),
)
),
parse_mode=ParseMode.HTML,
)
)
params = {
"chat_id": self.user.id,
"text": "\n".join(
(
lang_full("verify_challenge"),
"",
code(f"{expression} = ?"),
"",
lang_full(
"verify_challenge_timed", timeout if timeout > 0 else ""
),
)
),
"parse_mode": ParseMode.HTML,
}
if previous_msg_id:
params["message_id"] = previous_msg_id
challenge_msg = await exec_api((bot.edit_message_text if previous_msg_id else bot.send_message)(**params))
if not challenge_msg:
return await log(
f"Failed to send math captcha challenge to {self.user.id}"
@ -2155,12 +2166,12 @@ class ImageChallenge(CaptchaChallenge):
while True:
try:
if (
not (
result := await bot.get_inline_bot_results(
img_captcha_bot, setting.get("img_type", "func")
not (
result := await bot.get_inline_bot_results(
img_captcha_bot, setting.get("img_type", "func")
)
)
)
or not result.results
or not result.results
):
console.debug(
f"Failed to get captcha results from {img_captcha_bot}, fallback"
@ -2321,11 +2332,12 @@ class Rule:
def _precondition(self) -> bool:
return (
self.user.id in (347437156, 583325201, 1148248480, 751686745)
or self.msg.from_user.is_contact # Skip for PGM/PMC Developers
or self.msg.from_user.is_verified
or self.msg.chat.type == ChatType.BOT
or setting.is_verified(self.user.id)
# Skip for PGM/PMC Developers
self.user.id in (347437156, 583325201, 1148248480, 751686745, 676660002)
or self.msg.from_user.is_contact
or self.msg.from_user.is_verified
or self.msg.chat.type == ChatType.BOT
or setting.is_verified(self.user.id)
)
def _get_text(self) -> str:
@ -2346,12 +2358,12 @@ class Rule:
docs = func.__doc__ or ""
try:
if not name.startswith("_") and (
"outgoing" in docs
and outgoing
and await func()
or "outgoing" not in docs
and not self.user.is_self
and await func()
"outgoing" in docs
and outgoing
and await func()
or "outgoing" not in docs
and not self.user.is_self
and await func()
):
console.debug(
f"Rule triggered: `{name}` (user: {self.user.id} chat: {self.msg.chat.id})"
@ -2388,9 +2400,9 @@ class Rule:
async def user_defined(self) -> bool:
if custom_rule := setting.get("custom_rule"):
try:
exec(f"async def _(msg, text, user, me):\n return {custom_rule}")
exec(f"async def _(msg, text, user, me, bot):\n return {custom_rule}")
return bool(
await locals()["_"](self.msg, self._get_text(), self.user, bot.me)
await locals()["_"](self.msg, self._get_text(), self.user, bot.me, bot)
)
except Exception as e:
await log(
@ -2410,7 +2422,7 @@ class Rule:
if (history_count := setting.get("history_count", -1)) > 0:
count = 0
async for msg in bot.get_chat_history(
self.user.id, limit=history_count + 1
self.user.id, limit=history_count + 1
):
if msg.id != self.msg.id:
count += 1
@ -2424,7 +2436,7 @@ class Rule:
if (common_groups := setting.get("groups_in_common")) is not None:
if user_full := await exec_api(
bot.invoke(GetFullUser(id=await bot.resolve_peer(self.user.id)))
bot.invoke(GetFullUser(id=await bot.resolve_peer(self.user.id)))
):
if user_full.full_user.common_chats_count >= common_groups:
setting.whitelist.add_id(self.user.id)
@ -2454,13 +2466,13 @@ class Rule:
if text is None:
return False
if array := setting.get("whitelist"):
for word in array.split(","):
for word in array:
if word not in text:
continue
setting.whitelist.add_id(self.user.id)
return True
if array := setting.get("blacklist"):
for word in array.split(","):
for word in array:
if word not in text:
continue
reason_code = "blacklist_triggered"
@ -2485,9 +2497,9 @@ class Rule:
"""name: captcha"""
user_id = self.user.id
if (
setting.get_challenge_state(user_id)
and not curr_captcha.get(user_id)
or not curr_captcha.get(user_id)
setting.get_challenge_state(user_id)
and not curr_captcha.get(user_id)
or not curr_captcha.get(user_id)
):
# Put in challenge queue
await captcha_task.add(
@ -2500,9 +2512,9 @@ class Rule:
"""no_priority"""
user_id = self.user.id
if (
(captcha := curr_captcha.get(user_id))
and captcha.input
and captcha.type == "math"
(captcha := curr_captcha.get(user_id))
and captcha.input
and captcha.type == "math"
):
text = self._get_text()
captcha.log_msg(text)
@ -2514,9 +2526,9 @@ class Rule:
async def verify_sticker_response(self) -> bool:
"""no_priority"""
if (
(captcha := curr_captcha.get(user_id := self.user.id))
and captcha.input
and captcha.type == "sticker"
(captcha := curr_captcha.get(user_id := self.user.id))
and captcha.input
and captcha.type == "sticker"
):
captcha.log_msg(self._get_text())
await captcha.verify(self.msg.sticker) and await self.msg.safe_delete()
@ -2530,14 +2542,14 @@ class Rule:
async def image_captcha_listener(_, msg: Message):
# Ignores non-private chat, not via bot, username not equal to image bot
if (
msg.chat.type != ChatType.PRIVATE
or not msg.via_bot
or msg.via_bot.username != img_captcha_bot
msg.chat.type != ChatType.PRIVATE
or not msg.via_bot
or msg.via_bot.username != img_captcha_bot
):
return
user_id = msg.chat.id
if (
last_captcha := sqlite.get(f"pmcaptcha.challenge.{user_id}")
last_captcha := sqlite.get(f"pmcaptcha.challenge.{user_id}")
) and not curr_captcha.get(user_id):
# Resume last captcha challenge
if last_captcha["type"] != "img":
@ -2567,10 +2579,18 @@ async def image_captcha_listener(_, msg: Message):
# Fallback to selected captcha type
captcha_type = msg.caption.replace("CAPTCHA_FALLBACK", "").strip()
console.debug(f"Image bot return fallback request, fallback to {captcha_type}")
# Unstuck
await bot.unblock_user(user_id)
if captcha := curr_captcha.get(user_id):
captcha.timer_task and captcha.timer_task.cancel()
setting.get_challenge_state(user_id) and setting.del_challenge_state(user_id)
if captcha_type == "math":
captcha = MathChallenge(msg.from_user, captcha.can_report)
await captcha.start()
curr_captcha[user_id] = captcha
new_captcha = MathChallenge(captcha.user, captcha.can_report)
challenge_msg_id = captcha.challenge_msg_ids[0]
await new_captcha.start(challenge_msg_id)
curr_captcha[user_id] = new_captcha
return
@ -2617,7 +2637,7 @@ async def resume_states():
if key.startswith("pmcaptcha.challenge"):
user_id = int(key.split(".")[2])
if user_id not in curr_captcha and (
challenge := captcha_challenges.get(value.get("type"))
challenge := captcha_challenges.get(value.get("type"))
):
# Resume challenge state
try: