Initial Commit

This commit is contained in:
milkice233 2021-02-15 23:32:47 +08:00
commit da15b9bae4
9 changed files with 2359 additions and 0 deletions

0
README.rst Normal file
View File

View File

@ -0,0 +1,148 @@
# coding: utf-8
import contextlib
import logging
from ehforwarderbot import Chat
from ehforwarderbot.chat import GroupChat, PrivateChat, SystemChat
from ehforwarderbot.types import ChatID
from efb_qq_slave import QQMessengerChannel
class ChatManager:
def __init__(self, channel: 'QQMessengerChannel'):
self.channel: 'QQMessengerChannel' = channel
self.logger: logging.Logger = logging.getLogger(__name__)
self.MISSING_GROUP: GroupChat = GroupChat(
channel=self.channel,
uid=ChatID("__error_group__"),
name="Group Missing"
)
self.MISSING_CHAT: PrivateChat = PrivateChat(
channel=self.channel,
uid=ChatID("__error_chat__"),
name="Chat Missing"
)
"""
def build_efb_chat_as_user(self, uid, is_chat, in_group=None, is_discuss=False):
efb_chat = EFBChat(self.channel)
efb_chat.chat_uid = 'user' + str(uid)
i: dict = self.channel.QQClient.get_stranger_info(uid)
efb_chat.chat_name = i['nickname']
efb_chat.chat_alias = None
efb_chat.chat_type = ChatType.User
efb_chat.is_chat = is_chat
efb_chat.vendor_specific = {'is_anonymous': False}
if in_group is not None:
efb_chat.group = self.build_efb_chat_as_group(in_group, is_discuss)
return efb_chat
def build_efb_chat_as_group(self, uid, discuss=False):
efb_chat = EFBChat(self.channel)
if not discuss:
efb_chat.chat_uid = 'group' + str(uid)
i = self.channel.QQClient.get_group_info(uid)
efb_chat.chat_name = i['group_name']
efb_chat.chat_type = ChatType.Group
efb_chat.vendor_specific = {'is_discuss': False}
# todo Add user to efb_chat.member
else:
efb_chat.chat_uid = 'discuss' + str(uid)
efb_chat.chat_name = 'Discuss Group' # todo Find a way to distinguish from different discuss group
efb_chat.chat_type = ChatType.Group
efb_chat.vendor_specific = {'is_discuss': True}
return efb_chat
def build_efb_chat_as_anonymous_user(self, nickname, flag, anonymous_id, group_id, is_discuss):
efb_chat = EFBChat(self.channel)
efb_chat.chat_uid = flag
efb_chat.chat_name = nickname
efb_chat.chat_type = ChatType.User
efb_chat.is_chat = False
efb_chat.vendor_specific = {'is_anonymous': True,
'anonymous_id': anonymous_id}
efb_chat.group = self.build_efb_chat_as_group(group_id, is_discuss)
return efb_chat
"""
def build_efb_chat_as_private(self, context):
uid = context['user_id']
if 'sender' not in context or 'nickname' not in context['sender']:
i: dict = self.channel.QQClient.get_stranger_info(uid)
chat_name = ""
if i:
chat_name = i['nickname']
else:
chat_name = context['sender']['nickname']
efb_chat = PrivateChat(channel=self.channel,
uid='private' + '_' + str(uid),
name=str(chat_name),
alias=None if 'alias' not in context else str(context['alias']))
return efb_chat
def build_or_get_efb_member(self, chat: Chat, context):
member_uid = context['user_id']
with contextlib.suppress(KeyError):
return chat.get_member(str(member_uid))
chat_name = ''
if 'nickname' not in context:
i: dict = self.channel.QQClient.get_stranger_info(member_uid)
chat_name = ""
if i:
chat_name = i['nickname']
else:
chat_name = context['nickname']
return chat.add_member(name=str(chat_name),
alias=None if 'alias' not in context else str(context['alias']),
uid=str(member_uid))
def build_efb_chat_as_group(self, context, update_member=False): # Should be cached
is_discuss = False if context['message_type'] == 'group' else True
chat_uid = context['discuss_id'] if is_discuss else context['group_id']
efb_chat = GroupChat(
channel=self.channel,
uid=str(chat_uid)
)
if not is_discuss:
efb_chat.uid = 'group' + '_' + str(chat_uid)
i = self.channel.QQClient.get_group_info(chat_uid)
if i is not None:
efb_chat.name = str(i['group_name']) if 'group_name' not in context else str(context['group_name'])
else:
efb_chat.name = str(chat_uid)
efb_chat.vendor_specific = {'is_discuss': False}
if update_member:
members = self.channel.QQClient.get_group_member_list(chat_uid, False)
if members:
for member in members:
efb_chat.add_member(name=str(member['card']),
alias=str(member['nickname']),
uid=str(member['user_id']))
else:
efb_chat.uid = 'discuss' + '_' + str(chat_uid)
efb_chat.name = 'Discuss Group' + '_' + str(chat_uid)
# todo Find a way to distinguish from different discuss group
efb_chat.vendor_specific = {'is_discuss': True}
return efb_chat
def build_efb_chat_as_anonymous_user(self, chat: Chat, context):
anonymous_data = context['anonymous']
member_uid = 'anonymous' + '_' + anonymous_data['flag']
with contextlib.suppress(KeyError):
return chat.get_member(member_uid)
chat_name = '[Anonymous] ' + anonymous_data['name']
return chat.add_member(name=str(chat_name),
alias=None if 'alias' not in context else str(context['alias']),
uid=str(member_uid),
vendor_specific={'is_anonymous': True,
'anonymous_id': anonymous_data['id']})
def build_efb_chat_as_system_user(self, context): # System user only!
return SystemChat(channel=self.channel,
name=str(context['event_description']),
uid=ChatID("__{context[uid_prefix]}__".format(context=context)))

1059
efb_qq_plugin_coolq/CoolQ.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,22 @@
class CoolQClientException(Exception):
pass
class CoolQAPIFailureException(CoolQClientException):
pass
class CoolQCookieExpiredException(CoolQAPIFailureException):
pass
class CoolQOfflineException(CoolQClientException):
pass
class CoolQDisconnectedException(CoolQClientException):
pass
class CoolQUnknownException(CoolQClientException):
pass

View File

@ -0,0 +1,251 @@
# coding: utf-8
import base64
import html
import json
import logging
from urllib.parse import quote
import magic
from ehforwarderbot import Message, MsgType, Chat
from ehforwarderbot.message import LocationAttribute, LinkAttribute, Substitutions
from . import CoolQ
from .Utils import cq_get_image, download_voice
class QQMsgProcessor:
inst: CoolQ
logger: logging.Logger = logging.getLogger(__name__)
def __init__(self, instance: CoolQ):
self.inst = instance
self._ = instance._
pass
def qq_image_wrapper(self, data, chat: Chat = None):
efb_msg = Message()
if 'url' not in data:
efb_msg.type = MsgType.Text
efb_msg.text = self._('[Image Source missing]')
return [efb_msg]
efb_msg.file = cq_get_image(data['url'])
if efb_msg.file is None:
efb_msg.type = MsgType.Text
efb_msg.text = self._('[Download image failed, please check on your QQ client]')
return [efb_msg]
efb_msg.type = MsgType.Image
mime = magic.from_file(efb_msg.file.name, mime=True)
if isinstance(mime, bytes):
mime = mime.decode()
efb_msg.filename = data['file'] if 'file' in data else efb_msg.file.name
efb_msg.filename += '.' + str(mime).split('/')[1]
efb_msg.path = efb_msg.file.name
efb_msg.mime = mime
if "gif" in mime:
efb_msg.type = MsgType.Animation
return [efb_msg]
def qq_record_wrapper(self, data, chat: Chat = None): # Experimental!
efb_msg = Message()
try:
transformed_file = self.inst.coolq_api_query("get_record", file=data['file'], out_format='mp3')
efb_msg.type = MsgType.Audio
efb_msg.file = download_voice(transformed_file['file'],
self.inst.client_config['api_root'].rstrip("/"),
self.inst.client_config['access_token'])
mime = magic.from_file(efb_msg.file.name, mime=True)
if isinstance(mime, bytes):
mime = mime.decode()
efb_msg.path = efb_msg.file.name
efb_msg.mime = mime
except Exception:
efb_msg.type = MsgType.Unsupported
efb_msg.text = self._('[Voice Message] Please check it on your QQ')
logging.getLogger(__name__).exception("Failed to download voice")
return [efb_msg]
def qq_share_wrapper(self, data, chat: Chat = None):
efb_msg = Message(
type=MsgType.Link,
text='',
attributes=LinkAttribute(
title='' if 'title' not in data else data['title'],
description='' if 'content' not in data else data['content'],
image='' if 'image' not in data else data['image'],
url=data['url']
)
)
return [efb_msg]
def qq_location_wrapper(self, data, chat: Chat = None):
efb_msg = Message(
text=data['content'],
type=MsgType.Location,
attributes=LocationAttribute(longitude=float(data['lon']),
latitude=float(data['lat']))
)
return [efb_msg]
def qq_shake_wrapper(self, data, chat: Chat = None):
efb_msg = Message(
type=MsgType.Text,
text=self._('[Your friend shakes you!]')
)
return [efb_msg]
def qq_contact_wrapper(self, data, chat: Chat = None):
uid = data['id']
contact_type = data['type']
efb_msg = Message(
type=MsgType.Text,
text=self._("Chat Recommendation Received\nID: {}\nType: {}").format(uid, contact_type)
)
return [efb_msg]
def qq_bface_wrapper(self, data, chat: Chat = None):
efb_msg = Message(
type=MsgType.Unsupported,
text=self._('[Here comes the BigFace Emoji, please check it on your phone]')
)
return [efb_msg]
def qq_small_face_wrapper(self, data, chat: Chat = None):
# todo this function's maybe not necessary?
pass
def qq_sign_wrapper(self, data, chat: Chat = None):
location = self._('at {}').format(data['location']) if 'location' in data else self._('at Unknown Place')
title = '' if 'title' not in data else (self._('with title {}').format(data['title']))
efb_msg = Message(
type=MsgType.Text,
text=self._('signed in {location} {title}').format(title=title,
location=location)
)
return [efb_msg]
def qq_rich_wrapper(self, data: dict, chat: Chat = None): # Buggy, Help needed
efb_messages = list()
efb_msg = Message(
type=MsgType.Unsupported,
text=self._('[Here comes the Rich Text, dumping...] \n')
)
for key, value in data.items():
efb_msg.text += key + ': ' + value + '\n'
efb_messages.append(efb_msg)
# Optimizations for rich messages
# Group Broadcast
_ = self.qq_group_broadcast_wrapper(data, chat)
if _ is not None:
efb_messages.append(_)
return efb_messages
def qq_music_wrapper(self, data, chat: Chat = None):
efb_msg = Message()
if data['type'] == '163': # Netease Cloud Music
efb_msg.type = MsgType.Text
efb_msg.text = 'https://music.163.com/#/song?id=' + data['id']
else:
efb_msg.type = MsgType.Text
efb_msg.text = data['text']
return [efb_msg] # todo Port for other music platform
def qq_text_simple_wrapper(self, text: str, ats: dict): # This cute function only accepts string!
efb_msg = Message()
efb_msg.type = MsgType.Text
efb_msg.text = text
if ats: # This is used to replace specific text with @blahblah
# And Milkice really requires a brain check
efb_msg.substitutions = Substitutions(ats)
return efb_msg
def coolq_code_at_wrapper(self, uid):
return '[CQ:at,qq={}]'.format(uid)
def coolq_code_image_wrapper(self, file, file_path):
if file.closed:
file = open(file.name)
encoded_string = base64.b64encode(file.read())
# Since base64 doesn't contain characters which isn't allowed in CQ Code,
# there's no need to escape the special characters
return '[CQ:image,file=base64://{}]'.format(encoded_string.decode())
def coolq_voice_image_wrapper(self, file, file_path):
if file.closed:
file = open(file.name)
encoded_string = base64.b64encode(file.read())
# Since base64 doesn't contain characters which isn't allowed in CQ Code,
# there's no need to escape the special characters
return '[CQ:record,file=base64://{}]'.format(encoded_string.decode())
def qq_file_after_wrapper(self, data):
efb_msg = Message()
efb_msg.file = data['file']
efb_msg.type = MsgType.File
mime = magic.from_file(efb_msg.file.name, mime=True)
if isinstance(mime, bytes):
mime = mime.decode()
efb_msg.path = efb_msg.file.name
efb_msg.mime = mime
efb_msg.filename = quote(data['filename'])
return efb_msg
def qq_group_broadcast_wrapper(self, data, chat: Chat = None):
try:
at_list = {}
content_data = json.loads(data['content'])
text_data = base64.b64decode(content_data['mannounce']['text']).decode("UTF-8")
title_data = base64.b64decode(content_data['mannounce']['title']).decode("UTF-8")
text = "[群公告] 【{title}\n{text}".format(title=title_data, text=text_data)
substitution_begin = len(text) + 1
substitution_end = len(text) + len('@all') + 2
text += ' @all '
at_list[(substitution_begin, substitution_end)] = chat.self
if 'pic' in content_data['mannounce']: # Picture Attached
# Assuming there's only one picture
data['url'] = "http://gdynamic.qpic.cn/gdynamic/{}/628".format(
content_data['mannounce']['pic'][0]['url'])
efb_message = self.qq_image_wrapper(data)[0]
efb_message.text = text
efb_message.substitutions = Substitutions(at_list)
return [efb_message]
else:
return self.qq_text_simple_wrapper(text, at_list)
except Exception:
return self.qq_group_broadcast_alternative_wrapper(data)
def qq_group_broadcast_alternative_wrapper(self, data, chat: Chat = None):
try:
at_list = {}
content_data = json.loads(data['content'])
group_id = content_data['mannounce']['gc']
notice_raw_data = self.inst.coolq_api_query("_get_group_notice",
group_id=group_id)
notice_data = json.loads(notice_raw_data)
title_data = html.unescape(notice_data[0]['msg']['title'])
text_data = html.unescape(notice_data[0]['msg']['text'])
text = "[群公告] 【{title}\n{text}".format(title=title_data, text=text_data)
substitution_begin = len(text) + 1
substitution_end = len(text) + len('@all') + 2
text += ' @all '
at_list[(substitution_begin, substitution_end)] = chat.self
if 'pics' in html.unescape(notice_data[0]['msg']): # Picture Attached
# Assuming there's only one picture
data['url'] = "http://gdynamic.qpic.cn/gdynamic/{}/628".format(notice_data[0]['msg']['pics'][0]['id'])
efb_message = self.qq_image_wrapper(data)[0]
efb_message.text = text
efb_message.substitutions = Substitutions(at_list)
return [efb_message]
else:
return self.qq_text_simple_wrapper(text, at_list)
except Exception:
return None

View File

@ -0,0 +1,830 @@
# coding: utf-8
import re
import json
import logging
import ntpath
import tempfile
import urllib.request
from gettext import translation
from typing import *
from urllib.error import URLError, HTTPError, ContentTooShortError
from urllib.parse import quote
import requests
from ehforwarderbot import Message, coordinator
from pkg_resources import resource_filename
from .Exceptions import CoolQUnknownException
qq_emoji_list = { # created by JogleLew and jqqqqqqqqqq, optimized based on Tim's emoji support
0: '😮',
1: '😣',
2: '😍',
3: '😳',
4: '😎',
5: '😭',
6: '☺️',
7: '😷',
8: '😴',
9: '😭',
10: '😰',
11: '😡',
12: '😝',
13: '😃',
14: '🙂',
15: '🙁',
16: '🤓',
17: '[Empty]',
18: '😤',
19: '😨',
20: '😏',
21: '😊',
22: '🙄',
23: '😕',
24: '🤤',
25: '😪',
26: '😨',
27: '😓',
28: '😬',
29: '🤑',
30: '',
31: '😤',
32: '🤔',
33: '🤐',
34: '😵',
35: '😩',
36: '💣',
37: '💀',
38: '🔨',
39: '👋',
40: '[Empty]',
41: '😮',
42: '💑',
43: '🕺',
44: '[Empty]',
45: '[Empty]',
46: '🐷',
47: '[Empty]',
48: '[Empty]',
49: '🤷',
50: '[Empty]',
51: '[Empty]',
52: '[Empty]',
53: '🎂',
54: '',
55: '💣',
56: '🔪',
57: '⚽️',
58: '[Empty]',
59: '💩',
60: '☕️',
61: '🍚',
62: '[Empty]',
63: '🌹',
64: '🥀',
65: '[Empty]',
66: '❤️',
67: '💔️',
68: '[Empty]',
69: '🎁',
70: '[Empty]',
71: '[Empty]',
72: '[Empty]',
73: '[Empty]',
74: '🌞️',
75: '🌃',
76: '👍',
77: '👎',
78: '🤝',
79: '✌️',
80: '[Empty]',
81: '[Empty]',
82: '[Empty]',
83: '[Empty]',
84: '[Empty]',
85: '🥰',
86: '[怄火]',
87: '[Empty]',
88: '[Empty]',
89: '🍉',
90: '[Empty]',
91: '[Empty]',
92: '[Empty]',
93: '[Empty]',
94: '[Empty]',
95: '[Empty]',
96: '😅',
97: '[擦汗]',
98: '[抠鼻]',
99: '👏',
100: '[糗大了]',
101: '😏',
102: '😏',
103: '😏',
104: '🥱',
105: '[鄙视]',
106: '😭',
107: '😭',
108: '[阴险]',
109: '😚',
110: '🙀',
111: '[可怜]',
112: '🔪',
113: '🍺',
114: '🏀',
115: '🏓',
116: '❤️',
117: '🐞',
118: '[抱拳]',
119: '[勾引]',
120: '',
121: '[差劲]',
122: '🤟',
123: '🚫',
124: '👌',
125: '[转圈]',
126: '[磕头]',
127: '[回头]',
128: '[跳绳]',
129: '👋',
130: '[激动]',
131: '[街舞]',
132: '😘',
133: '[左太极]',
134: '[右太极]',
135: '[Empty]',
136: '[双喜]',
137: '🧨',
138: '🏮',
139: '💰',
140: '[K歌]',
141: '🛍️',
142: '📧',
143: '[帅]',
144: '👏',
145: '🙏',
146: '[爆筋]',
147: '🍭',
148: '🍼',
149: '[下面]',
150: '🍌',
151: '🛩',
152: '🚗',
153: '🚅',
154: '[车厢]',
155: '[高铁右车头]',
156: '🌥',
157: '下雨',
158: '💵',
159: '🐼',
160: '💡',
161: '[风车]',
162: '',
163: '🌂',
164: '[彩球]',
165: '💍',
166: '🛋',
167: '[纸巾]',
168: '💊',
169: '🔫',
170: '🐸',
171: '🍵',
172: '[眨眼睛]',
173: '😭',
174: '[无奈]',
175: '[卖萌]',
176: '[小纠结]',
177: '[喷血]',
178: '[斜眼笑]',
179: '[doge]',
180: '[惊喜]',
181: '[骚扰]',
182: '😹',
183: '[我最美]',
184: '🦀',
185: '[羊驼]',
186: '[Empty]',
187: '👻',
188: '🥚',
189: '[Empty]',
190: '🌼',
191: '[Empty]',
192: '🧧',
193: '😄',
194: '😞',
195: '[Empty]',
196: '[Empty]',
197: '[冷漠]',
198: '[呃]',
199: '👍',
200: '👋',
201: '👍',
202: '[无聊]',
203: '[托脸]',
204: '[吃]',
205: '💐',
206: '😨',
207: '[花痴]',
208: '[小样儿]',
209: '[Empty]',
210: '😭',
211: '[我不看]',
212: '[托腮]',
213: '[Empty]',
214: '😙',
215: '[糊脸]',
216: '[拍头]',
217: '[扯一扯]',
218: '[舔一舔]',
219: '[蹭一蹭]',
220: '[拽炸天]',
221: '[顶呱呱]',
222: '🤗',
223: '[暴击]',
224: '🔫',
225: '[撩一撩]',
226: '[拍桌]',
227: '👏',
228: '[恭喜]',
229: '🍻',
230: '[嘲讽]',
231: '[哼]',
232: '[佛系]',
233: '[掐一掐]',
234: '😮',
235: '[颤抖]',
236: '[啃头]',
237: '[偷看]',
238: '[扇脸]',
239: '[原谅]',
240: '[喷脸]',
241: '🎂',
242: '[Empty]',
243: '[Empty]',
244: '[Empty]',
245: '[Empty]',
246: '[Empty]',
247: '[Empty]',
248: '[Empty]',
249: '[Empty]',
250: '[Empty]',
251: '[Empty]',
252: '[Empty]',
253: '[Empty]',
254: '[Empty]',
255: '[Empty]',
}
# original text copied from Tim
qq_emoji_text_list = {
0: '[惊讶]',
1: '[撇嘴]',
2: '[色]',
3: '[发呆]',
4: '[得意]',
5: '[流泪]',
6: '[害羞]',
7: '[闭嘴]',
8: '[睡]',
9: '[大哭]',
10: '[尴尬]',
11: '[发怒]',
12: '[调皮]',
13: '[呲牙]',
14: '[微笑]',
15: '[难过]',
16: '[酷]',
17: '[Empty]',
18: '[抓狂]',
19: '[吐]',
20: '[偷笑]',
21: '[可爱]',
22: '[白眼]',
23: '[傲慢]',
24: '[饥饿]',
25: '[困]',
26: '[惊恐]',
27: '[流汗]',
28: '[憨笑]',
29: '[悠闲]',
30: '[奋斗]',
31: '[咒骂]',
32: '[疑问]',
33: '[嘘]',
34: '[晕]',
35: '[折磨]',
36: '[衰]',
37: '[骷髅]',
38: '[敲打]',
39: '[再见]',
40: '[Empty]',
41: '[发抖]',
42: '[爱情]',
43: '[跳跳]',
44: '[Empty]',
45: '[Empty]',
46: '[猪头]',
47: '[Empty]',
48: '[Empty]',
49: '[拥抱]',
50: '[Empty]',
51: '[Empty]',
52: '[Empty]',
53: '[蛋糕]',
54: '[闪电]',
55: '[炸弹]',
56: '[刀]',
57: '[足球]',
58: '[Empty]',
59: '[便便]',
60: '[咖啡]',
61: '[饭]',
62: '[Empty]',
63: '[玫瑰]',
64: '[凋谢]',
65: '[Empty]',
66: '[爱心]',
67: '[心碎]',
68: '[Empty]',
69: '[礼物]',
70: '[Empty]',
71: '[Empty]',
72: '[Empty]',
73: '[Empty]',
74: '[太阳]',
75: '[月亮]',
76: '[赞]',
77: '[踩]',
78: '[握手]',
79: '[胜利]',
80: '[Empty]',
81: '[Empty]',
82: '[Empty]',
83: '[Empty]',
84: '[Empty]',
85: '[飞吻]',
86: '[怄火]',
87: '[Empty]',
88: '[Empty]',
89: '[西瓜]',
90: '[Empty]',
91: '[Empty]',
92: '[Empty]',
93: '[Empty]',
94: '[Empty]',
95: '[Empty]',
96: '[冷汗]',
97: '[擦汗]',
98: '[抠鼻]',
99: '[鼓掌]',
100: '[糗大了]',
101: '[坏笑]',
102: '[左哼哼]',
103: '[右哼哼]',
104: '[哈欠]',
105: '[鄙视]',
106: '[委屈]',
107: '[快哭了]',
108: '[阴险]',
109: '[亲亲]',
110: '[吓]',
111: '[可怜]',
112: '[菜刀]',
113: '[啤酒]',
114: '[篮球]',
115: '[乒乓]',
116: '[示爱]',
117: '[瓢虫]',
118: '[抱拳]',
119: '[勾引]',
120: '[拳头]',
121: '[差劲]',
122: '[爱你]',
123: '[NO]',
124: '[OK]',
125: '[转圈]',
126: '[磕头]',
127: '[回头]',
128: '[跳绳]',
129: '[挥手]',
130: '[激动]',
131: '[街舞]',
132: '[献吻]',
133: '[左太极]',
134: '[右太极]',
135: '[Empty]',
136: '[双喜]',
137: '[鞭炮]',
138: '[灯笼]',
139: '[发财]',
140: '[K歌]',
141: '[购物]',
142: '[邮件]',
143: '[帅]',
144: '[喝彩]',
145: '[祈祷]',
146: '[爆筋]',
147: '[棒棒糖]',
148: '[喝奶]',
149: '[下面]',
150: '[香蕉]',
151: '[飞机]',
152: '[开车]',
153: '[高铁左车头]',
154: '[车厢]',
155: '[高铁右车头]',
156: '[多云]',
157: '[下雨]',
158: '[钞票]',
159: '[熊猫]',
160: '[灯泡]',
161: '[风车]',
162: '[闹钟]',
163: '[打伞]',
164: '[彩球]',
165: '[钻戒]',
166: '[沙发]',
167: '[纸巾]',
168: '[药]',
169: '[手枪]',
170: '[青蛙]',
171: '[茶]',
172: '[眨眼睛]',
173: '[泪奔]',
174: '[无奈]',
175: '[卖萌]',
176: '[小纠结]',
177: '[喷血]',
178: '[斜眼笑]',
179: '[doge]',
180: '[惊喜]',
181: '[骚扰]',
182: '[笑哭]',
183: '[我最美]',
184: '[河蟹]',
185: '[羊驼]',
186: '[Empty]',
187: '[幽灵]',
188: '[蛋]',
189: '[Empty]',
190: '[菊花]',
191: '[Empty]',
192: '[红包]',
193: '[大笑]',
194: '[不开心]',
195: '[Empty]',
196: '[Empty]',
197: '[冷漠]',
198: '[呃]',
199: '[好棒]',
200: '[拜托]',
201: '[点赞]',
202: '[无聊]',
203: '[托脸]',
204: '[吃]',
205: '[送花]',
206: '[害怕]',
207: '[花痴]',
208: '[小样儿]',
209: '[Empty]',
210: '[飙泪]',
211: '[我不看]',
212: '[托腮]',
213: '[Empty]',
214: '[啵啵]',
215: '[糊脸]',
216: '[拍头]',
217: '[扯一扯]',
218: '[舔一舔]',
219: '[蹭一蹭]',
220: '[拽炸天]',
221: '[顶呱呱]',
222: '[抱抱]',
223: '[暴击]',
224: '[开枪]',
225: '[撩一撩]',
226: '[拍桌]',
227: '[拍手]',
228: '[恭喜]',
229: '[干杯]',
230: '[嘲讽]',
231: '[哼]',
232: '[佛系]',
233: '[掐一掐]',
234: '[惊呆]',
235: '[颤抖]',
236: '[啃头]',
237: '[偷看]',
238: '[扇脸]',
239: '[原谅]',
240: '[喷脸]',
241: '[生日快乐]',
242: '[Empty]',
243: '[Empty]',
244: '[Empty]',
245: '[Empty]',
246: '[Empty]',
247: '[Empty]',
248: '[Empty]',
249: '[Empty]',
250: '[Empty]',
251: '[Empty]',
252: '[Empty]',
253: '[Empty]',
254: '[Empty]',
255: '[Empty]',
}
qq_sface_list = {
1: '[拜拜]',
2: '[鄙视]',
3: '[菜刀]',
4: '[沧桑]',
5: '[馋了]',
6: '[吃惊]',
7: '[微笑]',
8: '[得意]',
9: '[嘚瑟]',
10: '[瞪眼]',
11: '[震惊]',
12: '[鼓掌]',
13: '[害羞]',
14: '[好的]',
15: '[惊呆了]',
16: '[静静看]',
17: '[可爱]',
18: '[困]',
19: '[脸红]',
20: '[你懂的]',
21: '[期待]',
22: '[亲亲]',
23: '[伤心]',
24: '[生气]',
25: '[摇摆]',
26: '[帅]',
27: '[思考]',
28: '[震惊哭]',
29: '[痛心]',
30: '[偷笑]',
31: '[挖鼻孔]',
32: '[抓狂]',
33: '[笑着哭]',
34: '[无语]',
35: '[捂脸]',
36: '[喜欢]',
37: '[笑哭]',
38: '[疑惑]',
39: '[赞]',
40: '[眨眼]'
}
translator = translation("efb_qq_slave",
resource_filename('efb_qq_slave', 'Clients/CoolQ/locale'),
fallback=True)
_ = translator.gettext
ngettext = translator.ngettext
def cq_get_image(image_link: str) -> tempfile: # Download image from QQ
file = tempfile.NamedTemporaryFile()
try:
urllib.request.urlretrieve(image_link, file.name)
except (URLError, HTTPError, ContentTooShortError) as e:
logging.getLogger(__name__).warning('Image download failed.')
logging.getLogger(__name__).warning(str(e))
return None
else:
if file.seek(0, 2) <= 0:
raise EOFError('File downloaded is Empty')
file.seek(0)
return file
def async_send_messages_to_master(msg: Message):
coordinator.send_message(msg)
if msg.file:
msg.file.close()
def process_quote_text(text, max_length): # Simple wrapper for processing quoted text
qt_txt = "%s" % text
if max_length > 0:
tgt_text = qt_txt[:max_length]
if len(qt_txt) >= max_length:
tgt_text += ""
tgt_text = "%s" % tgt_text
elif max_length < 0:
tgt_text = "%s" % qt_txt
else:
tgt_text = ""
return tgt_text
def coolq_text_encode(text: str): # Escape special characters for CQ Code text
expr = (('&', '&amp;'), ('[', '&#91;'), (']', '&#93;'))
for r in expr:
text = text.replace(*r)
return text
def coolq_para_encode(text: str): # Escape special characters for CQ Code parameters
expr = (('&', '&amp;'), ('[', '&#91;'), (']', '&#93;'), (',', '&#44;'))
for r in expr:
text = text.replace(*r)
return text
def upload_image_smms(file, path, email, password): # Upload image to sm.ms and return the link
UPLOAD_URL_TOKEN = 'https://sm.ms/api/v2/token'
UPLOAD_URL_IMAGE = 'https://sm.ms/api/v2/upload'
UPLOAD_LOGIN = {'username': email,
'password': password}
UPLOAD_PARAMS = {'format': 'json', 'ssl': True}
resp = requests.post(UPLOAD_URL_TOKEN, params=UPLOAD_LOGIN)
status = json.loads(resp.text)
if status['code'] == 'success':
token = status['data']['token']
UPLOAD_HEADER = {'Authorization': token}
else:
logging.getLogger(__name__).warning(
'WARNING: {}'.format(status['msg']))
raise CoolQUnknownException(status['msg'])
with open(path, 'rb') as f:
files = {'smfile': f.read()}
resp = requests.post(UPLOAD_URL_IMAGE, files=files, headers=UPLOAD_HEADER,
params=UPLOAD_PARAMS)
status = json.loads(resp.text)
if status['code'] == 'success':
logging.getLogger(__name__).debug('INFO: upload success! url at {}'.format(status['data']['url']))
return status['data']
else:
logging.getLogger(__name__).warning('WARNING: {}'.format(status['msg']))
raise CoolQUnknownException(status['msg'])
def upload_image_vim_cn(file, path): # Upload image to img.vim-cn.com and return the link
UPLOAD_URL = 'https://img.vim-cn.com/'
with open(path, 'rb') as f:
files = {'name': f.read()}
resp = requests.post(UPLOAD_URL, files=files)
if resp.status_code != 200:
raise CoolQUnknownException("Failed to upload images to vim-cn.com")
return resp.text
def upload_image_sogou(file, path): # Upload image to pic.sogou.com and return the link
UPLOAD_URL = 'https://pic.sogou.com/pic/upload_pic.jsp'
with open(path, 'rb') as f:
files = {'pic_path': f.read()}
resp = requests.post(UPLOAD_URL, files=files)
if resp.status_code != 200:
raise CoolQUnknownException("Failed to upload images to sogou.com")
return "https" + resp.text[4:] # Replace http with https
def upload_image_mi(file, path): # Upload image to shopapi.io.mi.com and return the link
UPLOAD_URL = 'https://shopapi.io.mi.com/homemanage/shop/uploadpic'
with open(path, 'rb') as f:
files = {'pic': (ntpath.basename(path), f.read(), "image/jpeg")}
resp = requests.post(UPLOAD_URL, files=files)
if resp.status_code != 200:
raise CoolQUnknownException("Failed to upload images to mi.com")
status = json.loads(resp.text)
print(status)
if status['message'] != "ok":
raise CoolQUnknownException("Failed to upload images to mi.com")
return status['result']
def param_spliter(str_param):
params = str_param.split(";")
param = {}
for _k in params:
key, value = _k.strip().split("=")
param[key] = value
return param
def download_file_from_qzone(cookie: str, csrf_token: str, uin, group_id, file_id, filename, file_size):
cookie_arr = param_spliter(cookie)
url = "http://qun.qzone.qq.com/cgi-bin/group_share_get_downurl?uin=" + str(uin) + "&pa=/104/" + \
str(file_id) + "&groupid=" + str(group_id) + "&bussinessid=0&charset=utf-8&g_tk=" + str(csrf_token) + "&r=888"
ret = requests.get(url, cookies=cookie_arr)
data = json.loads(ret.text.split("(")[1].split(")")[0])['data']
cookie += "; FTN5K=" + str(data['cookie'])
download_url = data['url']
download_url += "/" + quote(filename)
if file_size >= 50*1024*1024: # File size is bigger than 50MiB
return _("File is too big to be downloaded")
file = tempfile.NamedTemporaryFile()
try:
opener = urllib.request.build_opener()
opener.addheaders.append(('Cookie', cookie))
urllib.request.install_opener(opener)
urllib.request.urlretrieve(download_url, file.name)
except (URLError, HTTPError, ContentTooShortError) as e:
logging.getLogger(__name__).warning("Error occurs when downloading files: " + str(e))
return _("Error occurs when downloading files: ") + str(e)
else:
if file.seek(0, 2) <= 0:
raise EOFError('File downloaded is Empty')
file.seek(0)
return file
'''
try:
opener = urllib.request.build_opener()
opener.addheaders.append(('Cookie', cookie))
with opener.open(download_url) as response, tempfile.NamedTemporaryFile() as f:
shutil.copyfileobj(response, f)
if f.seek(0, 2) <= 0:
raise EOFError('File downloaded is Empty')
f.seek(0)
return f
except Exception as e:
logging.getLogger(__name__).warning("Error occurs when downloading files" + str(e))
return url
'''
def download_user_avatar(uid: str):
file = tempfile.NamedTemporaryFile()
url = "https://q1.qlogo.cn/g?b=qq&nk={}&s=0".format(uid)
try:
opener = urllib.request.build_opener()
urllib.request.install_opener(opener)
urllib.request.urlretrieve(url, file.name)
except (URLError, HTTPError, ContentTooShortError) as e:
logging.getLogger(__name__).warning("Error occurs when downloading files: " + str(e))
return _("Error occurs when downloading files: ") + str(e)
if file.seek(0, 2) <= 0:
raise EOFError('File downloaded is Empty')
file.seek(0)
return file
def download_group_avatar(uid: str):
file = tempfile.NamedTemporaryFile()
url = "https://p.qlogo.cn/gh/{}/{}/".format(uid, uid)
try:
opener = urllib.request.build_opener()
urllib.request.install_opener(opener)
urllib.request.urlretrieve(url, file.name)
except (URLError, HTTPError, ContentTooShortError) as e:
logging.getLogger(__name__).warning("Error occurs when downloading files: " + str(e))
return _("Error occurs when downloading files: ") + str(e)
if file.seek(0, 2) <= 0:
raise EOFError('File downloaded is Empty')
file.seek(0)
return file
def get_friend_group_via_qq_show(cookie: str, csrf_token: str) -> Dict[str, str]:
# This function won't check before execute, instead all the exceptions will be thrown
cookie_arr = param_spliter(cookie)
url = "https://show.qq.com/cgi-bin/qqshow_user_friendgroup?g_tk={csrf_token}&omode=4" \
.format(csrf_token=csrf_token)
ret = requests.get(url, cookies=cookie_arr)
data = json.loads(ret.text)
friend_group = {}
for i in range(len(data['data']['group'])): # friend group
for j in range(len(data['data']['group'][i]['friend'])):
current_user = str(data['data']['group'][i]['friend'][j]['uin'])
current_group = data['data']['group'][i]['name']
friend_group[current_user] = current_group
return friend_group
def download_voice(filename: str, api_root: str, access_token: str):
file = tempfile.NamedTemporaryFile()
url = '{url}/data/record/{file}'.format(url=api_root, file=filename)
try:
opener = urllib.request.build_opener()
opener.addheaders = [("Authorization", "Bearer {at}".format(at=access_token))]
urllib.request.install_opener(opener)
urllib.request.urlretrieve(url, file.name)
except (URLError, HTTPError, ContentTooShortError) as e:
logging.getLogger(__name__).warning("Error occurs when downloading files: " + str(e))
return _("Error occurs when downloading files: ") + str(e)
if file.seek(0, 2) <= 0:
raise EOFError('File downloaded is Empty')
file.seek(0)
return file
def get_stranger_info_via_qzone(uin: str):
pattern = re.compile(r"\((.*)\)")
resp = requests.get("https://users.qzone.qq.com/fcg-bin/cgi_get_portrait.fcg?uins={id}".format(id=uin))
# Assume that this API is always available
data = pattern.findall(resp.text)
if not data:
return ""
try:
data = json.loads(data[0])
ret = {
"uin": uin,
"nickname": data[uin][6],
"avatar_url": data[uin][0]
}
return ret
except:
return ""

View File

@ -0,0 +1 @@
from . import CoolQ

View File

@ -0,0 +1,3 @@
# coding: utf-8
__version__ = '2.0.0a0'

45
setup.py Normal file
View File

@ -0,0 +1,45 @@
import sys
from setuptools import setup, find_packages
if sys.version_info < (3, 6):
raise Exception("Python 3.6 or higher is required. Your version is %s." % sys.version)
__version__ = ""
exec(open('efb_qq_plugin_coolq/__version__.py').read())
long_description = open('README.rst').read()
setup(
name='efb-qq-plugin-coolq',
packages=find_packages(exclude=["*.tests", "*.tests.*", "tests.*", "tests"]),
version=__version__,
description='EQS plugin for CoolQ API Compatible Client.',
long_description=long_description,
include_package_data=True,
author='Milkice',
author_email='milkice@milkice.me',
url='https://github.com/milkice233/efb-qq-plugin-coolq',
license='GPLv3',
python_requires='>=3.6',
keywords=['ehforwarderbot', 'EH Forwarder Bot', 'EH Forwarder Bot Slave Channel',
'qq', 'chatbot', 'EQS', 'CoolQ'],
classifiers=[
"Development Status :: 3 - Alpha",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Intended Audience :: Developers",
"Intended Audience :: End Users/Desktop",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Topic :: Communications :: Chat",
"Topic :: Utilities"
],
install_requires=[
"efb-qq-slave", "ehforwarderbot",
"PyYaml",
'requests', 'python-magic', 'Pillow', 'cqhttp>=1.3.0', 'cherrypy>=18.5.0'
],
entry_points={
'ehforwarderbot.qq.plugin': 'CoolQ = efb_qq_plugin_coolq:CoolQ'
}
)