# PagerMaid_Plugins/groupword.py

from wordcloud import WordCloud
from io import BytesIO
from os.path import exists
from os import makedirs, sep, remove
from PIL import Image
from collections import defaultdict
from requests import get
from pagermaid import version
from pagermaid.utils import execute, alias_command, pip_install
from pagermaid.listener import listener
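
# jieba is installed at import time through PagerMaid's pip_install helper, so
# the tokenizer does not need to be set up manually before loading the plugin.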
pip_install("jieba")
import jieba
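
# Translation table for str.translate(): maps ASCII and full-width (CJK)
# punctuation code points to spaces so they are stripped before segmentation.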
punctuation = {33: ' ', 34: ' ', 35: ' ', 36: ' ', 37: ' ', 38: ' ', 39: ' ', 40: ' ', 41: ' ', 42: ' ', 43: ' ',
               44: ' ', 45: ' ', 46: ' ', 47: ' ', 58: ' ', 59: ' ', 60: ' ', 61: ' ', 62: ' ', 63: ' ', 64: ' ',
               91: ' ', 92: ' ', 93: ' ', 94: ' ', 95: ' ', 96: ' ', 123: ' ', 124: ' ', 125: ' ', 126: ' ',
               65311: ' ', 65292: ' ', 65281: ' ', 12304: ' ', 12305: ' ', 65288: ' ', 65289: ' ', 12289: ' ',
               12290: ' ', 65306: ' ', 65307: ' ', 8217: ' ', 8216: ' ', 8230: ' ', 65509: ' ', 183: ' '}
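

# The groupword command pulls the latest 500 messages in the current chat and
# renders a word cloud. Replying to a photo stores it as a custom background
# mask (white areas are left blank), and passing any argument switches jieba
# to paddle-based ("AI") segmentation.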
@listener(is_plugin=True, outgoing=True, command=alias_command("groupword"),
          description="拉取最新 500 条消息生成词云,回复图片可自定义背景图。(图片白色区域为不显示部分)",
          parameters="[任意内容启用AI分词]")
async def group_word(context):
if not exists(f"plugins{sep}groupword"):
makedirs(f"plugins{sep}groupword")
# 自定义背景图
reply = await context.get_reply_message()
if reply and reply.photo:
if exists(f"plugins{sep}groupword{sep}circle.jpg"):
remove(f"plugins{sep}groupword{sep}circle.jpg")
await context.client.download_media(reply, file=f"plugins{sep}groupword{sep}circle.jpg")
return await context.edit("自定义背景图设置完成。")
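    # Any argument enables jieba's paddle ("AI") segmentation mode; the
    # paddlepaddle-tiny wheel is installed on demand via pip_install.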
    imported_1 = False
    if len(context.parameter) >= 1:
        pip_install("paddlepaddle-tiny", alias="paddle")
        imported_1 = True
    try:
        await context.edit('正在生成中。。。')
    except:
        return
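    # Fetch the WenQuanYi Micro Hei font on first run so CJK text renders
    # correctly in the word cloud.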
if not exists(f"plugins{sep}groupword{sep}wqy-microhei.ttc"):
await context.edit('正在拉取中文字体文件。。。(等待时间请评估你的服务器)')
r = get('https://cdn.jsdelivr.net/gh/anthonyfok/fonts-wqy-microhei/wqy-microhei.ttc')
with open(f"plugins{sep}groupword{sep}wqy-microhei.ttc", "wb") as code:
code.write(r.content)
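    # Word frequencies keyed by lower-cased token; count tracks how many
    # messages were actually analysed.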
    words = defaultdict(int)
    count = 0
    try:
        if imported_1:
            try:
                jieba.enable_paddle()
            except:
                imported_1 = False
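        # Scan up to 500 recent messages, skipping the command message itself
        # as well as commands and messages containing '//'.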
        async for msg in context.client.iter_messages(context.chat, limit=500):
            if msg.id == context.id:
                continue
            if msg.text and not msg.text.startswith('/') and not msg.text.startswith('-') and '//' not in msg.text:
                try:
                    for word in jieba.cut(msg.text.translate(punctuation), use_paddle=imported_1):
                        words[word.lower()] += 1
                    count += 1
                except:
                    pass
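    # iter_messages may raise, e.g. when Telegram rate-limits the account; if
    # nothing was collected, report that and stop, otherwise render whatever
    # was gathered.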
    except:
        if count == 0:
            try:
                await context.edit('您已被 TG 官方限制。')
                return
            except:
                return
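    # Render the cloud: a plain 800x400 image by default, or an 800x800 image
    # shaped by the saved circle.jpg mask (white areas stay empty).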
    try:
        if not exists(f"plugins{sep}groupword{sep}circle.jpg"):
            image = WordCloud(font_path=f"plugins{sep}groupword{sep}wqy-microhei.ttc", width=800,
                              height=400).generate_from_frequencies(words).to_image()
        else:
            import numpy as np
            background = Image.open(f"plugins{sep}groupword{sep}circle.jpg")
            graph = np.array(background)  # noqa
            image = WordCloud(font_path=f"plugins{sep}groupword{sep}wqy-microhei.ttc", width=800,
                              height=800, mask=graph, scale=5,
                              background_color='white').generate_from_frequencies(words).to_image()
        stream = BytesIO()
        image.save(stream, 'PNG')
    except:
        await context.edit('词云生成失败。')
        return
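    # Send the PNG with a summary caption and remove the triggering command
    # message.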
    try:
        await context.client.send_message(context.chat, f'对最近的 {count} 条消息进行了分析。', file=stream.getvalue())
        await context.delete()
    except:
        return