This commit is contained in:
XYenon 2021-06-03 23:36:22 +08:00
parent 3f2a22e61b
commit 98d39a2663

View File

@ -1,3 +1,4 @@
import copy
import logging
import tempfile
import threading
@ -5,7 +6,7 @@ import time
import uuid
from datetime import timedelta, datetime
from gettext import translation
from typing import Any, Dict, List, BinaryIO, Tuple
from typing import Any, Dict, List, BinaryIO, Tuple, Optional
import cherrypy
import cqhttp
@ -49,6 +50,7 @@ class GoCQHttp(BaseClient):
friend_list = []
friend_dict: Dict[int, dict] = {}
stranger_dict: Dict[int, dict] = {}
group_list = []
group_dict: Dict[int, dict] = {}
group_member_dict: Dict[int, Dict[str, Any]] = {}
@ -82,7 +84,7 @@ class GoCQHttp(BaseClient):
@self.coolq_bot.on_message
def handle_msg(context):
self.logger.debug(repr(context))
msg_element = context['message']
msg_elements = context['message']
main_text: str = ''
messages: List[Message] = []
qq_uid = context['user_id']
@ -90,11 +92,10 @@ class GoCQHttp(BaseClient):
chat: Chat
author: ChatMember
remark = self.get_friend_remark(qq_uid)
user = self.get_user_info(qq_uid)
if context['message_type'] == 'private':
context['alias'] = remark
context['alias'] = user['remark']
chat: PrivateChat = self.chat_manager.build_efb_chat_as_private(context)
# efb_msg.chat: EFBChat = self.chat_manager.build_efb_chat_as_user(context, True)
else:
chat = self.chat_manager.build_efb_chat_as_group(context)
@ -108,12 +109,9 @@ class GoCQHttp(BaseClient):
uid=ChatID("__{context[uid_prefix]}__".format(context=context))
)
else:
if remark is not None:
context['nickname'] = remark
g_id = context['group_id']
member_info = self.get_group_member_info(group_id=g_id, user_id=qq_uid)
if member_info is not None:
context['alias'] = member_info['card']
user = self.get_user_info(qq_uid, group_id=context['group_id'])
context['nickname'] = user['remark']
context['alias'] = user['in_group_info']['card']
author = self.chat_manager.build_or_get_efb_member(chat, context)
elif context['message_type'] == 'private':
author = chat.other
@ -122,9 +120,9 @@ class GoCQHttp(BaseClient):
else: # anonymous user in group
author = self.chat_manager.build_efb_chat_as_anonymous_user(chat, context)
for i in range(len(msg_element)):
msg_type = msg_element[i]['type']
msg_data = msg_element[i]['data']
for msg_element in msg_elements:
msg_type = msg_element['type']
msg_data = msg_element['data']
if msg_type == 'text':
main_text += msg_data['text']
elif msg_type == 'face':
@ -141,17 +139,12 @@ class GoCQHttp(BaseClient):
my_uid = self.get_qq_uid()
self.logger.debug('My QQ uid: %s\n'
'QQ mentioned: %s\n', my_uid, msg_data['qq'])
group_card = ''
if str(msg_data['qq']) == 'all':
group_card = 'all'
else:
member_info = self.get_group_member_info(group_id=g_id, user_id=msg_data['qq'])
group_card = ""
if member_info:
group_card = member_info['card'] if member_info['card'] != '' else member_info['nickname']
member_info = self.get_user_info(msg_data['qq'], group_id=g_id)['in_group_info']
group_card = member_info['card'] if member_info['card'] != '' else member_info['nickname']
self.logger.debug('Group card: {}'.format(group_card))
substitution_begin = 0
substitution_end = 0
if main_text == '':
substitution_begin = len(main_text)
substitution_end = len(main_text) + len(group_card) + 1
@ -248,7 +241,7 @@ class GoCQHttp(BaseClient):
@self.coolq_bot.on_notice('group_upload')
def handle_group_file_upload_msg(context):
context['event_description'] = self._("\u2139 Group File Upload Event")
context['uid_prefix'] = 'group_upload'
original_group = self.get_group_info(context['group_id'], False)
group_name = context['group_id']
if original_group is not None and 'group_name' in original_group:
@ -257,7 +250,7 @@ class GoCQHttp(BaseClient):
file_info_msg = self._('File ID: {file[id]}\n'
'Filename: {file[name]}\n'
'File size: {file[size]}').format(file=context['file'])
member_info = self.get_group_member_info(group_id=context['group_id'], user_id=context['user_id'])
member_info = self.get_user_info(context['user_id'], group_id=context['group_id'])['in_group_info']
group_card = member_info['card'] if member_info['card'] != '' else member_info['nickname']
text = self._('{member_card}({context[user_id]}) uploaded a file to group({group_name})\n')
text = text.format(member_card=group_card,
@ -405,8 +398,9 @@ class GoCQHttp(BaseClient):
self.check_status_periodically(None)
return 'Done'
def get_stranger_info(self, user_id):
return self.coolq_api_query('get_stranger_info', user_id=user_id)
def get_stranger_info(self, user_id: int, no_cache: bool = False) -> Dict[str, Any]:
user_id = int(user_id)
return self.get_user_info(user_id, no_cache=no_cache)
def get_login_info(self) -> Dict[Any, Any]:
res = self.coolq_bot.get_status()
@ -553,40 +547,44 @@ class GoCQHttp(BaseClient):
else:
return None
def get_group_member_info(self, group_id: int, user_id: int, no_cache: bool = False) -> dict:
group_member_info = self.group_member_info_dict.get((group_id, user_id))
if no_cache or (group_member_info is None):
group_member_info = self.coolq_api_query('get_group_member_info', group_id=group_id, user_id=user_id,
no_cache=no_cache)
self.group_member_info_dict[(group_id, user_id)] = group_member_info
return group_member_info
def get_group_member_list(self, group_id, no_cache=False):
def get_group_member_list(self, group_id, no_cache=False) -> List[Dict[str, Any]]:
if no_cache or (group_id not in self.group_member_dict) \
or (datetime.now() - self.group_member_dict[group_id]['time'] > timedelta(hours=1)): # Force Update
try:
member_list = self.coolq_api_query('get_group_member_list', group_id=group_id)
member_list = self.coolq_api_query('get_group_member_list', group_id=group_id, no_cache=no_cache)
except CoolQAPIFailureException as e:
self.deliver_alert_to_master(self._("Failed the get group member detail.") + "{}".format(e))
return None
return []
self.group_member_dict[group_id] = {
'members': member_list,
'time': datetime.now()
}
return self.group_member_dict[group_id]['members']
def get_user_info(self, user_id: int, no_cache=False):
if no_cache or not self.friend_list:
def get_user_info(self, user_id: int, group_id: Optional[str] = None, no_cache=False):
user_id = int(user_id)
if no_cache or (not self.friend_list) or (user_id not in self.friend_dict):
self.update_friend_list()
user = {'user_id': user_id}
friend = self.friend_dict.get(user_id)
if friend:
user = copy.deepcopy(friend)
user['is_friend'] = True
user['nickname'] = friend['nickname']
user['remark'] = friend['remark']
else:
user = self.stranger_dict.get(user_id)
if no_cache or (user is None):
user = self.coolq_api_query('get_stranger_info', user_id=user_id)
self.stranger_dict[user_id] = user
user['is_friend'] = False
user['nickname'] = self.get_stranger_info(user_id)['nickname']
if group_id is not None:
user['is_in_group'] = False
for member in self.get_group_member_list(group_id):
if member['user_id'] == user_id:
user['is_in_group'] = True
user['in_group_info'] = member
break
remark = user.get('remark')
if not remark:
user['remark'] = user['nickname']
return user
def get_group_info(self, group_id, no_cache=False):