mirror of
https://github.com/PaiGramTeam/PaiGram.git
synced 2024-11-25 09:37:30 +00:00
✨ Use ChatMemberHandler
to Get Chat Member Updates
使用 ChatMemberHandler 获取 chat member updates 解决在部分群开启了隐藏成员列表后Bot无法工作的问题
This commit is contained in:
parent
7aac944f1e
commit
471ed052ea
@ -10,6 +10,7 @@ from typing import Any, Callable, ClassVar, Dict, Iterator, List, NoReturn, Opti
|
||||
import genshin
|
||||
import pytz
|
||||
from async_timeout import timeout
|
||||
from telegram import Update
|
||||
from telegram import __version__ as tg_version
|
||||
from telegram.error import NetworkError, TimedOut
|
||||
from telegram.ext import (
|
||||
@ -34,8 +35,6 @@ from metadata.scripts.metadatas import make_github_fast
|
||||
from utils.const import PLUGIN_DIR, PROJECT_ROOT
|
||||
from utils.log import logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from telegram import Update
|
||||
|
||||
__all__ = ["bot"]
|
||||
|
||||
@ -276,6 +275,7 @@ class Bot:
|
||||
write_timeout=self.config.write_timeout,
|
||||
connect_timeout=self.config.connect_timeout,
|
||||
pool_timeout=self.config.pool_timeout,
|
||||
allowed_updates=Update.ALL_TYPES,
|
||||
)
|
||||
break
|
||||
except TimedOut:
|
||||
|
@ -198,8 +198,8 @@ class _ChatJoinRequest(_Handler):
|
||||
|
||||
|
||||
class _ChatMember(_Handler):
|
||||
def __init__(self, chat_member_types: int = -1):
|
||||
super().__init__(chat_member_types=chat_member_types)
|
||||
def __init__(self, chat_member_types: int = -1, block: DVInput[bool] = DEFAULT_TRUE):
|
||||
super().__init__(chat_member_types=chat_member_types, block=block)
|
||||
|
||||
|
||||
class _ChosenInlineResult(_Handler):
|
||||
|
@ -3,16 +3,18 @@ import random
|
||||
import time
|
||||
from typing import Tuple, Union, Dict, List, Optional
|
||||
|
||||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ChatPermissions, ChatMember
|
||||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ChatPermissions, ChatMember, Message, User
|
||||
from telegram.constants import ParseMode
|
||||
from telegram.error import BadRequest
|
||||
from telegram.ext import CallbackContext, CallbackQueryHandler
|
||||
from telegram.error import BadRequest, RetryAfter
|
||||
from telegram.ext import CallbackContext, CallbackQueryHandler, ChatMemberHandler
|
||||
from telegram.helpers import escape_markdown
|
||||
|
||||
from core.base.mtproto import MTProto
|
||||
from core.base.redisdb import RedisDB
|
||||
from core.bot import bot
|
||||
from core.plugin import Plugin, handler
|
||||
from core.quiz import QuizService
|
||||
from utils.chatmember import extract_status_change
|
||||
from utils.decorators.error import error_callable
|
||||
from utils.decorators.restricts import restricts
|
||||
from utils.log import logger
|
||||
@ -22,8 +24,16 @@ try:
|
||||
|
||||
PYROGRAM_AVAILABLE = True
|
||||
except ImportError:
|
||||
MTPBadRequest = ValueError
|
||||
MTPFloodWait = IndexError
|
||||
PYROGRAM_AVAILABLE = False
|
||||
|
||||
try:
|
||||
import ujson as jsonlib
|
||||
|
||||
except ImportError:
|
||||
import json as jsonlib
|
||||
|
||||
FullChatPermissions = ChatPermissions(
|
||||
can_send_messages=True,
|
||||
can_send_media_messages=True,
|
||||
@ -39,7 +49,7 @@ FullChatPermissions = ChatPermissions(
|
||||
class GroupJoiningVerification(Plugin):
|
||||
"""群验证模块"""
|
||||
|
||||
def __init__(self, quiz_service: QuizService = None, mtp: MTProto = None):
|
||||
def __init__(self, quiz_service: QuizService = None, mtp: MTProto = None, redis: RedisDB = None):
|
||||
self.quiz_service = quiz_service
|
||||
self.time_out = 120
|
||||
self.kick_time = 120
|
||||
@ -47,6 +57,7 @@ class GroupJoiningVerification(Plugin):
|
||||
self.chat_administrators_cache: Dict[Union[str, int], Tuple[float, List[ChatMember]]] = {}
|
||||
self.is_refresh_quiz = False
|
||||
self.mtp = mtp.client
|
||||
self.redis = redis.client
|
||||
|
||||
async def __async_init__(self):
|
||||
logger.info("群验证模块正在刷新问题列表")
|
||||
@ -109,6 +120,18 @@ class GroupJoiningVerification(Plugin):
|
||||
logger.error(f"Auth模块在 chat_id[{chat_id}] user_id[{user_id}] 执行restore失败")
|
||||
logger.exception(exc)
|
||||
|
||||
async def get_new_chat_members_message(self, user: User, context: CallbackContext) -> Optional[Message]:
|
||||
qname = f"plugin:auth:new_chat_members_message:{user.id}"
|
||||
result = await self.redis.get(qname)
|
||||
if result:
|
||||
data = jsonlib.loads(str(result, encoding="utf-8"))
|
||||
return Message.de_json(data, context.bot)
|
||||
return None
|
||||
|
||||
async def set_new_chat_members_message(self, user: User, message: Message):
|
||||
qname = f"plugin:auth:new_chat_members_message:{user.id}"
|
||||
await self.redis.set(qname, message.to_json(), ex=60)
|
||||
|
||||
@handler(CallbackQueryHandler, pattern=r"^auth_admin\|", block=False)
|
||||
@error_callable
|
||||
@restricts(without_overlapping=True)
|
||||
@ -236,7 +259,7 @@ class GroupJoiningVerification(Plugin):
|
||||
if schedule := context.job_queue.scheduler.get_job(f"{chat.id}|{user.id}|auth_kick"):
|
||||
schedule.remove()
|
||||
|
||||
@handler.message.new_chat_members(priority=2)
|
||||
@handler.message.new_chat_members(priority=1)
|
||||
@error_callable
|
||||
async def new_mem(self, update: Update, context: CallbackContext) -> None:
|
||||
message = update.effective_message
|
||||
@ -252,35 +275,56 @@ class GroupJoiningVerification(Plugin):
|
||||
for user in message.new_chat_members:
|
||||
if user.id == context.bot.id:
|
||||
return
|
||||
logger.info(f"用户 {user.full_name}[{user.id}] 尝试加入群 {chat.title}[{chat.id}]")
|
||||
not_enough_rights = context.chat_data.get("not_enough_rights", False)
|
||||
if not_enough_rights:
|
||||
logger.debug("用户 %s[%s] 加入群 %s[%s]", user.full_name, user.id, chat.title, chat.id)
|
||||
await self.set_new_chat_members_message(user, message)
|
||||
|
||||
@handler.chat_member(chat_member_types=ChatMemberHandler.CHAT_MEMBER, block=False)
|
||||
@error_callable
|
||||
async def track_users(self, update: Update, context: CallbackContext) -> None:
|
||||
chat = update.effective_chat
|
||||
if len(bot.config.verify_groups) >= 1:
|
||||
for verify_group in bot.config.verify_groups:
|
||||
if verify_group == chat.id:
|
||||
break
|
||||
else:
|
||||
return
|
||||
else:
|
||||
return
|
||||
chat_administrators = await self.get_chat_administrators(context, chat_id=chat.id)
|
||||
if self.is_admin(chat_administrators, message.from_user.id):
|
||||
await message.reply_text("派蒙检测到管理员邀请,自动放行了!")
|
||||
new_chat_member = update.chat_member.new_chat_member
|
||||
from_user = update.chat_member.from_user
|
||||
user = new_chat_member.user
|
||||
result = extract_status_change(update.chat_member)
|
||||
if result is None:
|
||||
return
|
||||
for user in message.new_chat_members:
|
||||
was_member, is_member = result
|
||||
if was_member and not is_member:
|
||||
logger.info("用户 %s[%s] 退出群聊 %s[%s]", user.full_name, user.id, chat.title, chat.id)
|
||||
return
|
||||
elif not was_member and is_member:
|
||||
logger.info("用户 %s[%s] 尝试加入群 %s[%s]", user.full_name, user.id, chat.title, chat.id)
|
||||
if user.is_bot:
|
||||
continue
|
||||
return
|
||||
chat_administrators = await self.get_chat_administrators(context, chat_id=chat.id)
|
||||
if self.is_admin(chat_administrators, from_user.id):
|
||||
await chat.send_message("派蒙检测到管理员邀请,自动放行了!")
|
||||
return
|
||||
question_id_list = await self.quiz_service.get_question_id_list()
|
||||
if len(question_id_list) == 0:
|
||||
await message.reply_text("旅行者!!!派蒙的问题清单你还没给我!!快去私聊我给我问题!")
|
||||
await chat.send_message("旅行者!!!派蒙的问题清单你还没给我!!快去私聊我给我问题!")
|
||||
return
|
||||
try:
|
||||
await context.bot.restrict_chat_member(
|
||||
chat_id=message.chat.id, user_id=user.id, permissions=ChatPermissions(can_send_messages=False)
|
||||
)
|
||||
except BadRequest as err:
|
||||
if "Not enough rights" in str(err):
|
||||
logger.warning(f"权限不够 chat_id[{message.chat_id}]")
|
||||
# reply_message = await message.reply_markdown_v2(f"派蒙无法修改 {user.mention_markdown_v2()} 的权限!"
|
||||
# f"请检查是否给派蒙授权管理了")
|
||||
context.chat_data["not_enough_rights"] = True
|
||||
# await context.bot.delete_message(chat.id, reply_message.message_id)
|
||||
await chat.restrict_member(user_id=user.id, permissions=ChatPermissions(can_send_messages=False))
|
||||
except BadRequest as exc:
|
||||
if "Not enough rights" in exc.message:
|
||||
logger.warning("%s[%s] 权限不够", chat.title, chat.id)
|
||||
await chat.send_message(
|
||||
f"派蒙无法修改 {user.mention_html()} 的权限!请检查是否给派蒙授权管理了",
|
||||
parse_mode=ParseMode.HTML,
|
||||
)
|
||||
return
|
||||
else:
|
||||
raise err
|
||||
raise exc
|
||||
new_chat_members_message = await self.get_new_chat_members_message(user, context)
|
||||
question_id = random.choice(question_id_list) # nosec
|
||||
question = await self.quiz_service.get_question(question_id)
|
||||
buttons = [
|
||||
@ -305,20 +349,40 @@ class GroupJoiningVerification(Plugin):
|
||||
),
|
||||
]
|
||||
)
|
||||
reply_message = (
|
||||
f"*欢迎来到「提瓦特」世界!* \n" f"问题: {escape_markdown(question.text, version=2)} \n" f"请在 {self.time_out}S 内回答问题"
|
||||
)
|
||||
if new_chat_members_message:
|
||||
reply_message = (
|
||||
f"*欢迎 {user.mention_markdown_v2()} 来到「提瓦特」世界!* \n"
|
||||
f"问题: {escape_markdown(question.text, version=2)} \n"
|
||||
f"请在*{self.time_out}*秒内回答问题"
|
||||
)
|
||||
else:
|
||||
reply_message = (
|
||||
f"*欢迎来到「提瓦特」世界!* \n"
|
||||
f"问题: {escape_markdown(question.text, version=2)} \n"
|
||||
f"请在*{self.time_out}*秒内回答问题"
|
||||
)
|
||||
logger.debug(
|
||||
f"发送入群验证问题 question_id[{question.question_id}] question[{question.text}] \n"
|
||||
f"给{user.full_name}[{user.id}] 在 {chat.title}[{chat.id}]"
|
||||
"发送入群验证问题 %s[%s] \n给%s[%s] 在 %s[%s]",
|
||||
question.text,
|
||||
question.question_id,
|
||||
user.full_name,
|
||||
user.id,
|
||||
chat.title,
|
||||
chat.id,
|
||||
)
|
||||
try:
|
||||
question_message = await message.reply_markdown_v2(
|
||||
reply_message, reply_markup=InlineKeyboardMarkup(buttons)
|
||||
)
|
||||
question_message.forward_from
|
||||
if new_chat_members_message:
|
||||
question_message = await new_chat_members_message.reply_markdown_v2(
|
||||
reply_message, reply_markup=InlineKeyboardMarkup(buttons), allow_sending_without_reply=True
|
||||
)
|
||||
else:
|
||||
question_message = await chat.send_message(
|
||||
reply_message,
|
||||
reply_markup=InlineKeyboardMarkup(buttons),
|
||||
parse_mode=ParseMode.MARKDOWN_V2,
|
||||
)
|
||||
except BadRequest as exc:
|
||||
await message.reply_text("派蒙分心了一下,不小心忘记你了,你只能先退出群再重新进来吧。")
|
||||
await chat.send_message("派蒙分心了一下,不小心忘记你了,你只能先退出群再重新进来吧。")
|
||||
raise exc
|
||||
context.job_queue.run_once(
|
||||
callback=self.kick_member_job,
|
||||
@ -328,15 +392,16 @@ class GroupJoiningVerification(Plugin):
|
||||
user_id=user.id,
|
||||
job_kwargs={"replace_existing": True, "id": f"{chat.id}|{user.id}|auth_kick"},
|
||||
)
|
||||
context.job_queue.run_once(
|
||||
callback=self.clean_message_job,
|
||||
when=self.time_out,
|
||||
data=message.message_id,
|
||||
name=f"{chat.id}|{user.id}|auth_clean_join_message",
|
||||
chat_id=chat.id,
|
||||
user_id=user.id,
|
||||
job_kwargs={"replace_existing": True, "id": f"{chat.id}|{user.id}|auth_clean_join_message"},
|
||||
)
|
||||
if new_chat_members_message:
|
||||
context.job_queue.run_once(
|
||||
callback=self.clean_message_job,
|
||||
when=self.time_out,
|
||||
data=new_chat_members_message.message_id,
|
||||
name=f"{chat.id}|{user.id}|auth_clean_join_message",
|
||||
chat_id=chat.id,
|
||||
user_id=user.id,
|
||||
job_kwargs={"replace_existing": True, "id": f"{chat.id}|{user.id}|auth_clean_join_message"},
|
||||
)
|
||||
context.job_queue.run_once(
|
||||
callback=self.clean_message_job,
|
||||
when=self.time_out,
|
||||
@ -346,11 +411,16 @@ class GroupJoiningVerification(Plugin):
|
||||
user_id=user.id,
|
||||
job_kwargs={"replace_existing": True, "id": f"{chat.id}|{user.id}|auth_clean_question_message"},
|
||||
)
|
||||
if PYROGRAM_AVAILABLE and self.mtp and (question_message.id - message.id - 1):
|
||||
if PYROGRAM_AVAILABLE and self.mtp:
|
||||
try:
|
||||
messages_list = await self.mtp.get_messages(
|
||||
chat.id, message_ids=list(range(message.id + 1, question_message.id))
|
||||
)
|
||||
if new_chat_members_message:
|
||||
if question_message.id - new_chat_members_message.id - 1:
|
||||
message_ids = list(range(new_chat_members_message.id + 1, question_message.id))
|
||||
else:
|
||||
return
|
||||
else:
|
||||
message_ids = [question_message.id - 3, question_message.id]
|
||||
messages_list = await self.mtp.get_messages(chat.id, message_ids=message_ids)
|
||||
for find_message in messages_list:
|
||||
if find_message.empty:
|
||||
continue
|
||||
@ -369,9 +439,11 @@ class GroupJoiningVerification(Plugin):
|
||||
await question_message.edit_text(text, reply_markup=InlineKeyboardMarkup(button))
|
||||
if schedule := context.job_queue.scheduler.get_job(f"{chat.id}|{user.id}|auth_kick"):
|
||||
schedule.remove()
|
||||
logger.info(f"用户 {user.full_name}[{user.id}] 在群 {chat.title}[{chat.id}] 验证缝隙间发送消息" "现已删除")
|
||||
logger.info(
|
||||
"用户 %s[%s] 在群 %s[%s] 验证缝隙间发送消息 现已删除", user.full_name, user.id, chat.title, chat.id
|
||||
)
|
||||
except BadRequest as exc:
|
||||
logger.error(f"后验证处理中发生错误 {repr(exc)}")
|
||||
logger.error("后验证处理中发生错误 %s", exc.message)
|
||||
logger.exception(exc)
|
||||
except MTPFloodWait:
|
||||
logger.warning("调用 mtp 触发洪水限制")
|
||||
|
82
plugins/system/chat_member.py
Normal file
82
plugins/system/chat_member.py
Normal file
@ -0,0 +1,82 @@
|
||||
from telegram import Update, Chat, User
|
||||
from telegram.ext import CallbackContext, ChatMemberHandler
|
||||
|
||||
from core.admin.services import BotAdminService
|
||||
from core.config import config, JoinGroups
|
||||
from core.cookies.error import CookiesNotFoundError
|
||||
from core.cookies.services import CookiesService
|
||||
from core.plugin import Plugin, handler
|
||||
from core.user.error import UserNotFoundError
|
||||
from core.user.services import UserService
|
||||
from utils.chatmember import extract_status_change
|
||||
from utils.decorators.error import error_callable
|
||||
from utils.log import logger
|
||||
|
||||
|
||||
class ChatMember(Plugin):
|
||||
def __init__(
|
||||
self,
|
||||
bot_admin_service: BotAdminService = None,
|
||||
user_service: UserService = None,
|
||||
cookies_service: CookiesService = None,
|
||||
):
|
||||
self.cookies_service = cookies_service
|
||||
self.user_service = user_service
|
||||
self.bot_admin_service = bot_admin_service
|
||||
|
||||
@handler.chat_member(chat_member_types=ChatMemberHandler.MY_CHAT_MEMBER, block=False)
|
||||
@error_callable
|
||||
async def track_chats(self, update: Update, context: CallbackContext) -> None:
|
||||
result = extract_status_change(update.my_chat_member)
|
||||
if result is None:
|
||||
return
|
||||
was_member, is_member = result
|
||||
user = update.effective_user
|
||||
chat = update.effective_chat
|
||||
if chat.type == Chat.PRIVATE:
|
||||
if not was_member and is_member:
|
||||
logger.info("用户 %s[%s] 启用了机器人", user.full_name, user.id)
|
||||
elif was_member and not is_member:
|
||||
logger.info("用户 %s[%s] 屏蔽了机器人", user.full_name, user.id)
|
||||
elif chat.type in [Chat.GROUP, Chat.SUPERGROUP]:
|
||||
if not was_member and is_member:
|
||||
logger.info("用户 %s[%s] 邀请BOT进入群 %s[%s]", user.full_name, user.id, chat.title, chat.id)
|
||||
await self.greet(user, chat, context)
|
||||
elif was_member and not is_member:
|
||||
logger.info("用户 %s[%s] 从 %s[%s] 群移除Bot", user.full_name, user.id, chat.title, chat.id)
|
||||
else:
|
||||
if not was_member and is_member:
|
||||
logger.info("用户 %s[%s] 邀请BOT进入频道 %s[%s]", user.full_name, user.id, chat.title, chat.id)
|
||||
elif was_member and not is_member:
|
||||
logger.info("用户 %s[%s] 从 %s[%s] 频道移除Bot", user.full_name, user.id, chat.title, chat.id)
|
||||
|
||||
async def greet(self, user: User, chat: Chat, context: CallbackContext) -> None:
|
||||
quit_status = True
|
||||
if config.join_groups == JoinGroups.NO_ALLOW:
|
||||
try:
|
||||
admin_list = await self.bot_admin_service.get_admin_list()
|
||||
if user.id in admin_list:
|
||||
quit_status = False
|
||||
else:
|
||||
logger.warning("不是管理员邀请!退出群聊")
|
||||
except Exception as exc: # pylint: disable=W0703
|
||||
logger.error("获取信息出现错误", exc_info=exc)
|
||||
elif config.join_groups == JoinGroups.ALLOW_AUTH_USER:
|
||||
try:
|
||||
user_info = await self.user_service.get_user_by_id(user.id)
|
||||
await self.cookies_service.get_cookies(user.id, user_info.region)
|
||||
except (UserNotFoundError, CookiesNotFoundError):
|
||||
logger.warning("用户 %s[%s] 邀请请求被拒绝", user.full_name, user.id)
|
||||
except Exception as exc:
|
||||
logger.error("获取信息出现错误", exc_info=exc)
|
||||
else:
|
||||
quit_status = False
|
||||
elif config.join_groups == JoinGroups.ALLOW_ALL:
|
||||
quit_status = False
|
||||
else:
|
||||
quit_status = True
|
||||
if quit_status:
|
||||
await context.bot.send_message(chat.id, "派蒙不想进去!不是旅行者的邀请!")
|
||||
await context.bot.leave_chat(chat.id)
|
||||
else:
|
||||
await context.bot.send_message(chat.id, "感谢邀请小派蒙到本群!请使用 /help 查看咱已经学会的功能。")
|
@ -1,65 +0,0 @@
|
||||
from telegram import Update
|
||||
from telegram.ext import CallbackContext
|
||||
|
||||
from core.admin.services import BotAdminService
|
||||
from core.config import config, JoinGroups
|
||||
from core.cookies.error import CookiesNotFoundError
|
||||
from core.cookies.services import CookiesService
|
||||
from core.plugin import Plugin, handler
|
||||
from core.user.error import UserNotFoundError
|
||||
from core.user.services import UserService
|
||||
from utils.log import logger
|
||||
|
||||
|
||||
class BotJoiningGroupsVerification(Plugin):
|
||||
def __init__(
|
||||
self,
|
||||
bot_admin_service: BotAdminService = None,
|
||||
user_service: UserService = None,
|
||||
cookies_service: CookiesService = None,
|
||||
):
|
||||
self.cookies_service = cookies_service
|
||||
self.user_service = user_service
|
||||
self.bot_admin_service = bot_admin_service
|
||||
|
||||
@handler.message.new_chat_members(priority=1)
|
||||
async def new_member(self, update: Update, context: CallbackContext) -> None:
|
||||
if config.join_groups == JoinGroups.ALLOW_ALL:
|
||||
return None
|
||||
message = update.effective_message
|
||||
chat = message.chat
|
||||
from_user = message.from_user
|
||||
for new_chat_members_user in message.new_chat_members:
|
||||
if new_chat_members_user.id == context.bot.id:
|
||||
logger.info("有人邀请BOT进入群 %s[%s]", chat.title, chat.id)
|
||||
quit_status = True
|
||||
if from_user is not None:
|
||||
logger.info(f"用户 {from_user.full_name}[{from_user.id}] 在群 {chat.title}[{chat.id}] 邀请BOT")
|
||||
if config.join_groups == JoinGroups.NO_ALLOW:
|
||||
try:
|
||||
admin_list = await self.bot_admin_service.get_admin_list()
|
||||
if from_user.id in admin_list:
|
||||
quit_status = False
|
||||
else:
|
||||
logger.warning("不是管理员邀请!退出群聊")
|
||||
except Exception as exc: # pylint: disable=W0703
|
||||
logger.error("获取信息出现错误", exc_info=exc)
|
||||
elif config.join_groups == JoinGroups.ALLOW_AUTH_USER:
|
||||
try:
|
||||
user_info = await self.user_service.get_user_by_id(from_user.id)
|
||||
await self.cookies_service.get_cookies(from_user.id, user_info.region)
|
||||
except (UserNotFoundError, CookiesNotFoundError):
|
||||
logger.warning("用户 %s[%s] 邀请请求被拒绝", from_user.full_name, from_user.id)
|
||||
except Exception as exc:
|
||||
logger.error("获取信息出现错误", exc_info=exc)
|
||||
else:
|
||||
quit_status = False
|
||||
else:
|
||||
quit_status = True
|
||||
else:
|
||||
logger.info("未知用户 在群 %s[%s] 邀请BOT", chat.title, chat.id)
|
||||
if quit_status:
|
||||
await context.bot.send_message(message.chat_id, "派蒙不想进去!不是旅行者的邀请!")
|
||||
await context.bot.leave_chat(chat.id)
|
||||
else:
|
||||
await context.bot.send_message(message.chat_id, "感谢邀请小派蒙到本群!请使用 /help 查看咱已经学会的功能。")
|
29
utils/chatmember.py
Normal file
29
utils/chatmember.py
Normal file
@ -0,0 +1,29 @@
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from telegram import ChatMemberUpdated, ChatMember
|
||||
|
||||
|
||||
def extract_status_change(chat_member_update: ChatMemberUpdated) -> Optional[Tuple[bool, bool]]:
|
||||
"""Takes a ChatMemberUpdated instance and extracts whether the 'old_chat_member' was a member
|
||||
of the chat and whether the 'new_chat_member' is a member of the chat. Returns None, if
|
||||
the status didn't change.
|
||||
"""
|
||||
status_change = chat_member_update.difference().get("status")
|
||||
old_is_member, new_is_member = chat_member_update.difference().get("is_member", (None, None))
|
||||
|
||||
if status_change is None:
|
||||
return None
|
||||
|
||||
old_status, new_status = status_change
|
||||
was_member = old_status in [
|
||||
ChatMember.MEMBER,
|
||||
ChatMember.OWNER,
|
||||
ChatMember.ADMINISTRATOR,
|
||||
] or (old_status == ChatMember.RESTRICTED and old_is_member is True)
|
||||
is_member = new_status in [
|
||||
ChatMember.MEMBER,
|
||||
ChatMember.OWNER,
|
||||
ChatMember.ADMINISTRATOR,
|
||||
] or (new_status == ChatMember.RESTRICTED and new_is_member is True)
|
||||
|
||||
return was_member, is_member
|
Loading…
Reference in New Issue
Block a user