Refactor parsing message

This commit is contained in:
KurimuzonAkuma 2024-09-04 10:47:24 +03:00
parent 909656b806
commit 86e26cc496
5 changed files with 158 additions and 171 deletions

View File

@ -69,6 +69,8 @@ class Dispatcher:
self.groups = OrderedDict() self.groups = OrderedDict()
async def message_parser(update, users, chats): async def message_parser(update, users, chats):
connection_id = getattr(update, "connection_id", None)
return ( return (
await pyrogram.types.Message._parse( await pyrogram.types.Message._parse(
self.client, self.client,
@ -76,8 +78,9 @@ class Dispatcher:
users, users,
chats, chats,
is_scheduled=isinstance(update, UpdateNewScheduledMessage), is_scheduled=isinstance(update, UpdateNewScheduledMessage),
business_connection_id=getattr(update, "connection_id", None), replies=0 if getattr(update, "connection_id", None) else 1,
reply_to_message=getattr(update, "reply_to_message", None) business_connection_id=connection_id,
raw_reply_to_message=getattr(update, "reply_to_message", None)
), ),
MessageHandler MessageHandler
) )

View File

@ -91,10 +91,7 @@ class GetDialogs:
chat_id = utils.get_peer_id(message.peer_id) chat_id = utils.get_peer_id(message.peer_id)
try: messages[chat_id] = await types.Message._parse(self, message, users, chats)
messages[chat_id] = await types.Message._parse(self, message, users, chats)
except KeyError:
pass
dialogs = [] dialogs = []
@ -102,10 +99,7 @@ class GetDialogs:
if not isinstance(dialog, raw.types.Dialog): if not isinstance(dialog, raw.types.Dialog):
continue continue
try: dialogs.append(types.Dialog._parse(self, dialog, messages, users, chats))
dialogs.append(types.Dialog._parse(self, dialog, messages, users, chats))
except KeyError:
pass
if not dialogs: if not dialogs:
return return

View File

@ -117,6 +117,9 @@ class ForumTopic(Object):
@staticmethod @staticmethod
def _parse(client: "pyrogram.Client", forum_topic: "raw.types.ForumTopic", messages: dict = {}, users: dict = {}, chats: dict = {}) -> "ForumTopic": def _parse(client: "pyrogram.Client", forum_topic: "raw.types.ForumTopic", messages: dict = {}, users: dict = {}, chats: dict = {}) -> "ForumTopic":
if not forum_topic:
return None
if isinstance(forum_topic, raw.types.ForumTopicDeleted): if isinstance(forum_topic, raw.types.ForumTopicDeleted):
return ForumTopic(id=forum_topic.id, is_deleted=True) return ForumTopic(id=forum_topic.id, is_deleted=True)

View File

@ -628,14 +628,14 @@ class Message(Object, Update):
@staticmethod @staticmethod
async def _parse( async def _parse(
client: "pyrogram.Client", client: "pyrogram.Client",
message: raw.base.Message, message: "raw.base.Message",
users: dict, users: dict,
chats: dict, chats: dict,
topics: dict = None, topics: dict = None,
is_scheduled: bool = False, is_scheduled: bool = False,
replies: int = 1, replies: int = 1,
business_connection_id: str = None, business_connection_id: str = None,
reply_to_message: "raw.base.Message" = None raw_reply_to_message: "raw.base.Message" = None
): ):
if isinstance(message, raw.types.MessageEmpty): if isinstance(message, raw.types.MessageEmpty):
return Message( return Message(
@ -846,10 +846,9 @@ class Message(Object, Update):
) )
parsed_message.service = enums.MessageServiceType.PINNED_MESSAGE parsed_message.service = enums.MessageServiceType.PINNED_MESSAGE
except MessageIdsEmpty: except (MessageIdsEmpty, ChannelPrivate):
pass pass
elif isinstance(action, raw.types.MessageActionGameScore):
if isinstance(action, raw.types.MessageActionGameScore):
parsed_message.game_high_score = types.GameHighScore._parse_action(client, message, users) parsed_message.game_high_score = types.GameHighScore._parse_action(client, message, users)
if message.reply_to and replies: if message.reply_to and replies:
@ -861,7 +860,7 @@ class Message(Object, Update):
) )
parsed_message.service = enums.MessageServiceType.GAME_HIGH_SCORE parsed_message.service = enums.MessageServiceType.GAME_HIGH_SCORE
except MessageIdsEmpty: except (MessageIdsEmpty, ChannelPrivate):
pass pass
client.message_cache[(parsed_message.chat.id, parsed_message.id)] = parsed_message client.message_cache[(parsed_message.chat.id, parsed_message.id)] = parsed_message
@ -870,7 +869,7 @@ class Message(Object, Update):
if message.reply_to.reply_to_top_id: if message.reply_to.reply_to_top_id:
parsed_message.message_thread_id = message.reply_to.reply_to_top_id parsed_message.message_thread_id = message.reply_to.reply_to_top_id
else: else:
parsed_message.message_thread_id = message.reply_to.reply_to_msg_id parsed_message.message_thread_id = message.reply_to.reply_to_msg_id or 1
return parsed_message return parsed_message
@ -958,11 +957,14 @@ class Message(Object, Update):
elif isinstance(media, raw.types.MessageMediaStory): elif isinstance(media, raw.types.MessageMediaStory):
if media.story: if media.story:
story = await types.Story._parse(client, media.story, users, chats, media.peer) story = await types.Story._parse(client, media.story, users, chats, media.peer)
else: elif client.me and not client.me.is_bot:
try: try:
story = await client.get_stories(utils.get_peer_id(media.peer), media.id) story = await client.get_stories(utils.get_peer_id(media.peer), media.id)
except (BotMethodInvalid, ChannelPrivate): except ChannelPrivate:
story = await types.Story._parse(client, media, users, chats, media.peer) pass
if not story:
story = await types.Story._parse(client, media, users, chats, media.peer)
media_type = enums.MessageMediaType.STORY media_type = enums.MessageMediaType.STORY
elif isinstance(media, raw.types.MessageMediaDocument): elif isinstance(media, raw.types.MessageMediaDocument):
@ -1132,53 +1134,56 @@ class Message(Object, Update):
parsed_message.quote = True parsed_message.quote = True
if message.reply_to: if message.reply_to:
parsed_message.reply_to_message_id = message.reply_to.reply_to_msg_id
parsed_message.reply_to_top_message_id = message.reply_to.reply_to_top_id
if isinstance(message.reply_to, raw.types.MessageReplyHeader): if isinstance(message.reply_to, raw.types.MessageReplyHeader):
if message.reply_to.forum_topic: if message.reply_to.forum_topic:
if message.reply_to.reply_to_top_id: if message.reply_to.reply_to_top_id:
thread_id = message.reply_to.reply_to_top_id parsed_message.message_thread_id = message.reply_to.reply_to_top_id
parsed_message.reply_to_message_id = message.reply_to.reply_to_msg_id
else: else:
thread_id = message.reply_to.reply_to_msg_id parsed_message.message_thread_id = message.reply_to.reply_to_msg_id or 1
parsed_message.message_thread_id = thread_id
if topics: if topics:
parsed_message.topic = types.ForumTopic._parse(client, topics[thread_id], users=users, chats=chats) parsed_message.topic = types.ForumTopic._parse(client, topics.get(parsed_message.message_thread_id), users=users, chats=chats)
else: elif message.reply_to.quote:
if message.reply_to.quote: quote_entities = [types.MessageEntity._parse(client, entity, users) for entity in message.reply_to.quote_entities]
quote_entities = [types.MessageEntity._parse(client, entity, users) for entity in message.reply_to.quote_entities] quote_entities = types.List(filter(lambda x: x is not None, quote_entities))
quote_entities = types.List(filter(lambda x: x is not None, quote_entities))
parsed_message.quote = message.reply_to.quote parsed_message.quote = message.reply_to.quote
parsed_message.quote_text = ( parsed_message.quote_text = (
Str(message.reply_to.quote_text).init(quote_entities) or None Str(message.reply_to.quote_text).init(quote_entities) or None
if media is None or web_page is not None if media is None or web_page is not None
else None else None
) )
parsed_message.quote_entities = ( parsed_message.quote_entities = (
quote_entities or None quote_entities or None
if media is None or web_page is not None if media is None or web_page is not None
else None else None
) )
parsed_message.reply_to_message_id = message.reply_to.reply_to_msg_id
parsed_message.reply_to_top_message_id = message.reply_to.reply_to_top_id
elif isinstance(message.reply_to, raw.types.MessageReplyStoryHeader): elif isinstance(message.reply_to, raw.types.MessageReplyStoryHeader):
parsed_message.reply_to_story_id = message.reply_to.story_id parsed_message.reply_to_story_id = message.reply_to.story_id
parsed_message.reply_to_story_user_id = utils.get_peer_id(message.reply_to.peer) parsed_message.reply_to_story_user_id = utils.get_peer_id(message.reply_to.peer)
if replies: if replies:
if parsed_message.reply_to_message_id: if raw_reply_to_message:
is_cross_chat = getattr(message.reply_to, "reply_to_peer_id", None) and getattr(message.reply_to.reply_to_peer_id, "channel_id", None) parsed_message.reply_to_message = await types.Message._parse(
client,
raw_reply_to_message,
users,
chats,
business_connection_id=business_connection_id,
replies=0
)
else:
if isinstance(message.reply_to, raw.types.MessageReplyHeader):
if message.reply_to.reply_to_peer_id:
key = (utils.get_peer_id(message.reply_to.reply_to_peer_id), message.reply_to.reply_to_msg_id)
reply_to_params = {"chat_id": key[0], 'message_ids': key[1]}
else:
key = (parsed_message.chat.id, parsed_message.reply_to_message_id)
reply_to_params = {'chat_id': key[0], 'reply_to_message_ids': message.id}
if is_cross_chat:
key = (utils.get_channel_id(message.reply_to.reply_to_peer_id.channel_id), message.reply_to.reply_to_msg_id)
reply_to_params = {"chat_id": key[0], 'message_ids': key[1]}
else:
key = (parsed_message.chat.id, parsed_message.reply_to_message_id)
reply_to_params = {'chat_id': key[0], 'reply_to_message_ids': message.id}
try:
reply_to_message = client.message_cache[key] reply_to_message = client.message_cache[key]
if not reply_to_message: if not reply_to_message:
@ -1187,32 +1192,24 @@ class Message(Object, Update):
replies=replies - 1, replies=replies - 1,
**reply_to_params **reply_to_params
) )
except ChannelPrivate: except (ChannelPrivate, MessageIdsEmpty):
pass pass
if reply_to_message and not reply_to_message.forum_topic_created:
parsed_message.reply_to_message = reply_to_message
except ChannelPrivate:
pass
except MessageIdsEmpty:
pass
elif parsed_message.reply_to_story_id:
try:
reply_to_story = await client.get_stories(
parsed_message.reply_to_story_user_id,
parsed_message.reply_to_story_id
)
except BotMethodInvalid:
pass
else:
parsed_message.reply_to_story = reply_to_story
if parsed_message.topic is None and parsed_message.chat.is_forum: parsed_message.reply_to_message = reply_to_message
elif isinstance(message.reply_to, raw.types.MessageReplyStoryHeader):
if client.me and not client.me.is_bot:
parsed_message.reply_to_story = await client.get_stories(
utils.get_peer_id(message.reply_to.peer),
message.reply_to.story_id
)
if not parsed_message.topic and parsed_message.chat.is_forum and client.me and not client.me.is_bot:
try: try:
parsed_message.topic = await client.get_forum_topics_by_id( parsed_message.topic = await client.get_forum_topics_by_id(
chat_id=parsed_message.chat.id, chat_id=parsed_message.chat.id,
topic_ids=parsed_message.message_thread_id or 1 topic_ids=parsed_message.message_thread_id or 1
) )
except (BotMethodInvalid, ChannelForumMissing): except (ChannelPrivate, ChannelForumMissing):
pass pass
if not parsed_message.poll: # Do not cache poll messages if not parsed_message.poll: # Do not cache poll messages

View File

@ -92,121 +92,112 @@ def get_input_media_from_file_id(
async def parse_messages( async def parse_messages(
client, client: "pyrogram.Client",
messages: "raw.types.messages.Messages", messages: Union["raw.base.messages.Messages", "raw.base.Updates"],
replies: int = 1, replies: int = 1,
business_connection_id: str = None business_connection_id: str = None
) -> List["types.Message"]: ) -> List["types.Message"]:
users = {i.id: i for i in messages.users} users = {i.id: i for i in getattr(messages, "users", [])}
chats = {i.id: i for i in messages.chats} chats = {i.id: i for i in getattr(messages, "chats", [])}
topics = {i.id: i for i in messages.topics} if hasattr(messages, "topics") else None topics = {i.id: i for i in getattr(messages, "topics", [])}
if not messages.messages:
return types.List()
parsed_messages = [] parsed_messages = []
for message in messages.messages: if isinstance(
parsed_messages.append( messages,
await types.Message._parse( (
client, raw.types.messages.ChannelMessages,
message, raw.types.messages.Messages,
users, raw.types.messages.MessagesNotModified,
chats, raw.types.messages.MessagesSlice
topics,
replies=0,
business_connection_id=business_connection_id
)
) )
):
if not messages.messages:
return types.List()
if replies: for message in messages.messages:
messages_with_replies = { parsed_messages.append(
i.id: i.reply_to await types.Message._parse(
for i in messages.messages client=client,
if not isinstance(i, raw.types.MessageEmpty) and i.reply_to and isinstance(i.reply_to, raw.types.MessageReplyHeader) message=message,
} users=users,
chats=chats,
topics=topics,
replies=0,
business_connection_id=business_connection_id
)
)
message_reply_to_story = { if replies:
i.id: {'user_id': i.reply_to.user_id, 'story_id': i.reply_to.story_id} messages_with_replies = {}
for i in messages.messages messages_with_story_replies = {}
if not isinstance(i, raw.types.MessageEmpty) and i.reply_to and isinstance(i.reply_to, raw.types.MessageReplyStoryHeader)
}
if messages_with_replies: for m in messages.messages:
# We need a chat id, but some messages might be empty (no chat attribute available) if isinstance(m, raw.types.MessageEmpty):
# Scan until we find a message with a chat available (there must be one, because we are fetching replies)
for m in parsed_messages:
if not isinstance(m, types.Message):
continue continue
if m.chat: if m.reply_to and isinstance(m.reply_to, raw.types.MessageReplyHeader):
chat_id = m.chat.id messages_with_replies[m.id] = m.reply_to
break
else:
chat_id = 0
is_all_within_chat = not any( if m.reply_to and isinstance(m.reply_to, raw.types.MessageReplyStoryHeader):
value.reply_to_peer_id messages_with_story_replies[m.id] = m.reply_to
for value in messages_with_replies.values()
) if messages_with_replies:
reply_messages: List[pyrogram.types.Message] = [] # We need a chat id, but some messages might be empty (no chat attribute available)
if is_all_within_chat: # Scan until we find a message with a chat available (there must be one, because we are fetching replies)
# fast path: fetch all messages within the same chat chat_id = next((m.chat.id for m in parsed_messages if m.chat), 0)
reply_messages = await client.get_messages(
chat_id, is_all_replies_in_same_chat = not any(m.reply_to_peer_id for m in messages_with_replies.values())
reply_to_message_ids=messages_with_replies.keys(), reply_messages: List["types.Message"] = []
replies=replies - 1
) if is_all_replies_in_same_chat:
else: reply_messages = await client.get_messages(
# slow path: fetch all messages individually chat_id,
for target_reply_to in messages_with_replies.values(): reply_to_message_ids=list(messages_with_replies.keys()),
to_be_added_msg = None
the_chat_id = chat_id
if target_reply_to.reply_to_peer_id:
the_chat_id = get_channel_id(target_reply_to.reply_to_peer_id.channel_id)
to_be_added_msg = await client.get_messages(
chat_id=the_chat_id,
message_ids=target_reply_to.reply_to_msg_id,
replies=replies - 1 replies=replies - 1
) )
if isinstance(to_be_added_msg, list): else:
for current_to_be_added in to_be_added_msg: for reply_header in messages_with_replies.values():
reply_messages.append(current_to_be_added) reply_messages.append(
elif to_be_added_msg: await client.get_messages(
reply_messages.append(to_be_added_msg) chat_id=get_peer_id(reply_header.reply_to_peer_id) if getattr(reply_header, "reply_to_peer_id", None) else chat_id,
message_ids=reply_header.reply_to_msg_id,
replies=replies - 1
)
)
for message in parsed_messages: for message in parsed_messages:
reply_to = messages_with_replies.get(message.id, None) reply_to = messages_with_replies.get(message.id, None)
if not reply_to:
continue
reply_id = reply_to.reply_to_msg_id if not reply_to:
continue
for reply in reply_messages: for reply in reply_messages:
if reply.id == reply_id and not reply.forum_topic_created: if reply.id == reply_to.reply_to_msg_id:
message.reply_to_message = reply message.reply_to_message = reply
else:
if message_reply_to_story: for u in getattr(messages, "updates", []):
for m in parsed_messages: if isinstance(
if not isinstance(m, types.Message): u,
continue (
raw.types.UpdateNewMessage,
if m.chat: raw.types.UpdateNewChannelMessage,
chat_id = m.chat.id raw.types.UpdateNewScheduledMessage,
break raw.types.UpdateBotNewBusinessMessage,
else: )
chat_id = 0 ):
parsed_messages.append(
reply_messages = {} await types.Message._parse(
for msg_id in message_reply_to_story: client,
reply_messages[msg_id] = await client.get_stories( u.message,
message_reply_to_story[msg_id]['user_id'], users,
message_reply_to_story[msg_id]['story_id'] chats,
is_scheduled=isinstance(u, raw.types.UpdateNewScheduledMessage),
business_connection_id=getattr(u, "connection_id", business_connection_id),
raw_reply_to_message=getattr(u, "reply_to_message", None),
replies=0
)
) )
for message in parsed_messages:
if message.id in reply_messages:
message.reply_to_story = reply_messages[message.id]
return types.List(parsed_messages) return types.List(parsed_messages)
@ -214,7 +205,6 @@ async def parse_messages(
def parse_deleted_messages(client, update, users, chats) -> List["types.Message"]: def parse_deleted_messages(client, update, users, chats) -> List["types.Message"]:
messages = update.messages messages = update.messages
channel_id = getattr(update, "channel_id", None) channel_id = getattr(update, "channel_id", None)
business_connection_id = getattr(update, "connection_id", None)
peer = getattr(update, "peer", None) peer = getattr(update, "peer", None)
chat = None chat = None
@ -246,7 +236,7 @@ def parse_deleted_messages(client, update, users, chats) -> List["types.Message"
types.Message( types.Message(
id=message, id=message,
chat=chat, chat=chat,
business_connection_id=business_connection_id, business_connection_id=getattr(update, "connection_id", None),
client=client client=client
) )
) )