This commit is contained in:
XYenon 2021-06-03 22:18:49 +08:00
parent a29792503f
commit 3f2a22e61b

View File

@ -5,7 +5,7 @@ import time
import uuid
from datetime import timedelta, datetime
from gettext import translation
from typing import Any, Dict, List, BinaryIO
from typing import Any, Dict, List, BinaryIO, Tuple
import cherrypy
import cqhttp
@ -48,10 +48,11 @@ class GoCQHttp(BaseClient):
ngettext = translator.ngettext
friend_list = []
friend_group = {}
friend_remark = {}
friend_dict: Dict[int, dict] = {}
group_list = []
group_member_list: Dict[str, Dict[str, Any]] = {}
group_dict: Dict[int, dict] = {}
group_member_dict: Dict[int, Dict[str, Any]] = {}
group_member_info_dict: Dict[Tuple[int, int], dict] = {}
discuss_list = []
extra_group_list = []
repeat_counter = 0
@ -110,9 +111,7 @@ class GoCQHttp(BaseClient):
if remark is not None:
context['nickname'] = remark
g_id = context['group_id']
member_info = self.coolq_api_query('get_group_member_info',
group_id=g_id,
user_id=qq_uid)
member_info = self.get_group_member_info(group_id=g_id, user_id=qq_uid)
if member_info is not None:
context['alias'] = member_info['card']
author = self.chat_manager.build_or_get_efb_member(chat, context)
@ -146,9 +145,7 @@ class GoCQHttp(BaseClient):
if str(msg_data['qq']) == 'all':
group_card = 'all'
else:
member_info = self.coolq_api_query('get_group_member_info',
group_id=g_id,
user_id=msg_data['qq'])
member_info = self.get_group_member_info(group_id=g_id, user_id=msg_data['qq'])
group_card = ""
if member_info:
group_card = member_info['card'] if member_info['card'] != '' else member_info['nickname']
@ -260,9 +257,7 @@ class GoCQHttp(BaseClient):
file_info_msg = self._('File ID: {file[id]}\n'
'Filename: {file[name]}\n'
'File size: {file[size]}').format(file=context['file'])
member_info = self.coolq_api_query('get_group_member_info',
group_id=context['group_id'],
user_id=context['user_id'])
member_info = self.get_group_member_info(group_id=context['group_id'], user_id=context['user_id'])
group_card = member_info['card'] if member_info['card'] != '' else member_info['nickname']
text = self._('{member_card}({context[user_id]}) uploaded a file to group({group_name})\n')
text = text.format(member_card=group_card,
@ -447,23 +442,16 @@ class GoCQHttp(BaseClient):
return groups + self.discuss_list
def get_friends(self) -> List:
# Warning: Experimental API
try:
self.update_friend_list() # Force update friend list
self.update_friend_group()
except CoolQAPIFailureException:
self.deliver_alert_to_master(self._('Failed to retrieve the friend list.\n'
'Only groups are shown.'))
return []
users = []
for current_user in self.friend_list: # friend group
if current_user['user_id'] in self.friend_group:
txt = '[{}] {}'.format(self.friend_group[current_user['user_id']], current_user['nickname'])
else:
txt = '{}'.format(current_user['nickname'])
# Disable nickname & remark comparison for it's too time-consuming
for current_user in self.friend_list:
context = {'user_id': str(current_user['user_id']),
'nickname': txt,
'nickname': current_user['nickname'],
'alias': current_user['remark']}
efb_chat = self.chat_manager.build_efb_chat_as_private(context)
users.append(efb_chat)
@ -565,31 +553,33 @@ class GoCQHttp(BaseClient):
else:
return None
def get_group_member_info(self, group_id, user_id):
res = self.coolq_api_query('get_group_member_info', group_id=group_id, user_id=user_id, no_cache=True)
# res = self.coolq_bot.get_group_member_info(group_id=group_id, user_id=user_id, no_cache=True)
return res
pass
def get_group_member_info(self, group_id: int, user_id: int, no_cache: bool = False) -> dict:
group_member_info = self.group_member_info_dict.get((group_id, user_id))
if no_cache or (group_member_info is None):
group_member_info = self.coolq_api_query('get_group_member_info', group_id=group_id, user_id=user_id,
no_cache=no_cache)
self.group_member_info_dict[(group_id, user_id)] = group_member_info
return group_member_info
def get_group_member_list(self, group_id, no_cache=True):
if no_cache or group_id not in self.group_member_list \
or datetime.now() - self.group_member_list[group_id]['time'] > timedelta(hours=1): # Force Update
def get_group_member_list(self, group_id, no_cache=False):
if no_cache or (group_id not in self.group_member_dict) \
or (datetime.now() - self.group_member_dict[group_id]['time'] > timedelta(hours=1)): # Force Update
try:
member_list = self.coolq_api_query('get_group_member_list', group_id=group_id)
except CoolQAPIFailureException as e:
self.deliver_alert_to_master(self._("Failed the get group member detail.") + "{}".format(e))
return None
self.group_member_list[group_id] = {
self.group_member_dict[group_id] = {
'members': member_list,
'time': datetime.now()
}
return self.group_member_list[group_id]['members']
return self.group_member_dict[group_id]['members']
def get_user_info(self, user_id: int, no_cache=False):
if no_cache or not self.friend_list:
self.update_friend_list()
user = {'user_id': user_id}
friend = self.friend_remark.get(user_id)
friend = self.friend_dict.get(user_id)
if friend:
user['is_friend'] = True
user['nickname'] = friend['nickname']
@ -601,31 +591,26 @@ class GoCQHttp(BaseClient):
def get_group_info(self, group_id, no_cache=False):
if no_cache or not self.group_list:
self.group_list = self.coolq_api_query('get_group_list')
res = self.group_list
if not res:
self.deliver_alert_to_master(self._("Failed the get group detail"))
return None
# res = self.coolq_bot.get_group_list()
for i in range(len(res)):
if res[i]['group_id'] == group_id:
return res[i]
self.update_group_list()
group = self.group_dict.get(group_id)
if group:
return group
if no_cache:
for extra_group in self.extra_group_list:
if extra_group['group_id'] == group_id:
return extra_group
try:
external_group = self.get_external_group_info(group_id)
except Exception:
except CoolQAPIFailureException:
self.logger.error(f'Get external group({group_id}) info failed.')
return None
else:
new_group = {'group_id': group_id, 'group_name': external_group['group_name']}
for i in range(len(self.extra_group_list)):
if str(self.extra_group_list[i]['group_id']) == str(group_id):
return new_group
self.extra_group_list.append(new_group)
return new_group
self.extra_group_list.append(external_group)
return external_group
def coolq_send_message(self, msg_type, uid, message):
keyword = msg_type if msg_type != 'private' else 'user'
res = self.coolq_api_query('send_msg', message_type=msg_type, **{keyword + '_id': uid}, message=message)
# res = self.coolq_bot.send_msg(message_type=msg_type, **{keyword + '_id': uid}, message=message)
return str(uuid.uuid4()) + '_' + str(res['message_id'])
def _coolq_api_wrapper(self, func_name, **kwargs):
@ -639,7 +624,6 @@ class GoCQHttp(BaseClient):
api_ex = CoolQAPIFailureException(self._('CoolQ HTTP API encountered an error!\n'
'Status Code:{} '
'Return Code:{}').format(ex.status_code, ex.retcode))
# if ex.status_code == 200 and ex.retcode == 104: # Cookie expired
setattr(api_ex, 'status_code', ex.status_code)
setattr(api_ex, 'retcode', ex.retcode)
raise api_ex
@ -713,10 +697,6 @@ class GoCQHttp(BaseClient):
context = {'message': message, 'uid_prefix': 'alert', 'event_description': self._('CoolQ Alert')}
self.send_msg_to_master(context)
def update_friend_group(self):
# Mirai doesn't support retrieving friend group, neither does get_credentials unfortunately
return {}
def update_friend_list(self):
self.friend_list = self.coolq_api_query('get_friend_list')
if self.friend_list:
@ -724,10 +704,7 @@ class GoCQHttp(BaseClient):
for friend in self.friend_list:
if friend['remark'] == '':
friend['remark'] = friend['nickname']
self.friend_remark[friend['user_id']] = {
'nickname': friend['nickname'],
'remark': friend['remark']
}
self.friend_dict[friend['user_id']] = friend
else:
self.logger.warning('Failed to update friend list')
@ -735,6 +712,8 @@ class GoCQHttp(BaseClient):
self.group_list = self.coolq_api_query('get_group_list')
if self.group_list:
self.logger.debug('Update group list completed. Entries: %s', len(self.group_list))
for group in self.group_list:
self.group_dict[group['group_id']] = group
else:
self.logger.warning('Failed to update group list')
@ -761,16 +740,11 @@ class GoCQHttp(BaseClient):
self.update_contacts_timer.start()
def get_friend_remark(self, uid):
if (not self.friend_list) or (uid not in self.friend_remark):
try:
self.update_friend_list()
except CoolQAPIFailureException:
# self.deliver_alert_to_master(self._('Failed to update friend remark name'))
self.logger.exception(self._('Failed to update friend remark name'))
return ''
if uid not in self.friend_remark:
if (not self.friend_list) or (uid not in self.friend_dict):
self.update_friend_list()
if uid not in self.friend_dict:
return None # I don't think you have such a friend
return self.friend_remark[uid]['remark']
return self.friend_dict[uid]['remark']
def send_efb_group_notice(self, context):
context['message_type'] = 'group'
@ -817,10 +791,8 @@ class GoCQHttp(BaseClient):
# As the old saying goes
# A programmer spent 20% of time on coding
# while the rest 80% on considering a variable/function/class name
# todo Deprecated
def get_external_group_info(self, group_id): # Special thanks to @lwl12 for thinking of this name
res = self.coolq_api_query('_get_group_info',
group_id=group_id)
res = self.coolq_api_query('get_group_info', group_id=group_id)
return res
def send_status(self, status: 'Status'):