word_cloud_bot/task.py

338 lines
13 KiB
Python
Raw Permalink Normal View History

2021-05-05 06:50:25 +00:00
import re
2021-05-08 02:18:24 +00:00
import queue
2021-05-05 02:59:30 +00:00
import jieba
import jieba.posseg as pseg
2021-05-05 06:50:25 +00:00
import wordcloud
2021-05-05 02:59:30 +00:00
import imageio
2021-05-05 07:35:04 +00:00
import telegram
import time
import os
2021-05-05 07:50:25 +00:00
import connector
2021-06-20 10:16:06 +00:00
from config import TOKEN, FRONT, CHANNEL
2021-05-05 02:59:30 +00:00
2021-05-05 07:50:25 +00:00
bot = telegram.Bot(token=TOKEN)
2021-05-05 06:50:25 +00:00
2021-05-08 02:18:24 +00:00
task_queue = queue.Queue()
2021-06-21 08:18:25 +00:00
user_task_queue = queue.Queue()
2021-05-07 15:28:05 +00:00
2021-05-05 02:59:30 +00:00
2021-05-08 02:18:24 +00:00
def schedule_task():
2021-05-07 15:28:05 +00:00
try:
r = connector.get_connection()
key_list = r.keys()
group_list = []
for i in key_list:
if "chat_content" in i:
group_list.append(i[:i.find("_")])
# print(group_list)
2021-06-20 10:16:06 +00:00
print("运行定时任务,向任务队列中添加任务,任务数量:{}".format(len(group_list)))
2021-05-07 15:28:05 +00:00
for group in group_list:
try:
2021-06-20 08:56:30 +00:00
# 向任务队列中添加任务
2021-05-08 02:18:24 +00:00
task_queue.put(group)
2021-05-07 15:28:05 +00:00
except Exception as e:
print("群组:{} | 词云数据分析生成失败,请查看报错信息".format(group))
print(e)
continue
except Exception as e:
print("数据库连接失败,请查看报错信息")
print(e)
2021-05-05 06:50:25 +00:00
2021-05-05 02:59:30 +00:00
2021-05-08 02:18:24 +00:00
def do_task():
while True:
group = task_queue.get()
try:
print("---------------------------")
2021-05-09 04:56:38 +00:00
print("群组: {} | 分析处理中... | 剩余任务数量 {}".format(group, task_queue.qsize()))
2021-05-08 02:18:24 +00:00
start_time = float(time.time())
generate(group)
stop_time = float(time.time())
print("当前群组处理耗时:" + str(stop_time - start_time))
print("---------------------------")
2021-06-20 10:16:06 +00:00
ctext = f'#WORDCLOUD \n' \
f'群组 ID`{group}`\n' \
f'执行操作:`生成词云`\n' \
f'结果:`成功`\n' \
2021-06-20 10:21:43 +00:00
f'处理耗时:`{str(stop_time - start_time)[:5]}`'
2021-05-08 02:18:24 +00:00
except Exception as e:
2021-05-09 04:44:53 +00:00
print("群组: {} | 处理失败,可能是机器人已经被移出群组,请检查报错!".format(group))
2021-05-08 02:18:24 +00:00
print(e)
2021-06-20 10:16:06 +00:00
ctext = f'#WORDCLOUD #SCHEDULE \n' \
f'群组 ID`{group}`\n' \
f'执行操作:`生成词云`\n' \
f'结果:`失败`\n'
if not CHANNEL == 0:
bot.send_message(chat_id=CHANNEL, text=ctext, parse_mode="Markdown")
2021-05-08 02:18:24 +00:00
time.sleep(1)
2021-06-21 08:18:25 +00:00
def do_user_task():
while True:
temp = user_task_queue.get()
group = temp.split('|')[0]
uid = temp.split('|')[1]
try:
print("---------------------------")
print("群组: {} | 用户: {} | 分析处理中... | 剩余任务数量 {}".format(group, uid, task_queue.qsize()))
start_time = float(time.time())
generate_user(group, uid)
stop_time = float(time.time())
print("当前群组处理耗时:" + str(stop_time - start_time))
print("---------------------------")
2021-06-21 08:27:17 +00:00
ctext = f'#WORDCLOUD #id{uid} \n' \
2021-06-21 08:18:25 +00:00
f'群组 ID`{group}`\n' \
f'用户 ID`{uid}`\n' \
f'执行操作:`生成用户词云`\n' \
f'结果:`成功`\n' \
f'处理耗时:`{str(stop_time - start_time)[:5]}`'
except Exception as e:
print("群组: {} | 用户: {} | 处理失败,可能是机器人已经被移出群组,请检查报错!".format(group, uid))
print(e)
2021-06-21 08:27:17 +00:00
ctext = f'#WORDCLOUD #SCHEDULE #id{uid} \n' \
2021-06-21 08:18:25 +00:00
f'群组 ID`{group}`\n' \
f'用户 ID`{uid}`\n' \
f'执行操作:`生成用户词云`\n' \
f'结果:`失败`\n'
if not CHANNEL == 0:
bot.send_message(chat_id=CHANNEL, text=ctext, parse_mode="Markdown")
time.sleep(1)
2021-05-08 02:18:24 +00:00
def add_task(group):
task_queue.put(group)
2021-06-21 08:18:25 +00:00
def add_user_task(group, uid):
user_task_queue.put(f'{group}|{uid}')
2021-05-08 02:18:24 +00:00
# 核心函数,分词统计
2021-05-07 15:28:05 +00:00
def generate(group):
2021-06-20 08:56:30 +00:00
mk = imageio.imread("circle.png")
2021-05-08 02:18:24 +00:00
# 构建并配置词云对象w注意要加scale参数提高清晰度
w = wordcloud.WordCloud(width=800,
height=800,
background_color='white',
2021-06-20 08:56:30 +00:00
font_path=FRONT,
2021-05-08 02:18:24 +00:00
mask=mk,
scale=5)
2021-05-07 15:28:05 +00:00
r = connector.get_connection()
print("当前处理的群组:" + str(group))
# 生成词云图片
jieba.enable_paddle() # 启动paddle模式。 0.40版之后开始支持,早期版本不支持
2021-05-08 02:18:24 +00:00
chat_content = r.get("{}_chat_content".format(group))
2021-05-08 02:59:27 +00:00
2021-05-08 02:18:24 +00:00
if chat_content is None:
2021-06-20 08:56:30 +00:00
print("数据库中不存在此群组 {} 数据".format(group))
2021-05-08 02:18:24 +00:00
return
2021-05-07 15:28:05 +00:00
word_list = []
words = pseg.cut(chat_content, use_paddle=True) # paddle模式
for word, flag in words:
# print(word + "\t" + flag)
if flag in ["n", "nr", "nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw"]:
# 判断该词是否有效,不为空格
if re.match(r"^\s+?$", word) is None and len(word) > 1:
word_list.append(word)
# print(word_list)
2021-05-05 02:59:30 +00:00
2021-05-07 15:28:05 +00:00
# 获取消息总数
total_message_amount = r.get("{}_total_message_amount".format(group))
2021-05-05 02:59:30 +00:00
2021-05-07 15:28:05 +00:00
# print("总发言数: " + total_message_amount)
2021-05-05 02:59:30 +00:00
2021-05-07 15:28:05 +00:00
# 获取发言用户数
user_amount = len(r.hkeys("{}_user_message_amount".format(group)))
# 获取所有用户发言数字典
user_message_amount = r.hgetall("{}_user_message_amount".format(group))
2021-05-09 03:26:00 +00:00
user_message_amount = sorted(user_message_amount.items(), key=lambda kv: (int(kv[1])), reverse=True)
2021-05-08 02:59:27 +00:00
2021-06-20 09:33:02 +00:00
# 截至时间
date = time.strftime("%Y年%m月%d", time.localtime()) + '' + time.strftime("%H:%M", time.localtime())
text = f'📅 截至 {date}\n'
2021-06-20 08:56:30 +00:00
# 分析高频词
2021-05-08 02:59:27 +00:00
if len(word_list) > 0:
word_amount = {}
# print(word_amount)
for word in word_list:
if re.search(
r"[。||、|||,|.|!|?|\\|/|+|\-|`|~|·|@|#|¥|$|%|^|&|*|(|)|;||||“|”|'|_|=|•|·|…|\"]",
2021-05-08 02:59:27 +00:00
word) is not None:
continue
# 判断该词是否之前已经出现
if word_amount.get(word) is not None:
word_amount[word] = word_amount.get(word) + 1
else:
word_amount[word] = 1
# print(word_amount)
2021-05-09 03:26:00 +00:00
word_amount = sorted(word_amount.items(), key=lambda kv: (int(kv[1])), reverse=True)
2021-05-08 02:59:27 +00:00
if len(word_amount) > 0:
# print("排序后的热词:" + str(word_amount))
hot_word_string = ""
# 默认展示前5位少于5个则全部展示
for i in range(min(5, len(word_amount))):
2021-06-20 09:33:02 +00:00
hot_word_string += "\t\t\t\t\t\t\t\t" + "👥 `" + str(word_amount[i][0]) + "`" + "" + str(
2021-05-08 02:59:27 +00:00
word_amount[i][1]) + "\n"
# print(hot_word_string)
2021-06-21 08:18:25 +00:00
text += f"🗣️ 本群 {user_amount} 位成员共产生 {total_message_amount} 条纯文本消息\n" \
2021-06-20 09:33:02 +00:00
f"🤹‍ 大家今天讨论最多的是:\n\n{hot_word_string}\n"
2021-06-20 08:56:30 +00:00
else:
2021-06-20 09:33:02 +00:00
text += '无法分析出当前群组的热词列表,可能是数据量过小,嗨起来吧~\n'
2021-05-08 02:59:27 +00:00
else:
2021-06-20 09:33:02 +00:00
text += '无法分析出当前群组的热词列表,可能是数据量过小,嗨起来吧~\n'
2021-05-08 02:59:27 +00:00
2021-06-20 08:56:30 +00:00
# 分析活跃用户
2021-05-08 02:59:27 +00:00
if len(user_message_amount) > 0:
# print("排序后的用户:" + str(user_message_amount))
top_5_user = ""
# 默认展示前5位少于5个则全部展示
for i in range(min(5, len(user_message_amount))):
dis_name = str(user_message_amount[i][0])
2021-06-20 09:35:29 +00:00
top_5_user += "\t\t\t\t\t\t\t\t" + "🎖 `" + dis_name[:min(10, len(dis_name))] + "`" + " 贡献:" + str(
2021-05-08 02:59:27 +00:00
user_message_amount[i][1]) + "\n"
# print(top_5_user)
2021-06-20 09:33:02 +00:00
text += f"🏵 今日活跃用户排行榜:\n\n{top_5_user}"
2021-05-08 02:59:27 +00:00
else:
2021-06-20 08:56:30 +00:00
text = '无法分析出当前群组的活跃用户列表,可能是数据量过小,嗨起来吧~'
2021-05-08 02:59:27 +00:00
2021-06-20 08:56:30 +00:00
# 开始创建词云
img_path = 'images/default.png'
2021-05-08 02:59:27 +00:00
try:
2021-05-09 04:44:53 +00:00
string = " ".join(word_list)
# 将string变量传入w的generate()方法,给词云输入文字
w.generate(string)
2021-06-20 08:56:30 +00:00
# 将词云图片导出到 images 文件夹
w.to_file('images/{}_chat_word_cloud.png'.format(group))
img_path = 'images/{}_chat_word_cloud.png'.format(group)
except Exception as e:
print(e)
print("词云图片生成失败")
# 发送结果
try:
2021-05-08 02:59:27 +00:00
bot.send_photo(
chat_id=group,
2021-06-20 08:56:30 +00:00
photo=open(img_path, "rb"),
caption=text,
2021-06-20 10:16:06 +00:00
parse_mode='Markdown',
2021-06-20 08:56:30 +00:00
disable_notification=True
2021-05-08 02:59:27 +00:00
)
except Exception as e:
print(e)
r.delete('{}_chat_content'.format(group))
2021-06-20 08:56:30 +00:00
print("发送结果失败")
# 删除图片
try:
os.remove("images/{}_chat_word_cloud.png".format(group))
except Exception as e:
print(e)
print("删除图片失败")
2021-05-05 07:35:04 +00:00
2021-05-05 10:17:21 +00:00
2021-06-21 08:18:25 +00:00
# 核心函数,用户分词统计
def generate_user(group, uid):
mk = imageio.imread("circle.png")
# 构建并配置词云对象w注意要加scale参数提高清晰度
w = wordcloud.WordCloud(width=800,
height=800,
background_color='white',
font_path=FRONT,
mask=mk,
scale=5)
r = connector.get_connection()
print(f"当前处理的群组:{group} | {uid}")
# 生成词云图片
jieba.enable_paddle() # 启动paddle模式。 0.40版之后开始支持,早期版本不支持
chat_content = r.get("{}_{}_user_content".format(group, uid))
if chat_content is None:
print("数据库中不存在此用户 {} | {} 数据".format(group, uid))
return
word_list = []
words = pseg.cut(chat_content, use_paddle=True) # paddle模式
for word, flag in words:
# print(word + "\t" + flag)
if flag in ["n", "nr", "nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw"]:
# 判断该词是否有效,不为空格
if re.match(r"^\s+?$", word) is None:
word_list.append(word)
# print(word_list)
# 获取消息总数
total_message_amount = r.get("{}_{}_user_message_amount".format(group, uid))
# 截至时间
date = time.strftime("%Y年%m月%d", time.localtime()) + '' + time.strftime("%H:%M", time.localtime())
text = f'📅 截至 {date}\n'
# 分析高频词
if len(word_list) > 0:
word_amount = {}
# print(word_amount)
for word in word_list:
if re.search(
r"[。||、|||,|.|!|?|\\|/|+|\-|`|~|·|@|#|¥|$|%|^|&|*|(|)|;||||“|”|'|_|=|•|·|…|\"]",
word) is not None:
continue
# 判断该词是否之前已经出现
if word_amount.get(word) is not None:
word_amount[word] = word_amount.get(word) + 1
else:
word_amount[word] = 1
# print(word_amount)
word_amount = sorted(word_amount.items(), key=lambda kv: (int(kv[1])), reverse=True)
if len(word_amount) > 0:
# print("排序后的热词:" + str(word_amount))
hot_word_string = ""
# 默认展示前5位少于5个则全部展示
for i in range(min(5, len(word_amount))):
hot_word_string += "\t\t\t\t\t\t\t\t" + "👥 `" + str(word_amount[i][0]) + "`" + "" + str(
word_amount[i][1]) + "\n"
# print(hot_word_string)
2021-06-21 08:32:44 +00:00
text += f"🗣️ [此成员](tg://user?id={uid})共产生 {total_message_amount} 条纯文本消息\n" \
2021-06-21 08:18:25 +00:00
f"🤹‍ Ta 今天讨论最多的是:\n\n{hot_word_string}\n"
else:
text += '无法分析出 Ta 的热词列表,可能是数据量过小,嗨起来吧~\n'
else:
text += '无法分析出 Ta 的热词列表,可能是数据量过小,嗨起来吧~\n'
# 开始创建词云
img_path = 'images/default.png'
try:
string = " ".join(word_list)
# 将string变量传入w的generate()方法,给词云输入文字
w.generate(string)
# 将词云图片导出到 images 文件夹
w.to_file('images/{}_chat_word_cloud.png'.format(group))
img_path = 'images/{}_chat_word_cloud.png'.format(group)
except Exception as e:
print(e)
print("词云图片生成失败")
# 发送结果
try:
bot.send_photo(
chat_id=group,
photo=open(img_path, "rb"),
caption=text,
parse_mode='Markdown',
disable_notification=True
)
except Exception as e:
print(e)
r.delete('{}_{}_user_content'.format(group, uid))
print("发送结果失败")
# 删除图片
try:
os.remove("images/{}_chat_word_cloud.png".format(group))
except Exception as e:
print(e)
print("删除图片失败")
2021-05-05 10:17:21 +00:00
def flush_redis():
r = connector.get_connection()
2021-05-05 08:21:06 +00:00
r.flushall()
print("已清空数据库")