import copy import logging import tempfile import threading import time import uuid from datetime import timedelta, datetime from gettext import translation from typing import Any, Dict, List, BinaryIO, Tuple, Optional, Union import cherrypy import cqhttp from PIL import Image from cherrypy._cpserver import Server from cherrypy.process.wspbus import states from cqhttp import CQHttp from efb_qq_slave import BaseClient, QQMessengerChannel from ehforwarderbot import Message, MsgType, Chat, coordinator, Status from ehforwarderbot.chat import SelfChatMember, ChatMember, SystemChatMember, PrivateChat from ehforwarderbot.exceptions import EFBMessageError, EFBOperationNotSupported, EFBChatNotFound from ehforwarderbot.message import MessageCommands, MessageCommand from ehforwarderbot.status import MessageRemoval from ehforwarderbot.types import ChatID from ehforwarderbot.utils import extra from pkg_resources import resource_filename from requests import RequestException from .ChatMgr import ChatManager from .Exceptions import CoolQDisconnectedException, CoolQAPIFailureException, CoolQOfflineException from .MsgDecorator import QQMsgProcessor from .Utils import qq_emoji_list, async_send_messages_to_master, process_quote_text, coolq_text_encode, \ download_user_avatar, download_group_avatar, download_file class GoCQHttp(BaseClient): client_name: str = "GoCQHttp Client" client_id: str = "GoCQHttp" client_config: Dict[str, Any] coolq_bot: CQHttp = None logger: logging.Logger = logging.getLogger(__name__) channel: QQMessengerChannel translator = translation("efb_qq_slave", resource_filename('efb_qq_slave', 'Clients/CoolQ/locale'), fallback=True) _ = translator.gettext ngettext = translator.ngettext friend_list = [] friend_dict: Dict[int, dict] = {} stranger_dict: Dict[int, dict] = {} group_list = [] group_dict: Dict[int, dict] = {} group_member_dict: Dict[int, Dict[str, Any]] = {} group_member_info_dict: Dict[Tuple[int, int], dict] = {} discuss_list = [] extra_group_list = [] repeat_counter = 0 update_repeat_counter = 0 event = threading.Event() update_contacts_timer: threading.Timer self_update_timer: threading.Timer check_status_timer: threading.Timer cherryServer: Server can_send_image: bool = False can_send_voice: bool = False def __init__(self, client_id: str, config: Dict[str, Any], channel): super().__init__(client_id, config) self.client_config = config[self.client_id] self.coolq_bot = CQHttp(api_root=self.client_config['api_root'], access_token=self.client_config['access_token'] ) self.channel = channel self.chat_manager = ChatManager(channel) self.is_connected = False self.is_logged_in = False self.msg_decorator = QQMsgProcessor(instance=self) def message_element_wrapper(context: Dict[str, Any], msg_element: Dict[str, Any], chat: Chat) -> \ Tuple[str, List[Message], List[Tuple[Tuple[int, int], Union[Chat, ChatMember]]]]: msg_type = msg_element['type'] msg_data = msg_element['data'] main_text: str = '' messages: List[Message] = [] at_list: List[Tuple[Tuple[int, int], Union[Chat, ChatMember]]] = [] if msg_type == 'text': main_text = msg_data['text'] elif msg_type == 'face': qq_face = int(msg_data['id']) if qq_face in qq_emoji_list: main_text = qq_emoji_list[qq_face] else: main_text = '\u2753' # ❓ elif msg_type == 'sface': main_text = '\u2753' # ❓ elif msg_type == 'at': # todo Recheck if bug exists g_id = context['group_id'] my_uid = self.get_qq_uid() self.logger.debug('My QQ uid: %s\n' 'QQ mentioned: %s\n', my_uid, msg_data['qq']) if str(msg_data['qq']) == 'all': group_card = 'all' else: member_info = self.get_user_info(msg_data['qq'], group_id=g_id)['in_group_info'] group_card = member_info['card'] if member_info['card'] != '' else member_info['nickname'] self.logger.debug('Group card: {}'.format(group_card)) substitution_begin = len(main_text) substitution_end = len(main_text) + len(group_card) + 1 main_text = '@{} '.format(group_card) if str(my_uid) == str(msg_data['qq']) or str(msg_data['qq']) == 'all': at_dict = ((substitution_begin, substitution_end), chat.self) at_list.append(at_dict) elif msg_type == 'reply': ref_user = self.get_user_info(msg_data['qq']) main_text = f'「{ref_user["remark"]}({ref_user["nickname"]}):{msg_data["text"]}」\n' \ '- - - - - - - - - - - - - - -\n' else: messages.extend(self.call_msg_decorator(msg_type, msg_data, chat)) return main_text, messages, at_list def message_elements_wrapper(context: Dict[str, Any], msg_elements: List[Dict[str, Any]], chat: Chat) -> \ Tuple[str, List[Message], Dict[Tuple[int, int], Union[Chat, ChatMember]]]: messages: List[Message] = [] main_text: str = '' at_dict: Dict[Tuple[int, int], Union[Chat, ChatMember]] = {} for msg_element in msg_elements: sub_main_text, sub_messages, sub_at_list = message_element_wrapper(context, msg_element, chat) main_text_len = len(main_text) for at_tuple in sub_at_list: pos = (at_tuple[0][0] + main_text_len, at_tuple[0][1] + main_text_len) at_dict[pos] = at_tuple[1] main_text += sub_main_text messages.extend(sub_messages) return main_text, messages, at_dict @self.coolq_bot.on_message def handle_msg(context): self.logger.debug(repr(context)) msg_elements = context['message'] qq_uid = context['user_id'] chat: Chat author: ChatMember user = self.get_user_info(qq_uid) if context['message_type'] == 'private': context['alias'] = user['remark'] chat: PrivateChat = self.chat_manager.build_efb_chat_as_private(context) else: chat = self.chat_manager.build_efb_chat_as_group(context) if 'anonymous' not in context or context['anonymous'] is None: if context['message_type'] == 'group': if context['sub_type'] == 'notice': context['event_description'] = self._("System Notification") context['uid_prefix'] = 'group_notification' author = chat.add_system_member( name=context['event_description'], uid=ChatID("__{context[uid_prefix]}__".format(context=context)) ) else: user = self.get_user_info(qq_uid, group_id=context['group_id']) context['nickname'] = user['remark'] context['alias'] = user['in_group_info']['card'] author = self.chat_manager.build_or_get_efb_member(chat, context) elif context['message_type'] == 'private': author = chat.other else: author = self.chat_manager.build_or_get_efb_member(chat, context) else: # anonymous user in group author = self.chat_manager.build_efb_chat_as_anonymous_user(chat, context) main_text, messages, at_dict = message_elements_wrapper(context, msg_elements, chat) if main_text != "": messages.append(self.msg_decorator.qq_text_simple_wrapper(main_text, at_dict)) uid: str = str(uuid.uuid4()) coolq_msg_id = context['message_id'] for i in range(len(messages)): if not isinstance(messages[i], Message): continue efb_msg: Message = messages[i] efb_msg.uid = uid + '_' + str(coolq_msg_id) + '_' + str(i) efb_msg.chat = chat efb_msg.author = author # if qq_uid != '80000000': # Append discuss group into group list if context['message_type'] == 'discuss' and efb_msg.chat not in self.discuss_list: self.discuss_list.append(efb_msg.chat) efb_msg.deliver_to = coordinator.master def send_message_wrapper(*args, **kwargs): threading.Thread(target=async_send_messages_to_master, args=args, kwargs=kwargs).start() send_message_wrapper(efb_msg) @self.coolq_bot.on_notice('group_increase') def handle_group_increase_msg(context): context['event_description'] = self._('\u2139 Group Member Increase Event') if (context['sub_type']) == 'invite': text = self._('{nickname}({context[user_id]}) joined the group({group_name}) via invitation') else: text = self._('{nickname}({context[user_id]}) joined the group({group_name})') original_group = self.get_group_info(context['group_id'], False) group_name = context['group_id'] if original_group is not None and 'group_name' in original_group: group_name = original_group['group_name'] text = text.format(nickname=self.get_stranger_info(context['user_id'])['nickname'], context=context, group_name=group_name) context['message'] = text self.send_efb_group_notice(context) @self.coolq_bot.on_notice('group_decrease') def handle_group_decrease_msg(context): context['event_description'] = self._("\u2139 Group Member Decrease Event") original_group = self.get_group_info(context['group_id'], False) group_name = context['group_id'] if original_group is not None and 'group_name' in original_group: group_name = original_group['group_name'] text = '' if context['sub_type'] == 'kick_me': text = self._("You've been kicked from the group({})").format(group_name) else: if context['sub_type'] == 'leave': text = self._('{nickname}({context[user_id]}) quited the group({group_name})') else: text = self._('{nickname}({context[user_id]}) was kicked from the group({group_name})') text = text.format(nickname=self.get_stranger_info(context['user_id'])['nickname'], context=context, group_name=group_name) context['message'] = text self.send_efb_group_notice(context) @self.coolq_bot.on_notice('offline_file') def handle_offline_file_upload_msg(context): context['event_description'] = self._("\u2139 Offline File Upload Event") context['uid_prefix'] = 'offline_file' file_info_msg = self._('Filename: {file[name]}\n' 'File size: {file[size]}').format(file=context['file']) user = self.get_user_info(context['user_id']) text = self._('{remark}({nickname}) uploaded a file to you\n') text = text.format(remark=user['remark'], nickname=user['nickname']) + file_info_msg context['message'] = text self.send_msg_to_master(context) param_dict = { 'context': context, 'download_url': context['file']['url'], } threading.Thread(target=self.async_download_file, args=[], kwargs=param_dict).start() @self.coolq_bot.on_notice('group_upload') def handle_group_file_upload_msg(context): context['event_description'] = self._("\u2139 Group File Upload Event") context['uid_prefix'] = 'group_upload' original_group = self.get_group_info(context['group_id'], False) group_name = context['group_id'] if original_group is not None and 'group_name' in original_group: group_name = original_group['group_name'] file_info_msg = self._('File ID: {file[id]}\n' 'Filename: {file[name]}\n' 'File size: {file[size]}').format(file=context['file']) member_info = self.get_user_info(context['user_id'], group_id=context['group_id'])['in_group_info'] group_card = member_info['card'] if member_info['card'] != '' else member_info['nickname'] text = self._('{member_card}({context[user_id]}) uploaded a file to group({group_name})\n') text = text.format(member_card=group_card, context=context, group_name=group_name) + file_info_msg context['message'] = text self.send_efb_group_notice(context) param_dict = { 'context': context, 'group_id': context['group_id'], 'file_id': context['file']['id'], 'busid': context['file']['busid'] } threading.Thread(target=self.async_download_group_file, args=[], kwargs=param_dict).start() @self.coolq_bot.on_notice('friend_add') def handle_friend_add_msg(context): context['event_description'] = self._('\u2139 New Friend Event') context['uid_prefix'] = 'friend_add' text = self._('{nickname}({context[user_id]}) has become your friend!') text = text.format(nickname=self.get_stranger_info(context['user_id'])['nickname'], context=context) context['message'] = text self.send_msg_to_master(context) @self.coolq_bot.on_request('friend') # Add friend request def handle_add_friend_request(context): self.logger.debug(repr(context)) context['event_description'] = self._('\u2139 New Friend Request') context['uid_prefix'] = 'friend_request' text = self._('{nickname}({context[user_id]}) wants to be your friend!\n' 'Here is the verification comment:\n' '{context[comment]}') text = text.format(nickname=self.get_stranger_info(context['user_id'])['nickname'], context=context) context['message'] = text commands = [MessageCommand( name=self._("Accept"), callable_name="process_friend_request", kwargs={'result': 'accept', 'flag': context['flag']} ), MessageCommand( name=self._("Decline"), callable_name="process_friend_request", kwargs={'result': 'decline', 'flag': context['flag']} )] context['commands'] = commands self.send_msg_to_master(context) @self.coolq_bot.on_request('group') def handle_group_request(context): self.logger.debug(repr(context)) context['uid_prefix'] = 'group_request' context['group_name'] = self._('[Request]') + self.get_group_info(context['group_id'])['group_name'] context['group_id_orig'] = context['group_id'] context['group_id'] = str(context['group_id']) + "_notification" context['message_type'] = 'group' context['event_description'] = '\u2139 New Group Join Request' original_group = self.get_group_info(context['group_id'], False) group_name = context['group_id'] if original_group is not None and 'group_name' in original_group: group_name = original_group['group_name'] msg = Message() msg.uid = 'group' + '_' + str(context['group_id']) msg.author = self.chat_manager.build_efb_chat_as_system_user(context) msg.chat = self.chat_manager.build_efb_chat_as_group(context) msg.deliver_to = coordinator.master msg.type = MsgType.Text name = "" if not self.get_friend_remark(context['user_id']): name = "{}({})[{}] ".format( self.get_stranger_info(context['user_id'])['nickname'], self.get_friend_remark(context['user_id']), context['user_id']) else: name = "{}[{}] ".format(self.get_stranger_info(context['user_id'])['nickname'], context['user_id']) msg.text = "{} wants to join the group {}({}). \nHere is the comment: {}".format( name, group_name, context['group_id_orig'], context['comment'] ) msg.commands = MessageCommands([MessageCommand( name=self._("Accept"), callable_name="process_group_request", kwargs={'result': 'accept', 'flag': context['flag'], 'sub_type': context['sub_type']} ), MessageCommand( name=self._("Decline"), callable_name="process_group_request", kwargs={'result': 'decline', 'flag': context['flag'], 'sub_type': context['sub_type']} )]) coordinator.send_message(msg) self.check_status_periodically(threading.Event()) self.update_contacts_timer = threading.Timer(1800, self.update_contacts_periodically, [threading.Event()]) self.update_contacts_timer.start() # threading.Thread(target=self.check_running_status).start() def run_instance(self, *args, **kwargs): # threading.Thread(target=self.coolq_bot.run, args=args, kwargs=kwargs, daemon=True).start() cherrypy.tree.graft(self.coolq_bot.wsgi, "/") cherrypy.server.unsubscribe() self.cherryServer = Server() self.cherryServer.socket_host = self.client_config['host'] self.cherryServer.socket_port = self.client_config['port'] self.cherryServer.subscribe() cherrypy.engine.start() cherrypy.engine.wait(states.EXITING) @extra(name=_("Restart CoolQ Client"), desc=_("Force CoolQ to restart\n" "Usage: {function_name} [-l] [-c] [-e]\n" " -l: Restart and clean log\n" " -c: Restart and clean cache\n" " -e: Restart and clean event\n")) def relogin(self, param: str = ""): param_dict = dict() if param: params = param.split(' ') for each_param in params: if each_param == ' ': continue if each_param == '-l': param_dict['clean_log'] = 'true' elif each_param == '-c': param_dict['clean_cache'] = 'true' elif each_param == '-e': param_dict['clean_event'] = 'true' else: return self._("Unknown parameter: {}.").format(param) self.logger.debug(repr(param_dict)) self.coolq_api_query('_set_restart', **param_dict) return 'Done. Please wait for a while.' def logout(self): raise NotImplementedError @extra(name=_("Check CoolQ Status"), desc=_("Force efb-qq-slave to refresh status from CoolQ Client.\n" "Usage: {function_name}")) def login(self, param: str = ""): self.check_status_periodically(None) return 'Done' def get_stranger_info(self, user_id: int, no_cache: bool = False) -> Dict[str, Any]: user_id = int(user_id) return self.get_user_info(user_id, no_cache=no_cache) def get_login_info(self) -> Dict[Any, Any]: res = self.coolq_bot.get_status() if 'good' in res or 'online' in res: data = self.coolq_bot.get_login_info() return {'status': 0, 'data': {'uid': data['user_id'], 'nickname': data['nickname']}} else: return {'status': 1} def get_groups(self) -> List: # todo Add support for discuss group iteration self.update_group_list() # Force update group list res = self.group_list # res = self.coolq_bot.get_group_list() groups = [] for i in range(len(res)): context = {'message_type': 'group', 'group_id': res[i]['group_id']} efb_chat = self.chat_manager.build_efb_chat_as_group(context) groups.append(efb_chat) for i in range(len(self.extra_group_list)): does_exist = False for j in range(len(res)): if str(self.extra_group_list[i]['group_id']) == str(res[i]['group_id']): does_exist = True break if does_exist: continue context = {'message_type': 'group', 'group_id': self.extra_group_list[i]['group_id']} efb_chat = self.chat_manager.build_efb_chat_as_group(context) groups.append(efb_chat) return groups + self.discuss_list def get_friends(self) -> List: try: self.update_friend_list() # Force update friend list except CoolQAPIFailureException: self.deliver_alert_to_master(self._('Failed to retrieve the friend list.\n' 'Only groups are shown.')) return [] users = [] for current_user in self.friend_list: context = {'user_id': str(current_user['user_id']), 'nickname': current_user['nickname'], 'alias': current_user['remark']} efb_chat = self.chat_manager.build_efb_chat_as_private(context) users.append(efb_chat) return users def receive_message(self): # Replaced by handle_msg() pass def send_message(self, msg: 'Message') -> 'Message': # todo Add support for edited message """ self.logger.info("[%s] Sending message to WeChat:\n" "uid: %s\n" "Type: %s\n" "Text: %s\n" "Target Chat: %s\n" "Target uid: %s\n", msg.uid, msg.chat.chat_uid, msg.type, msg.text, repr(msg.target.chat), msg.target.uid) """ m = QQMsgProcessor(instance=self) chat_type = msg.chat.uid.split('_') self.logger.debug('[%s] Is edited: %s', msg.uid, msg.edit) if msg.edit: try: uid_type = msg.uid.split('_') self.recall_message(uid_type[1]) except CoolQAPIFailureException: raise EFBOperationNotSupported(self._("Failed to recall the message!\n" "This message may have already expired.")) if msg.type in [MsgType.Text, MsgType.Link]: if msg.text == "kick`": group_id = chat_type[1] user_id = msg.target.author.uid self.coolq_api_query("set_group_kick", group_id=group_id, user_id=user_id) else: if isinstance(msg.target, Message): max_length = 50 tgt_text = coolq_text_encode(process_quote_text(msg.target.text, max_length)) tgt_alias = "" if chat_type[0] != 'private' and not isinstance(msg.target.author, SelfChatMember): tgt_alias += m.coolq_code_at_wrapper(msg.target.author.uid) else: tgt_alias = "" msg.text = "%s%s\n\n%s" % (tgt_alias, tgt_text, coolq_text_encode(msg.text)) msg.uid = self.coolq_send_message(chat_type[0], chat_type[1], msg.text) self.logger.debug('[%s] Sent as a text message. %s', msg.uid, msg.text) elif msg.type in (MsgType.Image, MsgType.Sticker, MsgType.Animation): self.logger.info("[%s] Image/Sticker/Animation %s", msg.uid, msg.type) text = '' if not self.can_send_image: self.check_features() # Force checking features raise EFBOperationNotSupported(self._("Unable to send image now. Please check your CoolQ version " "or retry later")) if msg.type != MsgType.Sticker: text += m.coolq_code_image_wrapper(msg.file, msg.path) else: with tempfile.NamedTemporaryFile(suffix=".gif") as f: img = Image.open(msg.file) try: alpha = img.split()[3] mask = Image.eval(alpha, lambda a: 255 if a <= 128 else 0) except IndexError: mask = Image.eval(img.split()[0], lambda a: 0) img = img.convert('RGB').convert('P', palette=Image.ADAPTIVE, colors=255) img.paste(255, mask) img.save(f, transparency=255) msg.file.close() f.seek(0) text += m.coolq_code_image_wrapper(f, f.name) msg.uid = self.coolq_send_message(chat_type[0], chat_type[1], text) if msg.text: self.coolq_send_message(chat_type[0], chat_type[1], msg.text) # todo More MsgType Support elif msg.type is MsgType.Voice: if not self.can_send_voice: self.check_features() # Force checking features raise EFBOperationNotSupported(self._("Unable to send voice now. Please check your CoolQ version " " and install CoolQ audio library or retry later")) text = m.coolq_voice_image_wrapper(msg.file, msg.path) msg.uid = self.coolq_send_message(chat_type[0], chat_type[1], text) if msg.text: self.coolq_send_message(chat_type[0], chat_type[1], msg.text) return msg def call_msg_decorator(self, msg_type: str, *args) -> List[Message]: try: func = getattr(self.msg_decorator, 'qq_{}_wrapper'.format(msg_type)) except AttributeError: msg = f"Unsupported message type: {msg_type}" self.logger.error(msg) return self.msg_decorator.qq_unsupported_wrapper(msg) else: return func(*args) def get_qq_uid(self): res = self.get_login_info() if res['status'] == 0: return res['data']['uid'] else: return None def get_group_member_list(self, group_id, no_cache=False) -> List[Dict[str, Any]]: if no_cache or (group_id not in self.group_member_dict) \ or (datetime.now() - self.group_member_dict[group_id]['time'] > timedelta(hours=1)): # Force Update try: member_list = self.coolq_api_query('get_group_member_list', group_id=group_id, no_cache=no_cache) except CoolQAPIFailureException as e: self.deliver_alert_to_master(self._("Failed the get group member detail.") + "{}".format(e)) return [] self.group_member_dict[group_id] = { 'members': member_list, 'time': datetime.now() } return self.group_member_dict[group_id]['members'] def get_user_info(self, user_id: int, group_id: Optional[str] = None, no_cache=False): user_id = int(user_id) if no_cache or (not self.friend_list) or (user_id not in self.friend_dict): self.update_friend_list() friend = copy.deepcopy(self.friend_dict.get(user_id)) if friend: user = friend user['is_friend'] = True else: user = copy.deepcopy(self.stranger_dict.get(user_id)) if no_cache or (user is None): user = self.coolq_api_query('get_stranger_info', user_id=user_id) self.stranger_dict[user_id] = copy.deepcopy(user) user['is_friend'] = False if group_id is not None: user['is_in_group'] = False for member in self.get_group_member_list(group_id): if member['user_id'] == user_id: user['is_in_group'] = True user['in_group_info'] = member break remark = user.get('remark') if not remark: user['remark'] = user['nickname'] return user def get_group_info(self, group_id, no_cache=False): if no_cache or not self.group_list: self.update_group_list() group = self.group_dict.get(group_id) if group: return group if no_cache: for extra_group in self.extra_group_list: if extra_group['group_id'] == group_id: return extra_group try: external_group = self.get_external_group_info(group_id) except CoolQAPIFailureException: self.logger.error(f'Get external group({group_id}) info failed.') return None else: self.extra_group_list.append(external_group) return external_group def coolq_send_message(self, msg_type, uid, message): keyword = msg_type if msg_type != 'private' else 'user' res = self.coolq_api_query('send_msg', message_type=msg_type, **{keyword + '_id': uid}, message=message) return str(uuid.uuid4()) + '_' + str(res['message_id']) def _coolq_api_wrapper(self, func_name, **kwargs): try: func = getattr(self.coolq_bot, func_name) res = func(**kwargs) except RequestException as e: raise CoolQDisconnectedException(self._('Unable to connect to CoolQ Client!' 'Error Message:\n{}').format(str(e))) except cqhttp.Error as ex: api_ex = CoolQAPIFailureException(self._('CoolQ HTTP API encountered an error!\n' 'Status Code:{} ' 'Return Code:{}').format(ex.status_code, ex.retcode)) setattr(api_ex, 'status_code', ex.status_code) setattr(api_ex, 'retcode', ex.retcode) raise api_ex else: return res def check_running_status(self): res = self._coolq_api_wrapper('get_status') if res['good'] or res['online']: return True else: raise CoolQOfflineException(self._("CoolQ Client isn't working correctly!")) def coolq_api_query(self, func_name, **kwargs): """ # Do not call get_status too frequently if self.check_running_status(): return self._coolq_api_wrapper(func_name, **kwargs) """ if self.is_logged_in and self.is_connected: return self._coolq_api_wrapper(func_name, **kwargs) elif self.repeat_counter < 3: self.deliver_alert_to_master(self._('Your status is offline.\n' 'You may try login with /0_login')) self.repeat_counter += 1 def check_status_periodically(self, t_event): self.logger.debug('Start checking status...') flag = True interval = 300 try: flag = self.check_running_status() except CoolQDisconnectedException as e: if self.repeat_counter < 3: self.deliver_alert_to_master(self._("We're unable to communicate with CoolQ Client.\n" "Please check the connection and credentials provided.\n" "{}").format(str(e))) self.repeat_counter += 1 self.is_connected = False self.is_logged_in = False interval = 3600 except (CoolQOfflineException, CoolQAPIFailureException): if self.repeat_counter < 3: self.deliver_alert_to_master(self._('CoolQ is running in abnormal status.\n' 'You may need to relogin your account ' 'or have a check in CoolQ Client.\n')) self.repeat_counter += 1 self.is_connected = True self.is_logged_in = False interval = 3600 else: if not flag: if self.repeat_counter < 3: self.deliver_alert_to_master(self._("We don't know why, but status check failed.\n" "Please enable debug mode and consult the log " "for more details.")) self.repeat_counter += 1 self.is_connected = True self.is_logged_in = False interval = 3600 else: self.logger.debug('Status: OK') self.is_connected = True self.is_logged_in = True self.repeat_counter = 0 self.check_features() if t_event is not None and not t_event.is_set(): self.check_status_timer = threading.Timer(interval, self.check_status_periodically, [t_event]) self.check_status_timer.start() def deliver_alert_to_master(self, message: str): context = {'message': message, 'uid_prefix': 'alert', 'event_description': self._('CoolQ Alert')} self.send_msg_to_master(context) def update_friend_list(self): self.friend_list = self.coolq_api_query('get_friend_list') if self.friend_list: self.logger.debug('Update friend list completed. Entries: %s', len(self.friend_list)) for friend in self.friend_list: if friend['remark'] == '': friend['remark'] = friend['nickname'] self.friend_dict[friend['user_id']] = friend else: self.logger.warning('Failed to update friend list') def update_group_list(self): self.group_list = self.coolq_api_query('get_group_list') if self.group_list: self.logger.debug('Update group list completed. Entries: %s', len(self.group_list)) for group in self.group_list: self.group_dict[group['group_id']] = group else: self.logger.warning('Failed to update group list') def update_contacts_periodically(self, t_event): self.logger.debug('Start updating friend & group list') interval = 1800 if self.is_connected and self.is_logged_in: try: self.update_friend_list() self.update_group_list() except CoolQAPIFailureException as ex: if getattr(ex, 'status_code') == 200 and getattr(ex, 'retcode') == 104 \ and self.update_repeat_counter < 3: self.send_cookie_expired_alarm() if self.update_repeat_counter < 3: self.deliver_alert_to_master(self._('Errors occurred when updating contacts: ') + getattr(ex, 'message')) self.update_repeat_counter += 1 else: self.update_repeat_counter = 0 self.logger.debug('Update completed') if t_event is not None and not t_event.is_set(): self.update_contacts_timer = threading.Timer(interval, self.update_contacts_periodically, [t_event]) self.update_contacts_timer.start() def get_friend_remark(self, uid): if (not self.friend_list) or (uid not in self.friend_dict): self.update_friend_list() if uid not in self.friend_dict: return None # I don't think you have such a friend return self.friend_dict[uid]['remark'] def send_efb_group_notice(self, context): context['message_type'] = 'group' self.logger.debug(repr(context)) chat = self.chat_manager.build_efb_chat_as_group(context) try: author = chat.get_member(SystemChatMember.SYSTEM_ID) except KeyError: author = chat.add_system_member() msg = Message( uid="__group_notice__.%s" % int(time.time()), type=MsgType.Text, chat=chat, author=author, text=context['message'], deliver_to=coordinator.master ) coordinator.send_message(msg) def send_msg_to_master(self, context): self.logger.debug(repr(context)) if not getattr(coordinator, 'master', None): # Master Channel not initialized raise Exception(context['message']) chat = self.chat_manager.build_efb_chat_as_system_user(context) try: author = chat.get_member(SystemChatMember.SYSTEM_ID) except KeyError: author = chat.add_system_member() msg = Message( uid="__{context[uid_prefix]}__.{uni_id}".format(context=context, uni_id=str(int(time.time()))), type=MsgType.Text, chat=chat, author=author, deliver_to=coordinator.master ) if 'message' in context: msg.text = context['message'] if 'commands' in context: msg.commands = MessageCommands(context['commands']) coordinator.send_message(msg) # As the old saying goes # A programmer spent 20% of time on coding # while the rest 80% on considering a variable/function/class name def get_external_group_info(self, group_id): # Special thanks to @lwl12 for thinking of this name res = self.coolq_api_query('get_group_info', group_id=group_id) return res def send_status(self, status: 'Status'): if isinstance(status, MessageRemoval): if not isinstance(status.message.author, SelfChatMember): raise EFBMessageError(self._('You can only recall your own messages.')) try: uid_type = status.message.uid.split('_') self.recall_message(uid_type[1]) except CoolQAPIFailureException: raise EFBMessageError( self._('Failed to recall the message. This message may have already expired.')) else: raise EFBOperationNotSupported() # todo def recall_message(self, message_id): self.coolq_api_query('delete_msg', message_id=message_id) def send_cookie_expired_alarm(self): self.deliver_alert_to_master(self._('Your cookie of CoolQ Client seems to be expired. ' 'Although it will not affect the normal functioning of sending/receiving ' 'messages, however, you may encounter issues like failing to retrieve ' 'friend list. Please consult ' 'https://github.com/milkice233/efb-qq-slave/wiki/Workaround-for-expired' '-cookies-of-CoolQ for solutions.')) def process_friend_request(self, result, flag): res = 'true' if result == 'accept' else 'false' try: self.coolq_api_query('set_friend_add_request', approve=res, flag=flag) except CoolQAPIFailureException as e: return (self._('Failed to process request! Error Message:\n') + getattr(e, 'message', repr(e))) return 'Done' def process_group_request(self, result, flag, sub_type): res = 'true' if result == 'accept' else 'false' try: self.coolq_api_query('set_group_add_request', approve=res, flag=flag, sub_type=sub_type) except CoolQAPIFailureException as e: return (self._('Failed to process request! Error Message:\n') + getattr(e, 'message', repr(e))) return 'Done' def async_download_file(self, context, download_url): res = download_file(download_url) if isinstance(res, str): context['message'] = self._("[Download] ") + res self.send_efb_group_notice(context) elif res is None: pass else: data = {'file': res, 'filename': context['file']['name']} context['message_type'] = 'group' efb_msg = self.msg_decorator.qq_file_after_wrapper(data) efb_msg.uid = str(context['user_id']) + '_' + str(uuid.uuid4()) + '_' + str(1) efb_msg.text = 'Sent a file\n{}'.format(context['file']['name']) if context['uid_prefix'] == 'offline_file': efb_msg.chat = self.chat_manager.build_efb_chat_as_private(context) elif context['uid_prefix'] == 'group_upload': efb_msg.chat = self.chat_manager.build_efb_chat_as_group(context) efb_msg.author = self.chat_manager.build_or_get_efb_member(efb_msg.chat, context) efb_msg.deliver_to = coordinator.master async_send_messages_to_master(efb_msg) def async_download_group_file(self, context, group_id, file_id, busid): file = self.coolq_api_query('get_group_file_url', group_id=group_id, file_id=file_id, busid=busid) download_url = file['url'] self.async_download_file(context, download_url) def get_chat_picture(self, chat: 'Chat') -> BinaryIO: chat_type = chat.uid.split('_') if chat_type[0] == 'private': return download_user_avatar(chat_type[1]) elif chat_type[0] == 'group': return download_group_avatar(chat_type[1]) else: return download_group_avatar("") def get_chats(self): qq_chats = self.get_friends() group_chats = self.get_groups() return qq_chats + group_chats def get_chat(self, chat_uid: ChatID) -> 'Chat': # todo what is member_uid used for? chat_type = chat_uid.split('_') if chat_type[0] == 'private': qq_uid = int(chat_type[1]) remark = self.get_friend_remark(qq_uid) context = {"user_id": qq_uid} if remark is not None: context['alias'] = remark return self.chat_manager.build_efb_chat_as_private(context) elif chat_type[0] == 'group': group_id = int(chat_type[1]) context = {'message_type': 'group', 'group_id': group_id} return self.chat_manager.build_efb_chat_as_group(context, update_member=True) elif chat_type[0] == 'discuss': discuss_id = int(chat_type[1]) context = {'message_type': 'discuss', 'discuss_id': discuss_id} return self.chat_manager.build_efb_chat_as_group(context) raise EFBChatNotFound() def check_self_update(self, t_event): interval = 60 * 60 * 24 latest_version = self.channel.check_updates() if latest_version is not None: self.deliver_alert_to_master("New version({version}) of EFB-QQ-Slave has released! " "Please manually update EQS by stopping ehForwarderbot first and then execute " "pip3 install --upgrade efb-qq-slave" .format(version=latest_version)) else: if t_event is not None and not t_event.is_set(): self.self_update_timer = threading.Timer(interval, self.check_self_update, [t_event]) self.self_update_timer.start() def poll(self): self.check_self_update(threading.Event()) self.run_instance(host=self.client_config['host'], port=self.client_config['port'], debug=False) self.logger.debug("EQS gracefully shut down") def stop_polling(self): self.update_contacts_timer.cancel() self.check_status_timer.cancel() self.self_update_timer.cancel() cherrypy.engine.exit() def check_features(self): self.can_send_image = self.coolq_api_query('can_send_image')['yes'] self.can_send_voice = self.coolq_api_query('can_send_record')['yes']