mirror of
https://github.com/TeamPGM/PagerMaid_Plugins_Pyro.git
synced 2024-11-16 11:02:56 +00:00
🥵 PMCaptcha v2.25 (#59)
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
This commit is contained in:
parent
f303c5a275
commit
59f8be4416
@ -22,9 +22,7 @@ async def check_self_and_from(message: Message):
|
||||
if not message.from_user:
|
||||
return False
|
||||
data = await bot.get_chat_member(cid, message.from_user.id)
|
||||
if data.status not in [ChatMemberStatus.ADMINISTRATOR, ChatMemberStatus.OWNER]:
|
||||
return False
|
||||
return True
|
||||
return data.status in [ChatMemberStatus.ADMINISTRATOR, ChatMemberStatus.OWNER]
|
||||
|
||||
|
||||
async def kick_chat_member(cid, uid, only_search: bool = False):
|
||||
|
@ -67,7 +67,7 @@ class JIKIPedia:
|
||||
return self.parse(**req.json())
|
||||
|
||||
def parse(self, **kwargs):
|
||||
self.data = kwargs.get("data", None)
|
||||
self.data = kwargs.get("data")
|
||||
self.definitions = []
|
||||
if self.data:
|
||||
for i in self.data:
|
||||
@ -84,7 +84,7 @@ class JIKIPedia:
|
||||
if len(images) > 0:
|
||||
image = images[0].get("full", {}).get("path", "")
|
||||
self.definitions.append(JIKIPediaDefinition(plaintext, tags, title, item_id, image))
|
||||
self.message = kwargs.get("message", None)
|
||||
self.message = kwargs.get("message")
|
||||
if issubclass(type(self.message), dict):
|
||||
self.message = self.message.get("title", None)
|
||||
|
||||
|
38
pmcaptcha/README.md
Normal file
38
pmcaptcha/README.md
Normal file
@ -0,0 +1,38 @@
|
||||
# PMCaptcha 代码维护指南
|
||||
|
||||
由于插件行数过多,为确保后续维护方便,本核心开发者 (Sam) 将在这里说明插件的基本架构。
|
||||
|
||||
# class (类)
|
||||
|
||||
## 通用
|
||||
|
||||
- Log: 日志类 | 负责发送验证记录
|
||||
- Setting: 设置类 | 负责读写每个设置
|
||||
- Command: 指令类 | 负责用户的指令处理
|
||||
- Rule: 规则类 | 负责运行每个规则来判断该如何处理该
|
||||
|
||||
## Captcha 验证
|
||||
|
||||
- TheOrder: _别名封禁系统_ | 负责进行用户的封禁操作
|
||||
- TheWorldEye: _别名防轰炸系统_ | 负责监控、启用和关闭防轰炸
|
||||
- CaptchaTask: 负责添加用户进行Captcha验证,以及抢先一步屏蔽用户避免收到骚扰
|
||||
- CaptchaChallenge: 验证系统的主类
|
||||
- MathChallenge: 计算验证
|
||||
- ImageChallenge: 图片验证
|
||||
- StickerChallenge: 贴纸验证
|
||||
|
||||
# listener (监听器)
|
||||
|
||||
- image_captcha_listener: 监听被验证用户的图像验证结果
|
||||
- initiative_listener: 监听用户是否主动对话 (以此添加白名单)
|
||||
- chat_listener: 监听被验证用户输入的验证结果 (文字)
|
||||
- cmd_entry: 监听用户输入的指令
|
||||
|
||||
# functions (函数)
|
||||
|
||||
> 这里只记下比较有意义的函数
|
||||
|
||||
- resume_states: 恢复用户的验证状态
|
||||
- lang, lang_full, get_lang_list: 多语言 (i18n)
|
||||
- get_version: 获取 PMCaptcha 的版本号
|
||||
- log: 发送 log 记录 (由于原版的函数不好用所以写了个新的)
|
@ -29,8 +29,7 @@ from pagermaid.single_utils import sqlite
|
||||
|
||||
cmd_name = "pmcaptcha"
|
||||
|
||||
log_collect_bot = "PagerMaid_Sam_Bot"
|
||||
img_captcha_bot = "PagerMaid_Sam_Bot"
|
||||
log_collect_bot = img_captcha_bot = "PagerMaid_Sam_Bot"
|
||||
|
||||
# Get alias for user command
|
||||
user_cmd_name = alias_command(cmd_name)
|
||||
@ -379,10 +378,12 @@ class Command:
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
user = None
|
||||
user_id = user_id or self.msg.reply_to_message_id or (
|
||||
self.msg.chat.type == ChatType.PRIVATE and self.msg.chat.id or 0)
|
||||
if not user_id or not (user := await bot.get_users(user_id)) or (
|
||||
user.is_bot or user.is_verified or user.is_deleted):
|
||||
user_id = user_id or (self.msg.reply_to_message and self.msg.reply_to_message.from_user.id or self.msg.chat.id)
|
||||
try:
|
||||
if not user_id or not (user := await bot.get_users(user_id)) or (
|
||||
user.is_bot or user.is_verified or user.is_deleted):
|
||||
return
|
||||
except (ValueError, PeerIdInvalid):
|
||||
return
|
||||
return user.id
|
||||
|
||||
@ -805,7 +806,7 @@ class Command:
|
||||
"""
|
||||
if not toggle:
|
||||
return await self._display_value(
|
||||
display_text=lang('silent_curr_rule') % lang('enabled' if setting.get('silent', False) else 'disabled'),
|
||||
display_text=lang('silent_curr_rule') % lang('enabled' if setting.get('silent') else 'disabled'),
|
||||
sub_cmd="quiet",
|
||||
value_type="vocab_bool")
|
||||
await self._set_toggle("silent", toggle)
|
||||
@ -858,7 +859,7 @@ class Command:
|
||||
if not toggle:
|
||||
return await self._display_value(
|
||||
display_text=lang('flood_username_curr_rule') % lang(
|
||||
'enabled' if setting.get('flood_username', False) else 'disabled'),
|
||||
'enabled' if setting.get('flood_username') else 'disabled'),
|
||||
sub_cmd="flood_username",
|
||||
value_type="vocab_bool")
|
||||
if toggle in ("y", "t", "1", "on") and not user_want_set_flood_username:
|
||||
@ -1174,7 +1175,7 @@ class TheOrder:
|
||||
if not self.task or self.task.done():
|
||||
self.task = asyncio.create_task(self.worker())
|
||||
try:
|
||||
not setting.get("silent") and await bot.send_message(user_id, "\n\n".join((
|
||||
not the_world_eye.triggered and not setting.get("silent") and await bot.send_message(user_id, "\n\n".join((
|
||||
lang_full(reason_code), lang_full("verify_blocked")
|
||||
)))
|
||||
except FloodWait:
|
||||
@ -1733,7 +1734,10 @@ class MathChallenge(CaptchaChallenge):
|
||||
captcha = cls(user, state.get("report", True))
|
||||
captcha.captcha_start = state['start']
|
||||
captcha.logs = state['logs']
|
||||
captcha.challenge_msg_ids = state.get("msg_id") and [state["msg_id"]] or state.get("msg_ids", [])
|
||||
captcha.challenge_msg_ids = (
|
||||
[state["msg_id"]] if state.get("msg_id") else state.get("msg_ids", [])
|
||||
)
|
||||
|
||||
captcha.answer = state['answer']
|
||||
if (timeout := setting.get("timeout", 30)) > 0:
|
||||
time_passed = int(time.time()) - int(state['start'])
|
||||
@ -1794,7 +1798,10 @@ class ImageChallenge(CaptchaChallenge):
|
||||
captcha = cls(user, state['report'])
|
||||
captcha.captcha_start = state['start']
|
||||
captcha.logs = state['logs']
|
||||
captcha.challenge_msg_ids = state.get("msg_id") and [state["msg_id"]] or state.get("msg_ids", [])
|
||||
captcha.challenge_msg_ids = (
|
||||
[state["msg_id"]] if state.get("msg_id") else state.get("msg_ids", [])
|
||||
)
|
||||
|
||||
captcha.try_count = state['try_count']
|
||||
if captcha.try_count >= setting.get("img_max_retry", 3):
|
||||
return await captcha.action(False)
|
||||
@ -1876,7 +1883,10 @@ class StickerChallenge(CaptchaChallenge):
|
||||
captcha = cls(user, state['report'])
|
||||
captcha.captcha_start = state['start']
|
||||
captcha.logs = state['logs']
|
||||
captcha.challenge_msg_ids = state.get("msg_id") and [state["msg_id"]] or state.get("msg_ids", [])
|
||||
captcha.challenge_msg_ids = (
|
||||
[state["msg_id"]] if state.get("msg_id") else state.get("msg_ids", [])
|
||||
)
|
||||
|
||||
if (timeout := setting.get("timeout", 30)) > 0:
|
||||
time_passed = int(time.time()) - int(state['start'])
|
||||
if time_passed > timeout:
|
||||
@ -1927,7 +1937,7 @@ class Rule:
|
||||
|
||||
def _precondition(self) -> bool:
|
||||
return (
|
||||
self.user.id in (347437156, 583325201, 1148248480) or # Skip for PGM/PMC Developers
|
||||
self.user.id in (347437156, 583325201, 1148248480, 751686745) or # Skip for PGM/PMC Developers
|
||||
self.msg.from_user.is_contact or
|
||||
self.msg.from_user.is_verified or
|
||||
self.msg.chat.type == ChatType.BOT or
|
||||
@ -1989,16 +1999,8 @@ class Rule:
|
||||
await log(f"{lang('custom_rule_exec_err')}: {e}\n{traceback.format_exc()}")
|
||||
return False
|
||||
|
||||
async def flooding(self) -> bool:
|
||||
"""name: flood"""
|
||||
if the_world_eye.triggered:
|
||||
_, auto_archived = await self._get_user_settings()
|
||||
not auto_archived and await captcha_task.archive(self.user.id)
|
||||
await the_world_eye.add_synchronize(self.user.id)
|
||||
return the_world_eye.triggered
|
||||
|
||||
async def disable_pm(self) -> bool:
|
||||
if disabled := setting.get('disable', False):
|
||||
if disabled := setting.get('disable'):
|
||||
await the_order.active(self.user.id, "disable_pm_enabled")
|
||||
captcha = CaptchaChallenge("none", self.user, False)
|
||||
captcha.log_msg(self.msg.text or self.msg.caption or "")
|
||||
@ -2028,7 +2030,7 @@ class Rule:
|
||||
return False
|
||||
|
||||
async def premium(self) -> bool:
|
||||
if premium := setting.get("premium", False):
|
||||
if premium := setting.get("premium"):
|
||||
captcha = CaptchaChallenge("disable_pm", self.user, False)
|
||||
captcha.log_msg(self.msg.text or self.msg.caption or "")
|
||||
if premium == "only" and not self.msg.from_user.is_premium:
|
||||
@ -2067,6 +2069,14 @@ class Rule:
|
||||
return True
|
||||
return False
|
||||
|
||||
async def flooding(self) -> bool:
|
||||
"""name: flood"""
|
||||
if the_world_eye.triggered:
|
||||
_, auto_archived = await self._get_user_settings()
|
||||
not auto_archived and await captcha_task.archive(self.user.id)
|
||||
await the_world_eye.add_synchronize(self.user.id)
|
||||
return the_world_eye.triggered
|
||||
|
||||
async def add_captcha(self) -> bool:
|
||||
"""name: captcha"""
|
||||
user_id = self.user.id
|
||||
|
@ -71,10 +71,18 @@ async def run_speedtest(message: Message):
|
||||
async def get_all_ids():
|
||||
test = Speedtest()
|
||||
servers = test.get_closest_servers()
|
||||
if not servers:
|
||||
return "附近没有测速点", None
|
||||
return "附近的测速点有:\n\n" + \
|
||||
"\n".join(f"`{i['id']}` - `{int(i['d'])}km` - `{i['name']}` - `{i['sponsor']}`" for i in servers), None
|
||||
return (
|
||||
(
|
||||
"附近的测速点有:\n\n"
|
||||
+ "\n".join(
|
||||
f"`{i['id']}` - `{int(i['d'])}km` - `{i['name']}` - `{i['sponsor']}`"
|
||||
for i in servers
|
||||
),
|
||||
None,
|
||||
)
|
||||
if servers
|
||||
else ("附近没有测速点", None)
|
||||
)
|
||||
|
||||
|
||||
@listener(command="speedtest",
|
||||
|
@ -65,10 +65,8 @@ async def weather(_: Client, message: Message):
|
||||
tempInF = round((1.8 * tempInC) + 32, 2)
|
||||
icon = data["weather"][0]["icon"]
|
||||
desc = data["weather"][0]["description"]
|
||||
res = "{} {}{} 💨{} {}m/s\n大气🌡 {}℃ ({}℉) 💦 {}% \n体感🌡 {}℃\n气压 {}hpa\n🌅{} 🌇{} ".format(
|
||||
cityName, icons[icon], desc, windDirection, windSpeed, tempInC, tempInF, humidity, fellsTemp, pressure,
|
||||
sunriseTime, sunsetTime
|
||||
)
|
||||
res = f"{cityName} {icons[icon]}{desc} 💨{windDirection} {windSpeed}m/s\n大气🌡 {tempInC}℃ ({tempInF}℉) 💦 {humidity}% \n体感🌡 {fellsTemp}℃\n气压 {pressure}hpa\n🌅{sunriseTime} 🌇{sunsetTime} "
|
||||
|
||||
await message.edit(res)
|
||||
if req.status_code == 404:
|
||||
await message.edit("出错了呜呜呜 ~ 无效的城市名,请使用拼音输入 ~ ")
|
||||
|
Loading…
Reference in New Issue
Block a user