Compare commits

...

3 Commits
dev ... master

Author SHA1 Message Date
822679869f
[extend] Functionality of skipping schedule (#4)
[patch] Ignore words length smaller than 1
2021-10-02 21:34:04 +08:00
46c2af8125
🐛 修复匿名管理员无法手动生成群组词云的问题 2021-08-15 19:04:43 +08:00
08edcb7c6e
支持单用户话题统计。 (#3) 2021-06-21 16:37:37 +08:00
4 changed files with 209 additions and 9 deletions

View File

@ -9,6 +9,9 @@ EXCLUSIVE_MODE = False
# 配置私有模式群组id列表不私有请忽略 例如:[-1001324252532, -100112415423] # 配置私有模式群组id列表不私有请忽略 例如:[-1001324252532, -100112415423]
EXCLUSIVE_LIST = [] EXCLUSIVE_LIST = []
# 跳过定时词云的群组
SKIP_SCHEDULE_GROUP = []
# 主动触发命令仅管理员有效 False:否 True:是 # 主动触发命令仅管理员有效 False:否 True:是
RANK_COMMAND_MODE = True RANK_COMMAND_MODE = True
@ -25,5 +28,6 @@ OWNER = 0
CHANNEL = 0 CHANNEL = 0
# 帮助信息 # 帮助信息
HELP = '<b>Group Word Cloud Bot</b>\n\n/start - 查看此帮助信息\n/ping - 我还活着吗?\n/rank - 手动生成词云(绒布球)\n\n' \ HELP = '<b>Group Word Cloud Bot</b>\n\n/start - 查看此帮助信息\n/ping - 我还活着吗?\n/rank - 手动生成词云(绒布球)\n' \
'/stat - 生成单用户词云\n\n' \
'此项目开源于https://git.io/JnrvH' '此项目开源于https://git.io/JnrvH'

55
func.py
View File

@ -5,7 +5,7 @@ import telegram
from telegram.ext import CommandHandler, MessageHandler, Filters from telegram.ext import CommandHandler, MessageHandler, Filters
from config import TOKEN, LIMIT_COUNT, EXCLUSIVE_MODE, RANK_COMMAND_MODE, OWNER, EXCLUSIVE_LIST, CHANNEL, HELP from config import TOKEN, LIMIT_COUNT, EXCLUSIVE_MODE, RANK_COMMAND_MODE, OWNER, EXCLUSIVE_LIST, CHANNEL, HELP
import schedule import schedule
from task import add_task from task import add_task, add_user_task
bot = telegram.Bot(token=TOKEN) bot = telegram.Bot(token=TOKEN)
@ -62,6 +62,8 @@ def rank(update, context):
try: try:
chat_member = bot.get_chat_member(chat_id, user_id) chat_member = bot.get_chat_member(chat_id, user_id)
status = chat_member["status"] status = chat_member["status"]
if user_id == 1087968824:
status = "administrator"
print("此用户在群组中身份为: {}".format(status)) print("此用户在群组中身份为: {}".format(status))
if status == "creator" or status == "administrator": if status == "creator" or status == "administrator":
print("用户权限正确") print("用户权限正确")
@ -102,6 +104,53 @@ def rank(update, context):
print(e) print(e)
def stat(update, context):
try:
r = connector.get_connection()
chat_type = update.effective_chat.type
user_id = update.effective_user.id
chat_id = update.effective_message.chat_id
try:
username = update.effective_user.username
except Exception as e:
username = update.effective_user.id
# 限制为群组
if chat_type != "supergroup":
update.message.reply_text("此命令只有在群组中有效!")
return
if r.exists("{}_{}_frequency_limit".format(chat_id, user_id)):
r.setrange("{}_{}_frequency_limit".format(chat_id, user_id), 0,
int(r.get("{}_{}_frequency_limit".format(chat_id, user_id))) + 1)
else:
struct_time = time.localtime(time.time())
# 数据过期时间为当前小时的 59 分
ex_time = datetime.datetime(
struct_time.tm_year,
struct_time.tm_mon,
struct_time.tm_mday,
struct_time.tm_hour,
59
)
r.set("{}_{}_frequency_limit".format(chat_id, user_id), 1)
r.expireat("{}_{}_frequency_limit".format(chat_id, user_id), ex_time)
count = int(r.get("{}_{}_frequency_limit".format(chat_id, user_id)))
if count > LIMIT_COUNT:
update.message.reply_text(f"[您](tg://user?id={user_id})在这个小时内的生成配额已经用完,请稍后再试~")
return
add_user_task(chat_id, user_id)
print("群组: {},用户: {} | {} 发起了主动触发请求".format(chat_id, username, user_id, ))
if not CHANNEL == 0:
ctext = f'#WORDCLOUD #APPLY #id{user_id} \n' \
f'群组 ID`{chat_id}`\n' \
f'用户 ID`{user_id}`\n' \
f'执行操作:`主动生成用户词云`'
bot.send_message(chat_id=CHANNEL, text=ctext, parse_mode="Markdown")
update.message.reply_text("统计数据将在分析完毕后发送到当前群组,请稍等~")
except Exception as e:
print("主动触发任务失败,请检查")
print(e)
def chat_content_exec(update, context): def chat_content_exec(update, context):
try: try:
r = connector.get_connection() r = connector.get_connection()
@ -133,9 +182,12 @@ def chat_content_exec(update, context):
else: else:
if text[-1] not in ["", "", "", "", "", "!", "?", ",", ":", "."]: if text[-1] not in ["", "", "", "", "", "!", "?", ",", ":", "."]:
r.append("{}_chat_content".format(chat_id), text + "") r.append("{}_chat_content".format(chat_id), text + "")
r.append("{}_{}_user_content".format(chat_id, user_id), text + "")
else: else:
r.append("{}_chat_content".format(chat_id), text) r.append("{}_chat_content".format(chat_id), text)
r.append("{}_{}_user_content".format(chat_id, user_id), text)
r.incrby("{}_total_message_amount".format(chat_id)) r.incrby("{}_total_message_amount".format(chat_id))
r.incrby("{}_{}_user_message_amount".format(chat_id, user_id))
r.hincrby("{}_user_message_amount".format(chat_id), name) r.hincrby("{}_user_message_amount".format(chat_id), name)
print("---------------------------") print("---------------------------")
except Exception as e: except Exception as e:
@ -152,4 +204,5 @@ def check_schedule():
start_handler = CommandHandler('start', start) start_handler = CommandHandler('start', start)
ping_handler = CommandHandler('ping', ping) ping_handler = CommandHandler('ping', ping)
rank_handler = CommandHandler('rank', rank) rank_handler = CommandHandler('rank', rank)
stat_handler = CommandHandler('stat', stat)
chat_content_handler = MessageHandler(Filters.text, chat_content_exec) chat_content_handler = MessageHandler(Filters.text, chat_content_exec)

View File

@ -1,8 +1,8 @@
from telegram.ext import Updater from telegram.ext import Updater
from config import TOKEN from config import TOKEN
from func import start_handler, ping_handler, chat_content_handler, check_schedule, rank_handler from func import start_handler, ping_handler, chat_content_handler, check_schedule, rank_handler, stat_handler
import schedule import schedule
from task import schedule_task, flush_redis, do_task from task import schedule_task, flush_redis, do_task, do_user_task
import threading import threading
# 开始定时任务 - 群组分析 # 开始定时任务 - 群组分析
@ -14,8 +14,9 @@ schedule.every().day.at('23:59').do(flush_redis)
# 测试代码,每分钟推送数据,非测试目的不要取消注释下一行 # 测试代码,每分钟推送数据,非测试目的不要取消注释下一行
# schedule.every(1).minutes.do(schedule_task) # schedule.every(1).minutes.do(schedule_task)
# 开启分析线程,当队列中任务时,会取出任务分析生成数据 # 开启分析线程,当队列中任务时,会取出任务分析生成数据
threading.Thread(target=do_task).start() threading.Thread(target=do_task).start()
threading.Thread(target=do_user_task).start()
threading.Thread(target=check_schedule).start() threading.Thread(target=check_schedule).start()
@ -25,6 +26,7 @@ dispatcher = updater.dispatcher
dispatcher.add_handler(start_handler) dispatcher.add_handler(start_handler)
dispatcher.add_handler(ping_handler) dispatcher.add_handler(ping_handler)
dispatcher.add_handler(rank_handler) dispatcher.add_handler(rank_handler)
dispatcher.add_handler(stat_handler)
dispatcher.add_handler(chat_content_handler) dispatcher.add_handler(chat_content_handler)
updater.start_polling() updater.start_polling()

147
task.py
View File

@ -8,11 +8,12 @@ import telegram
import time import time
import os import os
import connector import connector
from config import TOKEN, FRONT, CHANNEL from config import TOKEN, FRONT, CHANNEL, SKIP_SCHEDULE_GROUP
bot = telegram.Bot(token=TOKEN) bot = telegram.Bot(token=TOKEN)
task_queue = queue.Queue() task_queue = queue.Queue()
user_task_queue = queue.Queue()
def schedule_task(): def schedule_task():
@ -28,7 +29,10 @@ def schedule_task():
for group in group_list: for group in group_list:
try: try:
# 向任务队列中添加任务 # 向任务队列中添加任务
if int(group) not in SKIP_SCHEDULE_GROUP:
task_queue.put(group) task_queue.put(group)
else:
print(f"已跳过群组 {group} 的定时任务执行。")
except Exception as e: except Exception as e:
print("群组:{} | 词云数据分析生成失败,请查看报错信息".format(group)) print("群组:{} | 词云数据分析生成失败,请查看报错信息".format(group))
print(e) print(e)
@ -66,10 +70,46 @@ def do_task():
time.sleep(1) time.sleep(1)
def do_user_task():
while True:
temp = user_task_queue.get()
group = temp.split('|')[0]
uid = temp.split('|')[1]
try:
print("---------------------------")
print("群组: {} | 用户: {} | 分析处理中... | 剩余任务数量 {}".format(group, uid, task_queue.qsize()))
start_time = float(time.time())
generate_user(group, uid)
stop_time = float(time.time())
print("当前群组处理耗时:" + str(stop_time - start_time))
print("---------------------------")
ctext = f'#WORDCLOUD #id{uid} \n' \
f'群组 ID`{group}`\n' \
f'用户 ID`{uid}`\n' \
f'执行操作:`生成用户词云`\n' \
f'结果:`成功`\n' \
f'处理耗时:`{str(stop_time - start_time)[:5]}`'
except Exception as e:
print("群组: {} | 用户: {} | 处理失败,可能是机器人已经被移出群组,请检查报错!".format(group, uid))
print(e)
ctext = f'#WORDCLOUD #SCHEDULE #id{uid} \n' \
f'群组 ID`{group}`\n' \
f'用户 ID`{uid}`\n' \
f'执行操作:`生成用户词云`\n' \
f'结果:`失败`\n'
if not CHANNEL == 0:
bot.send_message(chat_id=CHANNEL, text=ctext, parse_mode="Markdown")
time.sleep(1)
def add_task(group): def add_task(group):
task_queue.put(group) task_queue.put(group)
def add_user_task(group, uid):
user_task_queue.put(f'{group}|{uid}')
# 核心函数,分词统计 # 核心函数,分词统计
def generate(group): def generate(group):
mk = imageio.imread("circle.png") mk = imageio.imread("circle.png")
@ -95,7 +135,7 @@ def generate(group):
# print(word + "\t" + flag) # print(word + "\t" + flag)
if flag in ["n", "nr", "nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw"]: if flag in ["n", "nr", "nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw"]:
# 判断该词是否有效,不为空格 # 判断该词是否有效,不为空格
if re.match(r"^\s+?$", word) is None: if re.match(r"^\s+?$", word) is None and len(word) > 1:
word_list.append(word) word_list.append(word)
# print(word_list) # print(word_list)
@ -137,7 +177,7 @@ def generate(group):
hot_word_string += "\t\t\t\t\t\t\t\t" + "👥 `" + str(word_amount[i][0]) + "`" + "" + str( hot_word_string += "\t\t\t\t\t\t\t\t" + "👥 `" + str(word_amount[i][0]) + "`" + "" + str(
word_amount[i][1]) + "\n" word_amount[i][1]) + "\n"
# print(hot_word_string) # print(hot_word_string)
text += f"🗣️ 本群 {user_amount} 位成员共产生 {total_message_amount}发言\n" \ text += f"🗣️ 本群 {user_amount} 位成员共产生 {total_message_amount}纯文本消息\n" \
f"🤹‍ 大家今天讨论最多的是:\n\n{hot_word_string}\n" f"🤹‍ 大家今天讨论最多的是:\n\n{hot_word_string}\n"
else: else:
text += '无法分析出当前群组的热词列表,可能是数据量过小,嗨起来吧~\n' text += '无法分析出当前群组的热词列表,可能是数据量过小,嗨起来吧~\n'
@ -193,6 +233,107 @@ def generate(group):
print("删除图片失败") print("删除图片失败")
# 核心函数,用户分词统计
def generate_user(group, uid):
mk = imageio.imread("circle.png")
# 构建并配置词云对象w注意要加scale参数提高清晰度
w = wordcloud.WordCloud(width=800,
height=800,
background_color='white',
font_path=FRONT,
mask=mk,
scale=5)
r = connector.get_connection()
print(f"当前处理的群组:{group} | {uid}")
# 生成词云图片
jieba.enable_paddle() # 启动paddle模式。 0.40版之后开始支持,早期版本不支持
chat_content = r.get("{}_{}_user_content".format(group, uid))
if chat_content is None:
print("数据库中不存在此用户 {} | {} 数据".format(group, uid))
return
word_list = []
words = pseg.cut(chat_content, use_paddle=True) # paddle模式
for word, flag in words:
# print(word + "\t" + flag)
if flag in ["n", "nr", "nz", "PER", "f", "ns", "LOC", "s", "nt", "ORG", "nw"]:
# 判断该词是否有效,不为空格
if re.match(r"^\s+?$", word) is None:
word_list.append(word)
# print(word_list)
# 获取消息总数
total_message_amount = r.get("{}_{}_user_message_amount".format(group, uid))
# 截至时间
date = time.strftime("%Y年%m月%d", time.localtime()) + '' + time.strftime("%H:%M", time.localtime())
text = f'📅 截至 {date}\n'
# 分析高频词
if len(word_list) > 0:
word_amount = {}
# print(word_amount)
for word in word_list:
if re.search(
r"[。||、|||,|.|!|?|\\|/|+|\-|`|~|·|@|#|¥|$|%|^|&|*|(|)|;||||“|”|'|_|=|•|·|…|\"]",
word) is not None:
continue
# 判断该词是否之前已经出现
if word_amount.get(word) is not None:
word_amount[word] = word_amount.get(word) + 1
else:
word_amount[word] = 1
# print(word_amount)
word_amount = sorted(word_amount.items(), key=lambda kv: (int(kv[1])), reverse=True)
if len(word_amount) > 0:
# print("排序后的热词:" + str(word_amount))
hot_word_string = ""
# 默认展示前5位少于5个则全部展示
for i in range(min(5, len(word_amount))):
hot_word_string += "\t\t\t\t\t\t\t\t" + "👥 `" + str(word_amount[i][0]) + "`" + "" + str(
word_amount[i][1]) + "\n"
# print(hot_word_string)
text += f"🗣️ [此成员](tg://user?id={uid})共产生 {total_message_amount} 条纯文本消息\n" \
f"🤹‍ Ta 今天讨论最多的是:\n\n{hot_word_string}\n"
else:
text += '无法分析出 Ta 的热词列表,可能是数据量过小,嗨起来吧~\n'
else:
text += '无法分析出 Ta 的热词列表,可能是数据量过小,嗨起来吧~\n'
# 开始创建词云
img_path = 'images/default.png'
try:
string = " ".join(word_list)
# 将string变量传入w的generate()方法,给词云输入文字
w.generate(string)
# 将词云图片导出到 images 文件夹
w.to_file('images/{}_chat_word_cloud.png'.format(group))
img_path = 'images/{}_chat_word_cloud.png'.format(group)
except Exception as e:
print(e)
print("词云图片生成失败")
# 发送结果
try:
bot.send_photo(
chat_id=group,
photo=open(img_path, "rb"),
caption=text,
parse_mode='Markdown',
disable_notification=True
)
except Exception as e:
print(e)
r.delete('{}_{}_user_content'.format(group, uid))
print("发送结果失败")
# 删除图片
try:
os.remove("images/{}_chat_word_cloud.png".format(group))
except Exception as e:
print(e)
print("删除图片失败")
def flush_redis(): def flush_redis():
r = connector.get_connection() r = connector.get_connection()
r.flushall() r.flushall()