diff --git a/func.py b/func.py index 853d810..d0700b3 100644 --- a/func.py +++ b/func.py @@ -5,7 +5,7 @@ import telegram from telegram.ext import CommandHandler, MessageHandler, Filters from config import TOKEN, LIMIT_COUNT, EXCLUSIVE_MODE, RANK_COMMAND_MODE, OWNER, EXCLUSIVE_LIST, CHANNEL, HELP import schedule -from task import add_task +from task import add_task, add_user_task bot = telegram.Bot(token=TOKEN) @@ -102,6 +102,53 @@ def rank(update, context): print(e) +def stat(update, context): + try: + r = connector.get_connection() + chat_type = update.effective_chat.type + user_id = update.effective_user.id + chat_id = update.effective_message.chat_id + try: + username = update.effective_user.username + except Exception as e: + username = update.effective_user.id + # 限制为群组 + if chat_type != "supergroup": + update.message.reply_text("此命令只有在群组中有效!") + return + if r.exists("{}_{}_frequency_limit".format(chat_id, user_id)): + r.setrange("{}_{}_frequency_limit".format(chat_id, user_id), 0, + int(r.get("{}_{}_frequency_limit".format(chat_id, user_id))) + 1) + else: + struct_time = time.localtime(time.time()) + # 数据过期时间为当前小时的 59 分 + ex_time = datetime.datetime( + struct_time.tm_year, + struct_time.tm_mon, + struct_time.tm_mday, + struct_time.tm_hour, + 59 + ) + r.set("{}_{}_frequency_limit".format(chat_id, user_id), 1) + r.expireat("{}_{}_frequency_limit".format(chat_id, user_id), ex_time) + count = int(r.get("{}_{}_frequency_limit".format(chat_id, user_id))) + if count > LIMIT_COUNT: + update.message.reply_text("您在这个小时内的生成配额已经用完,请稍后再试~") + return + add_user_task(chat_id, user_id) + print("群组: {},用户: {} | {} 发起了主动触发请求".format(chat_id, username, user_id, )) + if not CHANNEL == 0: + ctext = f'#WORDCLOUD #APPLY #id{user_id} \n' \ + f'群组 ID:`{chat_id}`\n' \ + f'用户 ID:`{user_id}`\n' \ + f'执行操作:`主动生成用户词云`' + bot.send_message(chat_id=CHANNEL, text=ctext, parse_mode="Markdown") + update.message.reply_text("统计数据将在分析完毕后发送到当前群组,请稍等~") + except Exception as e: + print("主动触发任务失败,请检查") + print(e) + + def chat_content_exec(update, context): try: r = connector.get_connection() @@ -133,9 +180,12 @@ def chat_content_exec(update, context): else: if text[-1] not in [",", "。", "!", ":", "?", "!", "?", ",", ":", "."]: r.append("{}_chat_content".format(chat_id), text + "。") + r.append("{}_{}_user_content".format(chat_id, user_id), text + "。") else: r.append("{}_chat_content".format(chat_id), text) + r.append("{}_{}_user_content".format(chat_id, user_id), text) r.incrby("{}_total_message_amount".format(chat_id)) + r.incrby("{}_{}_user_message_amount".format(chat_id, user_id)) r.hincrby("{}_user_message_amount".format(chat_id), name) print("---------------------------") except Exception as e: @@ -152,4 +202,5 @@ def check_schedule(): start_handler = CommandHandler('start', start) ping_handler = CommandHandler('ping', ping) rank_handler = CommandHandler('rank', rank) +stat_handler = CommandHandler('stat', stat) chat_content_handler = MessageHandler(Filters.text, chat_content_exec) diff --git a/main.py b/main.py index 90b30c5..1b18046 100644 --- a/main.py +++ b/main.py @@ -1,8 +1,8 @@ from telegram.ext import Updater from config import TOKEN -from func import start_handler, ping_handler, chat_content_handler, check_schedule, rank_handler +from func import start_handler, ping_handler, chat_content_handler, check_schedule, rank_handler, stat_handler import schedule -from task import schedule_task, flush_redis, do_task +from task import schedule_task, flush_redis, do_task, do_user_task import threading # 开始定时任务 - 群组分析 @@ -14,8 +14,9 @@ schedule.every().day.at('23:59').do(flush_redis) # 测试代码,每分钟推送数据,非测试目的不要取消注释下一行 # schedule.every(1).minutes.do(schedule_task) -# 开启分析线程,当队列中由任务时,会取出任务分析生成数据 +# 开启分析线程,当队列中有任务时,会取出任务分析生成数据 threading.Thread(target=do_task).start() +threading.Thread(target=do_user_task).start() threading.Thread(target=check_schedule).start() @@ -25,6 +26,7 @@ dispatcher = updater.dispatcher dispatcher.add_handler(start_handler) dispatcher.add_handler(ping_handler) dispatcher.add_handler(rank_handler) +dispatcher.add_handler(stat_handler) dispatcher.add_handler(chat_content_handler) updater.start_polling() diff --git a/task.py b/task.py index 13fd1bf..3f315af 100644 --- a/task.py +++ b/task.py @@ -13,6 +13,7 @@ from config import TOKEN, FRONT, CHANNEL bot = telegram.Bot(token=TOKEN) task_queue = queue.Queue() +user_task_queue = queue.Queue() def schedule_task(): @@ -66,10 +67,46 @@ def do_task(): time.sleep(1) +def do_user_task(): + while True: + temp = user_task_queue.get() + group = temp.split('|')[0] + uid = temp.split('|')[1] + try: + print("---------------------------") + print("群组: {} | 用户: {} | 分析处理中... | 剩余任务数量 {}".format(group, uid, task_queue.qsize())) + start_time = float(time.time()) + generate_user(group, uid) + stop_time = float(time.time()) + print("当前群组处理耗时:" + str(stop_time - start_time)) + print("---------------------------") + ctext = f'#WORDCLOUD #{uid} \n' \ + f'群组 ID:`{group}`\n' \ + f'用户 ID:`{uid}`\n' \ + f'执行操作:`生成用户词云`\n' \ + f'结果:`成功`\n' \ + f'处理耗时:`{str(stop_time - start_time)[:5]}`' + except Exception as e: + print("群组: {} | 用户: {} | 处理失败,可能是机器人已经被移出群组,请检查报错!".format(group, uid)) + print(e) + ctext = f'#WORDCLOUD #SCHEDULE #{uid} \n' \ + f'群组 ID:`{group}`\n' \ + f'用户 ID:`{uid}`\n' \ + f'执行操作:`生成用户词云`\n' \ + f'结果:`失败`\n' + if not CHANNEL == 0: + bot.send_message(chat_id=CHANNEL, text=ctext, parse_mode="Markdown") + time.sleep(1) + + def add_task(group): task_queue.put(group) +def add_user_task(group, uid): + user_task_queue.put(f'{group}|{uid}') + + # 核心函数,分词统计 def generate(group): mk = imageio.imread("circle.png") @@ -137,7 +174,7 @@ def generate(group): hot_word_string += "\t\t\t\t\t\t\t\t" + "👥 `" + str(word_amount[i][0]) + "`" + ":" + str( word_amount[i][1]) + "\n" # print(hot_word_string) - text += f"🗣️ 本群 {user_amount} 位成员共产生 {total_message_amount} 条发言\n" \ + text += f"🗣️ 本群 {user_amount} 位成员共产生 {total_message_amount} 条纯文本消息\n" \ f"🤹‍ 大家今天讨论最多的是:\n\n{hot_word_string}\n" else: text += '无法分析出当前群组的热词列表,可能是数据量过小,嗨起来吧~\n' @@ -193,6 +230,110 @@ def generate(group): print("删除图片失败") +# 核心函数,用户分词统计 +def generate_user(group, uid): + mk = imageio.imread("circle.png") + # 构建并配置词云对象w,注意要加scale参数,提高清晰度 + w = wordcloud.WordCloud(width=800, + height=800, + background_color='white', + font_path=FRONT, + mask=mk, + scale=5) + r = connector.get_connection() + print(f"当前处理的群组:{group} | {uid}") + # 生成词云图片 + jieba.enable_paddle() # 启动paddle模式。 0.40版之后开始支持,早期版本不支持 + chat_content = r.get("{}_{}_user_content".format(group, uid)) + + if chat_content is None: + print("数据库中不存在此用户 {} | {} 数据".format(group, uid)) + return + word_list = [] + words = pseg.cut(chat_content, use_paddle=True) # paddle模式 + for word, flag in words: + # print(word + "\t" + flag) + if flag in ["n", "nr", "nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw"]: + # 判断该词是否有效,不为空格 + if re.match(r"^\s+?$", word) is None: + word_list.append(word) + # print(word_list) + + # 获取消息总数 + total_message_amount = r.get("{}_{}_user_message_amount".format(group, uid)) + # 获取用户发言数字典 + user_message_amount = r.hgetall("{}_{}_user_message_amount".format(group, uid)) + user_message_amount = sorted(user_message_amount.items(), key=lambda kv: (int(kv[1])), reverse=True) + + # 截至时间 + date = time.strftime("%Y年%m月%d日", time.localtime()) + ' ⏱ ' + time.strftime("%H:%M", time.localtime()) + text = f'📅 截至 {date}\n' + # 分析高频词 + if len(word_list) > 0: + word_amount = {} + # print(word_amount) + for word in word_list: + if re.search( + r"[。|,|、|?|!|,|.|!|?|\\|/|+|\-|`|~|·|@|#|¥|$|%|^|&|*|(|)|;|;|‘|’|“|”|'|_|=|•|·|…|\"]", + word) is not None: + continue + # 判断该词是否之前已经出现 + if word_amount.get(word) is not None: + word_amount[word] = word_amount.get(word) + 1 + else: + word_amount[word] = 1 + # print(word_amount) + word_amount = sorted(word_amount.items(), key=lambda kv: (int(kv[1])), reverse=True) + if len(word_amount) > 0: + # print("排序后的热词:" + str(word_amount)) + hot_word_string = "" + # 默认展示前5位,少于5个则全部展示 + for i in range(min(5, len(word_amount))): + hot_word_string += "\t\t\t\t\t\t\t\t" + "👥 `" + str(word_amount[i][0]) + "`" + ":" + str( + word_amount[i][1]) + "\n" + # print(hot_word_string) + text += f"🗣️ 此成员共产生 {total_message_amount} 条纯文本消息\n" \ + f"🤹‍ Ta 今天讨论最多的是:\n\n{hot_word_string}\n" + else: + text += '无法分析出 Ta 的热词列表,可能是数据量过小,嗨起来吧~\n' + else: + text += '无法分析出 Ta 的热词列表,可能是数据量过小,嗨起来吧~\n' + + # 开始创建词云 + img_path = 'images/default.png' + try: + string = " ".join(word_list) + # 将string变量传入w的generate()方法,给词云输入文字 + w.generate(string) + # 将词云图片导出到 images 文件夹 + w.to_file('images/{}_chat_word_cloud.png'.format(group)) + img_path = 'images/{}_chat_word_cloud.png'.format(group) + except Exception as e: + print(e) + print("词云图片生成失败") + + # 发送结果 + try: + bot.send_photo( + chat_id=group, + photo=open(img_path, "rb"), + caption=text, + parse_mode='Markdown', + disable_notification=True + ) + except Exception as e: + print(e) + r.delete('{}_{}_user_content'.format(group, uid)) + print("发送结果失败") + + # 删除图片 + try: + os.remove("images/{}_chat_word_cloud.png".format(group)) + except Exception as e: + print(e) + print("删除图片失败") + + def flush_redis(): r = connector.get_connection() r.flushall()