💥 使用 Github Repo 实现插件仓库

This commit is contained in:
xtaodada 2020-08-11 20:47:34 +08:00
parent b4f0078e68
commit 8056174d42
No known key found for this signature in database
GPG Key ID: EE4DC37B55E24736
8 changed files with 150 additions and 1002 deletions

View File

@ -1,5 +1,8 @@
""" PagerMaid module to manage plugins. """ """ PagerMaid module to manage plugins. """
import json
from re import match, I
from requests import get
from os import remove, rename, chdir, path from os import remove, rename, chdir, path
from os.path import exists from os.path import exists
from shutil import copyfile, move from shutil import copyfile, move
@ -10,9 +13,9 @@ from pagermaid.utils import upload_attachment
from pagermaid.modules import plugin_list as active_plugins, __list_plugins from pagermaid.modules import plugin_list as active_plugins, __list_plugins
@listener(is_plugin=False, outgoing=True, command="plugin", diagnostics=False, @listener(is_plugin=False, outgoing=True, command="apt", diagnostics=False,
description="用于管理安装到 PagerMaid-Modify 的插件。", description="用于管理安装到 PagerMaid-Modify 的插件。",
parameters="{status|install|remove|enable|disable|upload} <插件名称/文件>") parameters="{update|search|show|status|install|remove|enable|disable|upload} <插件名称/文件>")
async def plugin(context): async def plugin(context):
if len(context.parameter) > 2 or len(context.parameter) == 0: if len(context.parameter) > 2 or len(context.parameter) == 0:
await context.edit("出错了呜呜呜 ~ 无效的参数。") await context.edit("出错了呜呜呜 ~ 无效的参数。")
@ -44,17 +47,95 @@ async def plugin(context):
await context.edit(f"插件 {path.basename(file_path)[:-3]} 已安装PagerMaid-Modify 正在重新启动。") await context.edit(f"插件 {path.basename(file_path)[:-3]} 已安装PagerMaid-Modify 正在重新启动。")
await log(f"成功安装插件 {path.basename(file_path)[:-3]}.") await log(f"成功安装插件 {path.basename(file_path)[:-3]}.")
await context.client.disconnect() await context.client.disconnect()
elif len(context.parameter) == 2:
plugin_name = context.parameter[1]
plugin_online = \
json.loads(get("https://raw.githubusercontent.com/xtaodada/PagerMaid_Plugins/master/list.json").content)[
'list']
if exists(f"{plugin_directory}version.json"):
with open(f"{plugin_directory}version.json", 'r', encoding="utf-8") as f:
version_json = json.load(f)
try:
plugin_version = version_json[plugin_name]
except:
plugin_version = False
else:
temp_dict = {}
with open(f"{plugin_directory}version.json", 'w') as f:
json.dump(temp_dict, f)
plugin_version = False
for i in plugin_online:
if i['name'] == plugin_name:
if plugin_version:
if (float(i['version']) - float(plugin_version)) <= 0:
await context.edit(f"插件 {plugin_name} 为最新版本,无需更新。")
return
else:
file_path = plugin_name + ".py"
plugin_content = get(
f"https://raw.githubusercontent.com/xtaodada/PagerMaid_Plugins/master/{plugin_name}.py").content
with open(file_path, 'wb') as f:
f.write(plugin_content)
with open(f"{plugin_directory}version.json", 'r', encoding="utf-8") as f:
version_json = json.load(f)
version_json[plugin_name] = i['version']
with open(f"{plugin_directory}version.json", 'w') as f:
json.dump(version_json, f)
if exists(f"{plugin_directory}{file_path}"):
remove(f"{plugin_directory}{file_path}")
move(file_path, plugin_directory)
elif exists(f"{plugin_directory}{file_path}.disabled"):
remove(f"{plugin_directory}{file_path}.disabled")
move(file_path, f"{plugin_directory}{file_path}.disabled")
else:
move(file_path, plugin_directory)
await context.edit(f"插件 {path.basename(file_path)[:-3]} 已安装PagerMaid-Modify 正在重新启动。")
await log(f"成功安装插件 {path.basename(file_path)[:-3]}.")
await context.client.disconnect()
else:
file_path = plugin_name + ".py"
plugin_content = get(
f"https://raw.githubusercontent.com/xtaodada/PagerMaid_Plugins/master/{plugin_name}.py").content
with open(file_path, 'wb') as f:
f.write(plugin_content)
with open(f"{plugin_directory}version.json", 'r', encoding="utf-8") as f:
version_json = json.load(f)
version_json[plugin_name] = i['version']
with open(f"{plugin_directory}version.json", 'w') as f:
json.dump(version_json, f)
if exists(f"{plugin_directory}{file_path}"):
remove(f"{plugin_directory}{file_path}")
move(file_path, plugin_directory)
elif exists(f"{plugin_directory}{file_path}.disabled"):
remove(f"{plugin_directory}{file_path}.disabled")
move(file_path, f"{plugin_directory}{file_path}.disabled")
else:
move(file_path, plugin_directory)
await context.edit(f"插件 {path.basename(file_path)[:-3]} 已安装PagerMaid-Modify 正在重新启动。")
await log(f"成功安装插件 {path.basename(file_path)[:-3]}.")
await context.client.disconnect()
await context.edit(f"错误:没有找到插件 {plugin_name}")
else: else:
await context.edit("出错了呜呜呜 ~ 无效的参数。") await context.edit("出错了呜呜呜 ~ 无效的参数。")
elif context.parameter[0] == "remove": elif context.parameter[0] == "remove":
if len(context.parameter) == 2: if len(context.parameter) == 2:
if exists(f"{plugin_directory}{context.parameter[1]}.py"): if exists(f"{plugin_directory}{context.parameter[1]}.py"):
remove(f"{plugin_directory}{context.parameter[1]}.py") remove(f"{plugin_directory}{context.parameter[1]}.py")
with open(f"{plugin_directory}version.json", 'r', encoding="utf-8") as f:
version_json = json.load(f)
version_json[context.parameter[1]] = '0.0'
with open(f"{plugin_directory}version.json", 'w') as f:
json.dump(version_json, f)
await context.edit(f"成功删除插件 {context.parameter[1]}, PagerMaid-Modify 正在重新启动。") await context.edit(f"成功删除插件 {context.parameter[1]}, PagerMaid-Modify 正在重新启动。")
await log(f"删除插件 {context.parameter[1]}.") await log(f"删除插件 {context.parameter[1]}.")
await context.client.disconnect() await context.client.disconnect()
elif exists(f"{plugin_directory}{context.parameter[1]}.py.disabled"): elif exists(f"{plugin_directory}{context.parameter[1]}.py.disabled"):
remove(f"{plugin_directory}{context.parameter[1]}.py.disabled") remove(f"{plugin_directory}{context.parameter[1]}.py.disabled")
with open(f"{plugin_directory}version.json", 'r', encoding="utf-8") as f:
version_json = json.load(f)
version_json[context.parameter[1]] = '0.0'
with open(f"{plugin_directory}version.json", 'w') as f:
json.dump(version_json, f)
await context.edit(f"已删除的插件 {context.parameter[1]}.") await context.edit(f"已删除的插件 {context.parameter[1]}.")
await log(f"已删除的插件 {context.parameter[1]}.") await log(f"已删除的插件 {context.parameter[1]}.")
elif "/" in context.parameter[1]: elif "/" in context.parameter[1]:
@ -144,5 +225,72 @@ async def plugin(context):
await context.edit("出错了呜呜呜 ~ 指定的插件不存在。") await context.edit("出错了呜呜呜 ~ 指定的插件不存在。")
else: else:
await context.edit("出错了呜呜呜 ~ 无效的参数。") await context.edit("出错了呜呜呜 ~ 无效的参数。")
elif context.parameter[0] == "update":
unneed_update = "无需更新:"
need_update = "\n需要更新:"
with open(f"{plugin_directory}version.json", 'r', encoding="utf-8") as f:
version_json = json.load(f)
plugin_online = \
json.loads(get("https://raw.githubusercontent.com/xtaodada/PagerMaid_Plugins/master/list.json").content)['list']
for key, value in version_json.items():
if value == "0.0":
continue
for i in plugin_online:
if key == i['name']:
if (float(i['version']) - float(value)) <= 0:
unneed_update += "\n`" + key + "`Ver " + value
else:
need_update += "\n`" + key + "`Ver " + value + " --> Ver " + i['version']
continue
if unneed_update == "无需更新:":
unneed_update = ''
if need_update == "\n需要更新:":
need_update = ''
if unneed_update == '' and need_update == '':
await context.edit("不如去安装一些插件?")
else:
await context.edit(unneed_update + need_update)
elif context.parameter[0] == "search":
if len(context.parameter) == 1:
await context.edit("没插件名我怎么搜索?")
elif len(context.parameter) == 2:
search_result = []
plugin_name = context.parameter[1]
plugin_online = \
json.loads(get("https://raw.githubusercontent.com/xtaodada/PagerMaid_Plugins/master/list.json").content)[
'list']
for i in plugin_online:
if match(plugin_name, i['name'], I):
search_result.extend(['`' + i['name'] + '` / `' + i['version'] + '`\n ' + i['des-short']])
if len(search_result) == 0:
await context.edit("未在插件仓库中搜索到相关插件。")
else:
await context.edit('以下是插件仓库的搜索结果:\n\n' + '\n\n'.join(search_result))
else:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
elif context.parameter[0] == "show":
if len(context.parameter) == 1:
await context.edit("没插件名我怎么显示?")
elif len(context.parameter) == 2:
search_result = ''
plugin_name = context.parameter[1]
plugin_online = \
json.loads(get("https://raw.githubusercontent.com/xtaodada/PagerMaid_Plugins/master/list.json").content)[
'list']
for i in plugin_online:
if plugin_name == i['name']:
if i['supported']:
search_support = '仍在周期中'
else:
search_support = '已弃疗'
search_result = '插件名:`' + i['name'] + '`\n版本:`Ver ' + i['version'] + '`\n分类:`' + i[
'section'] + '`\n作者:`' + \
i['maintainer'] + '`\n大小:`' + i['size'] + '`\n支持周期:' + search_support + '\n说明:' + i[
'des-short'] + '\n\n' + i['des']
break
if search_result == '':
await context.edit("未在插件仓库中搜索到相关插件。")
else:
await context.edit(search_result)
else: else:
await context.edit("出错了呜呜呜 ~ 无效的参数。") await context.edit("出错了呜呜呜 ~ 无效的参数。")

View File

@ -1,45 +0,0 @@
""" Pagermaid autorespond plugin. """
from telethon.events import StopPropagation
from pagermaid import persistent_vars, log
from pagermaid.listener import listener
persistent_vars.update({'autorespond': {'enabled': False, 'message': None, 'amount': 0}})
@listener(outgoing=True, command="autorespond",
description="启用自动回复。",
parameters="<message>")
async def autorespond(context):
""" Enables the auto responder. """
message = "我还在睡觉... ZzZzZzZzZZz"
if context.arguments:
message = context.arguments
await context.edit("成功启用自动响应器。")
await log(f"启用自动响应器,将自动回复 `{message}`.")
persistent_vars.update({'autorespond': {'enabled': True, 'message': message, 'amount': 0}})
raise StopPropagation
@listener(outgoing=True)
async def disable_responder(context):
if persistent_vars['autorespond']['enabled']:
await log(f"禁用自动响应器。 在闲置期间 {persistent_vars['autorespond']['amount']}"
f" 条消息被自动回复")
persistent_vars.update({'autorespond': {'enabled': False, 'message': None, 'amount': 0}})
@listener(incoming=True)
async def private_autorespond(context):
if persistent_vars['autorespond']['enabled']:
if context.is_private and not (await context.get_sender()).bot:
persistent_vars['autorespond']['amount'] += 1
await context.reply(persistent_vars['autorespond']['message'])
@listener(incoming=True)
async def mention_autorespond(context):
if persistent_vars['autorespond']['enabled']:
if context.message.mentioned and not (await context.get_sender()).bot:
persistent_vars['autorespond']['amount'] += 1
await context.reply(persistent_vars['autorespond']['message'])

View File

@ -1,110 +0,0 @@
""" Module to automate message deletion. """
from asyncio import sleep
from os import path, remove
from os.path import exists
from PIL import Image
from pagermaid import redis, log, redis_status
from pagermaid.listener import listener
@listener(outgoing=True, command="dme",
description="编辑并删除当前对话您发送的特定数量的消息。限制:基于消息 ID 的 1000 条消息,大于 1000 条可能会触发删除消息过快限制。入群消息非管理员无法删除。(倒序)当数字足够大时即可实现删除所有消息。",
parameters="<数量> [文本]")
async def selfprune(context):
""" Deletes specific amount of messages you sent. """
reply = await context.get_reply_message()
if reply and reply.photo:
if exists('plugins/dme.jpg'):
remove('plugins/dme.jpg')
target_file = reply.photo
await context.client.download_media(
await context.get_reply_message(), file = "plugins/dme.jpg"
)
await context.edit("替换图片设置完成。")
elif reply and reply.sticker:
if exists('plugins/dme.jpg'):
remove('plugins/dme.jpg')
await context.client.download_media(reply.media.document, file = "plugins/dme.webp")
im = Image.open("plugins/dme.webp")
im.save('plugins/dme.png', "png")
remove('plugins/dme.webp')
target_file = await context.client.upload_file('plugins/dme.png')
await context.edit("替换图片设置完成。")
elif path.isfile("plugins/dme.jpg"):
target_file = await context.client.upload_file('plugins/dme.jpg')
elif path.isfile("plugins/dme.png"):
target_file = await context.client.upload_file('plugins/dme.png')
else:
target_file = False
await context.edit("注意:没有图片进行替换。")
try:
count = int(context.parameter[0]) + 1
except ValueError:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
except IndexError:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
dme_msg = "别搁这防撤回了。。。"
if len(context.parameter) == 1:
if not redis_status:
pass
else:
try:
dme_msg = redis.get("dme_msg").decode()
except:
pass
elif len(context.parameter) == 2:
dme_msg = context.parameter[1]
if not redis_status():
pass
elif not dme_msg == str(count):
try:
redis.set("dme_msg", dme_msg)
except:
pass
count_buffer = 0
async for message in context.client.iter_messages(context.chat_id, from_user="me"):
if count_buffer == count:
break
if message.forward or message.via_bot or message.sticker or message.contact or message.poll or message.game or message.geo:
pass
elif message.text or message.voice:
if not message.text == dme_msg:
try:
await message.edit(dme_msg)
except:
pass
elif message.document or message.photo or message.file or message.audio or message.video or message.gif:
if target_file:
if not message.text == dme_msg:
try:
await message.edit(dme_msg, file=target_file)
except:
pass
else:
if not message.text == dme_msg:
try:
await message.edit(dme_msg)
except:
pass
else:
pass
await message.delete()
count_buffer += 1
count -= 1
count_buffer -= 1
await log(f"批量删除了自行发送的 {str(count_buffer)} / {str(count)} 条消息。")
notification = await send_prune_notify(context, count_buffer, count)
await sleep(.5)
await notification.delete()
async def send_prune_notify(context, count_buffer, count):
return await context.client.send_message(
context.chat_id,
"删除了 "
+ str(count_buffer) + " / " + str(count)
+ " 条消息。"
)

View File

@ -1,303 +0,0 @@
""" Send Msg At A Specified Time. """
# By tg @fruitymelon
# extra requirements: dateparser
imported = True
import os, sys, time, traceback
try:
import dateparser
except ImportError:
imported = False
import asyncio
from pagermaid import log
from pagermaid.listener import listener
from datetime import datetime
from dateutil import parser
def logsync(message):
sys.stdout.writelines(f"{message}\n")
logsync("sendat: loading... If failed, please install dateparser first.")
# https://stackoverflow.com/questions/1111056/get-time-zone-information-of-the-system-in-python
def local_time_offset(t=None):
"""Return offset of local zone from GMT, either at present or at time t."""
# python2.3 localtime() can't take None
if t is None:
t = time.time()
if time.localtime(t).tm_isdst and time.daylight:
return -time.altzone
else:
return -time.timezone
offset = local_time_offset() // 3600
sign = "+" if offset >= 0 else "-"
offset = abs(offset)
offset_str = str(offset)
offset_str = offset_str if len(offset_str) == 2 else f"0{offset_str}"
settings = {'TIMEZONE': f'{sign}{offset_str}00'}
logsync(f"sendat: local time zone offset is {sign}{abs(offset)}")
mem = []
helpmsg = """
定时发送消息
-sendat 时间 | 消息内容
i.e.
-sendat 16:00:00 | 投票截止
-sendat every 23:59:59 | 又是无所事事的一天呢
-sendat every 1 minutes | 又过去了一分钟
-sendat *3 1 minutes | 此消息将出现三次间隔均为一分钟
此命令可与 autorespond, ghost, deny 命令串联
-sendat 10 minutes | -autorespond 我暂时有事离开一下
"""
@listener(outgoing=True, command="sendat", diagnostics=True, ignore_edited=False,
description=helpmsg,
parameters="<atmsg>")
async def sendatwrap(context):
await sendat(context)
async def sendat(context):
mem_id = len(mem)
chat = await context.get_chat()
args = " ".join(context.parameter).split("|")
if not imported:
await context.edit("Please install dateparser first: python3 -m pip install dateparser")
return
await context.edit(f"tz data: {time.timezone} {time.tzname} {sign}{offset}")
if len(args) != 2:
await context.edit("Invalid argument. Expected: 2")
return
if offset is None:
await context.edit("Failed to get server timezone.")
return
try:
if args[0].find("every ") == 0:
# at this point, let's assume args[0] contains relative time
# i.e. -sendat every 3 minutes
time_str = args[0][6:]
if time_str.find(":") != -1:
# then it should be absolute time
sleep_times = [abs(dateparser.parse(time_str, settings=settings).timestamp() - time.time()), 24 * 60 * 60]
index = 0
mem.append("|".join(args))
await context.edit(f"Registered: id {mem_id}")
while True:
last_time = time.time()
while time.time() < last_time + sleep_times[index]:
await asyncio.sleep(2)
await sendmsg(context, chat, args[1])
index = 1
mem[mem_id] = ""
return
sleep_time = abs(dateparser.parse(time_str, settings=settings).timestamp() - time.time())
if sleep_time < 5:
await context.edit(f"Sleep time too short. Should be longer than 5 seconds. Got {sleep_time}")
return
mem.append("|".join(args))
await context.edit(f"Registered: id {mem_id}")
while True:
last_time = time.time()
while time.time() < last_time + sleep_time:
await asyncio.sleep(2)
await sendmsg(context, chat, args[1])
mem[mem_id] = ""
return
elif args[0].find("*") == 0:
times = int(args[0][1:].split(" ")[0])
rest = " ".join(args[0][1:].split(" ")[1:])
if rest.find(":") != -1:
# then it should be absolute time
sleep_times = [abs(dateparser.parse(rest, settings=settings).timestamp() - time.time()), 24 * 60 * 60]
count = 0
mem.append("|".join(args))
await context.edit(f"Registered: id {mem_id}")
while count <= times:
last_time = time.time()
while time.time() < last_time + sleep_times[0 if count == 0 else 1]:
await asyncio.sleep(2)
await sendmsg(context, chat, args[1])
count += 1
mem[mem_id] = ""
return
sleep_time = abs(dateparser.parse(rest, settings=settings).timestamp() - time.time())
if sleep_time < 5:
await context.edit("Sleep time too short. Should be longer than 5 seconds.")
return
count = 0
mem.append("|".join(args))
await context.edit(f"Registered: id {mem_id}")
while count <= times:
last_time = time.time()
while time.time() < last_time + sleep_time:
await asyncio.sleep(2)
await sendmsg(context, chat, args[1])
count += 1
mem[mem_id] = ""
return
if args[0].find(":") == -1:
# relative time
dt = dateparser.parse(args[0])
delta = time.time() - dt.timestamp()
if delta < 3:
await context.edit("Target time before now.")
return
mem.append("|".join(args))
await context.edit(f"Registered: id {mem_id}")
while dt.timestamp() + 2*delta > time.time():
await asyncio.sleep(2)
await sendmsg(context, chat, args[1])
mem[mem_id] = ""
return
# absolute time
dt = parser.parse(args[0])
delta = dt.timestamp() - time.time()
if delta < 3:
await context.edit("Target time before now.")
return
mem.append("|".join(args))
await context.edit(f"Registered: id {mem_id}")
while delta > 0:
delta = dt.timestamp() - time.time()
await asyncio.sleep(2)
await sendmsg(context, chat, args[1])
mem[mem_id] = ""
except Exception as e:
await log(str(e))
await log(str(traceback.format_stack()))
return
@listener(outgoing=True, command="sendatdump", diagnostics=True, ignore_edited=False,
description="导出 sendat 消息")
async def sendatdump(context):
if mem.count("") != 0:
await context.edit(".\n-sendat " + "\n-sendat ".join(mem[:].remove("")))
else:
await context.edit(".\n-sendat " + "\n-sendat ".join(mem))
@listener(outgoing=True, command="sendatparse", diagnostics=True, ignore_edited=True,
description="导入已导出的 sendat 消息")
async def sendatparse(context):
chat = await context.get_chat()
text = "\n".join(context.message.text.split("\n")[1:])
if text == "":
return
if text.find(".\n") == 0:
text = "\n".join(text.split("\n")[1:])
lines = text.split("\n")
for i in range(len(lines)):
line = lines[i]
sent = await sendmsg(context, chat, line)
sent.parameter = line.replace("-sendat ", "").split(" ")
await sendat(sent)
""" Modified pagermaid autorespond plugin. """
from telethon.events import StopPropagation
from pagermaid import persistent_vars
async def autorespond(context):
""" Enables the auto responder. """
message = "我还在睡觉... ZzZzZzZzZZz"
if context.arguments:
message = context.arguments
await context.edit("成功启用自动响应器。")
await log(f"启用自动响应器,将自动回复 `{message}`.")
persistent_vars.update({'autorespond': {'enabled': True, 'message': message, 'amount': 0}})
from pagermaid import redis, redis_status
async def ghost(context):
""" Toggles ghosting of a user. """
if not redis_status():
await context.edit("出错了呜呜呜 ~ Redis 好像离线了,无法执行命令。")
return
if len(context.parameter) != 1:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
myself = await context.client.get_me()
self_user_id = myself.id
if context.parameter[0] == "true":
if context.chat_id == self_user_id:
await context.edit("在?为什么要在收藏夹里面用?")
return
redis.set("ghosted.chat_id." + str(context.chat_id), "true")
await context.delete()
await log(f"已成功将 ChatID {str(context.chat_id)} 添加到自动已读对话列表中。")
elif context.parameter[0] == "false":
if context.chat_id == self_user_id:
await context.edit("在?为什么要在收藏夹里面用?")
return
redis.delete("ghosted.chat_id." + str(context.chat_id))
await context.delete()
await log(f"已成功将 ChatID {str(context.chat_id)} 从自动已读对话列表中移除。")
elif context.parameter[0] == "status":
if redis.get("ghosted.chat_id." + str(context.chat_id)):
await context.edit("emm...当前对话已被加入自动已读对话列表中。")
else:
await context.edit("emm...当前对话已从自动已读对话列表中移除。")
else:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
async def deny(context):
""" Toggles denying of a user. """
if not redis_status():
await context.edit("出错了呜呜呜 ~ Redis 离线,无法运行。")
return
if len(context.parameter) != 1:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
myself = await context.client.get_me()
self_user_id = myself.id
if context.parameter[0] == "true":
if context.chat_id == self_user_id:
await context.edit("在?为什么要在收藏夹里面用?")
return
redis.set("denied.chat_id." + str(context.chat_id), "true")
await context.delete()
await log(f"ChatID {str(context.chat_id)} 已被添加到自动拒绝对话列表中。")
elif context.parameter[0] == "false":
if context.chat_id == self_user_id:
await context.edit("在?为什么要在收藏夹里面用?")
return
redis.delete("denied.chat_id." + str(context.chat_id))
await context.delete()
await log(f"ChatID {str(context.chat_id)} 已从自动拒绝对话列表中移除。")
elif context.parameter[0] == "status":
if redis.get("denied.chat_id." + str(context.chat_id)):
await context.edit("emm...当前对话已被加入自动拒绝对话列表中。")
else:
await context.edit("emm...当前对话已从自动拒绝对话列表移除。")
else:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
async def sendmsg(context, chat, origin_text):
text = origin_text.strip()
msg = await context.client.send_message(chat, text)
if text.find("-autorespond") == 0:
msg.arguments = text.replace("-autorespond", "").lstrip()
await autorespond(msg)
elif text.find("-ghost ") == 0:
msg.parameter = [text[7:]]
await ghost(msg)
elif text.find("-deny ") == 0:
msg.parameter = [text[6:]]
await deny(msg)
return msg

View File

@ -1,49 +0,0 @@
""" Module to automate message deletion. """
from asyncio import sleep
import time
import random
from time import strftime
from telethon.tl.functions.account import UpdateProfileRequest
from emoji import emojize
from pagermaid import bot, log
from pagermaid.listener import listener
dizzy = emojize(":dizzy:", use_aliases=True)
cake = emojize(":cake:", use_aliases=True)
all_time_emoji_name = ["clock12", "clock1230", "clock1", "clock130", "clock2", "clock230", "clock3", "clock330", "clock4", "clock430", "clock5", "clock530", "clock6", "clock630", "clock7", "clock730", "clock8", "clock830", "clock9", "clock930", "clock10", "clock1030", "clock11", "clock1130"]
time_emoji_symb = [emojize(":%s:" %s, use_aliases=True) for s in all_time_emoji_name]
@listener(outgoing=True, command="autochangename",
description="每 30 秒更新一次 last_name")
async def change_name_auto(context):
await context.delete()
await log("开始每 30 秒更新一次 last_name")
while True:
try:
time_cur = strftime("%H:%M:%S:%p:%a", time.localtime())
hour, minu, seco, p, abbwn = time_cur.split(':')
if seco == '00' or seco == '30':
shift = 0
mult = 1
if int(minu) > 30: shift = 1
# print((int(hour)%12)*2+shift)
# hour symbols
hsym = time_emoji_symb[(int(hour) % 12) * 2 + shift]
# await client1.send_message('me', hsym)
for_fun = random.random()
if for_fun < 0.10:
last_name = '%s%s%s' % (hour, minu, hsym)
elif for_fun < 0.30:
last_name = '%s:%s %s %s %s' % (hour, minu, p, abbwn, hsym)
elif for_fun < 0.60:
last_name = '%s:%s %s UTC+8 %s' % (hour, minu, p, hsym)
elif for_fun < 0.90:
last_name = '%s' % dizzy
else:
last_name = '%s' % cake
await bot(UpdateProfileRequest(last_name=last_name))
except KeyboardInterrupt:
await bot(UpdateProfileRequest(last_name=''))
await sleep(1)

View File

@ -1,384 +0,0 @@
""" Pagermaid plugin base. """
import json, requests, re
from googletrans import Translator
from urllib.parse import urlparse
from pagermaid import bot, log
from pagermaid.listener import listener, config
from pagermaid.utils import clear_emojis, obtain_message, attach_log
from telethon.tl.types import ChannelParticipantsAdmins
from os import remove
@listener(outgoing=True, command="guess",
description="能不能好好说话? - 拼音首字母缩写释义工具(需要回复一句话)")
async def guess(context):
reply = await context.get_reply_message()
await context.edit("获取中 . . .")
if reply:
pass
else:
context.edit("宁需要回复一句话")
return True
text = {'text': str(reply.message.replace("/guess ", "").replace(" ", ""))}
guess_json = json.loads(
requests.post("https://lab.magiconch.com/api/nbnhhsh/guess", data=text, verify=False).content.decode("utf-8"))
guess_res = []
if not len(guess_json) == 0:
for num in range(0, len(guess_json)):
guess_res1 = json.loads(json.dumps(guess_json[num]))
guess_res1_name = guess_res1['name']
try:
guess_res1_ans = ", ".join(guess_res1['trans'])
except:
try:
guess_res1_ans = ", ".join(guess_res1['inputting'])
except:
guess_res1_ans = "尚未录入"
guess_res.extend(["词组:" + guess_res1_name + "\n释义:" + guess_res1_ans])
await context.edit("\n\n".join(guess_res))
else:
await context.edit("没有匹配到拼音首字母缩写")
@listener(outgoing=True, command="admin",
description="一键 AT 本群管理员(仅在群组中有效)")
async def admin(context):
await context.edit('正在获取管理员列表中...')
chat = await context.get_chat()
try:
admins = await context.client.get_participants(chat, filter=ChannelParticipantsAdmins)
except:
await context.edit('请在群组中运行。')
return True
admin_list = []
for admin in admins:
if admin.first_name is not None:
admin_list.extend(['[' + admin.first_name + '](tg://user?id=' + str(admin.id) + ')'])
await context.edit(' , '.join(admin_list))
@listener(outgoing=True, command="wiki",
description="查询维基百科词条",
parameters="<词组>")
async def wiki(context):
translator = Translator()
lang = config['application_language']
await context.edit("获取中 . . .")
try:
message = await obtain_message(context)
except ValueError:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
try:
wiki_json = json.loads(requests.get("https://zh.wikipedia.org/w/api.php?action=query&list=search&format=json&formatversion=2&srsearch=" + message).content.decode(
"utf-8"))
except:
await context.edit("出错了呜呜呜 ~ 无法访问到维基百科。")
return
if not len(wiki_json['query']['search']) == 0:
wiki_title = wiki_json['query']['search'][0]['title']
wiki_content = wiki_json['query']['search'][0]['snippet'].replace('<span class=\"searchmatch\">', '**').replace('</span>', '**')
wiki_time = wiki_json['query']['search'][0]['timestamp'].replace('T', ' ').replace('Z', ' ')
try:
await context.edit("正在生成翻译中 . . .")
wiki_content = translator.translate(clear_emojis(wiki_content), dest=lang)
message = '词条: [' + wiki_title + '](https://zh.wikipedia.org/zh-cn/' + wiki_title + ')\n\n' + wiki_content.text + '...\n\n此词条最后修订于 ' + wiki_time
except ValueError:
await context.edit("出错了呜呜呜 ~ 找不到目标语言,请更正配置文件中的错误。")
return
await context.edit(message)
else:
await context.edit("没有匹配到相关词条")
@listener(outgoing=True, command="ip",
description="IPINFO (或者回复一句话)",
parameters="<ip/域名>")
async def ipinfo(context):
reply = await context.get_reply_message()
await context.edit('正在查询中...')
try:
if reply:
for num in range(0, len(reply.entities)):
url = reply.message[reply.entities[num].offset:reply.entities[num].offset + reply.entities[num].length]
url = urlparse(url)
if url.hostname:
url = url.hostname
else:
url = url.path
ipinfo_json = json.loads(requests.get(
"http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city,lat,lon,isp,org,as,mobile,proxy,hosting,query").content.decode(
"utf-8"))
if ipinfo_json['status'] == 'fail':
pass
elif ipinfo_json['status'] == 'success':
ipinfo_list = []
ipinfo_list.extend(["查询目标: `" + url + "`"])
if ipinfo_json['query'] == url:
pass
else:
ipinfo_list.extend(["解析地址: `" + ipinfo_json['query'] + "`"])
ipinfo_list.extend(["地区: `" + ipinfo_json['country'] + ' - ' + ipinfo_json['regionName'] + ' - ' +
ipinfo_json['city'] + "`"])
ipinfo_list.extend(["经纬度: `" + str(ipinfo_json['lat']) + ',' + str(ipinfo_json['lon']) + "`"])
ipinfo_list.extend(["ISP `" + ipinfo_json['isp'] + "`"])
if not ipinfo_json['org'] == '':
ipinfo_list.extend(["组织: `" + ipinfo_json['org'] + "`"])
ipinfo_list.extend(
['[' + ipinfo_json['as'] + '](https://bgp.he.net/' + ipinfo_json['as'].split()[0] + ')'])
if ipinfo_json['mobile']:
ipinfo_list.extend(['此 IP 可能为**蜂窝移动数据 IP**'])
if ipinfo_json['proxy']:
ipinfo_list.extend(['此 IP 可能为**代理 IP**'])
if ipinfo_json['hosting']:
ipinfo_list.extend(['此 IP 可能为**数据中心 IP**'])
await context.edit('\n'.join(ipinfo_list))
return True
else:
url = urlparse(context.arguments)
if url.hostname:
url = url.hostname
else:
url = url.path
ipinfo_json = json.loads(requests.get(
"http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city,lat,lon,isp,org,as,mobile,proxy,hosting,query").content.decode(
"utf-8"))
if ipinfo_json['status'] == 'fail':
pass
elif ipinfo_json['status'] == 'success':
ipinfo_list = []
if url == '':
ipinfo_list.extend(["查询目标: `本机地址`"])
else:
ipinfo_list.extend(["查询目标: `" + url + "`"])
if ipinfo_json['query'] == url:
pass
else:
ipinfo_list.extend(["解析地址: `" + ipinfo_json['query'] + "`"])
ipinfo_list.extend(["地区: `" + ipinfo_json['country'] + ' - ' + ipinfo_json['regionName'] + ' - ' +
ipinfo_json['city'] + "`"])
ipinfo_list.extend(["经纬度: `" + str(ipinfo_json['lat']) + ',' + str(ipinfo_json['lon']) + "`"])
ipinfo_list.extend(["ISP `" + ipinfo_json['isp'] + "`"])
if not ipinfo_json['org'] == '':
ipinfo_list.extend(["组织: `" + ipinfo_json['org'] + "`"])
ipinfo_list.extend(
['[' + ipinfo_json['as'] + '](https://bgp.he.net/' + ipinfo_json['as'].split()[0] + ')'])
if ipinfo_json['mobile']:
ipinfo_list.extend(['此 IP 可能为**蜂窝移动数据 IP**'])
if ipinfo_json['proxy']:
ipinfo_list.extend(['此 IP 可能为**代理 IP**'])
if ipinfo_json['hosting']:
ipinfo_list.extend(['此 IP 可能为**数据中心 IP**'])
await context.edit('\n'.join(ipinfo_list))
return True
await context.edit('没有找到要查询的 ip/域名 ...')
except:
await context.edit('没有找到要查询的 ip/域名 ...')
@listener(outgoing=True, command="ipping",
description="Ping (或者回复一句话)",
parameters="<ip/域名>")
async def ipping(context):
reply = await context.get_reply_message()
await context.edit('正在查询中...')
try:
if reply:
for num in range(0, len(reply.entities)):
url = reply.message[reply.entities[num].offset:reply.entities[num].offset + reply.entities[num].length]
url = urlparse(url)
if url.hostname:
url = url.hostname
else:
url = url.path
pinginfo = requests.get(
"https://helloacm.com/api/ping/?cached&host=" + url).content.decode(
"utf-8")
if pinginfo == 'null':
pass
elif not pinginfo == 'null':
pinginfo = pinginfo.replace('"', '').replace("\/", '/').replace('\\n', '\n', 7).replace('\\n', '')
await context.edit(pinginfo)
return True
else:
url = urlparse(context.arguments)
if url.hostname:
url = url.hostname
else:
url = url.path
if url == '':
await context.edit('没有找到要查询的 ip/域名 ...')
return True
pinginfo = requests.get(
"https://helloacm.com/api/ping/?cached&host=" + url).content.decode(
"utf-8")
if pinginfo == 'null':
pass
elif not pinginfo == 'null':
pinginfo = pinginfo.replace('"', '').replace("\/", '/').replace('\\n', '\n', 7).replace('\\n', '')
await context.edit(pinginfo)
return True
await context.edit('没有找到要查询的 ip/域名 ...')
except:
await context.edit('没有找到要查询的 ip/域名 ...')
@listener(outgoing=True, command="pixiv",
description="查询插画信息 (或者回复一条消息)",
parameters="[<图片链接>] <图片序号>")
async def pixiv(context):
reply = await context.get_reply_message()
await context.edit('正在查询中...')
try:
if reply:
try:
if context.arguments.strip() == '':
pixiv_page = 1
else:
try:
pixiv_page = int(context.arguments.strip())
except:
await context.edit('呜呜呜出错了...可能参数不是数字')
return True
except:
pass
for num in range(0, len(reply.entities)):
url = reply.message[reply.entities[num].offset:reply.entities[num].offset + reply.entities[num].length]
url = urlparse(url)
try:
url = str(re.findall(r"\d+\.?\d*", url.path)[0])
pixiv_json = json.loads(requests.get(
"https://api.imjad.cn/pixiv/v2/?type=illust&id=" + url).content.decode(
"utf-8"))
except:
await context.edit('呜呜呜出错了...可能是链接不上 API 服务器')
return True
try:
pixiv_tag = pixiv_json['error']['user_message']
await context.edit('没有找到要查询的 pixiv 作品...')
return True
except:
if pixiv_page > pixiv_json['illust']['page_count']:
await context.edit('呜呜呜出错了...可能是参数指定的页数大于插画页数')
return True
else:
pass
pixiv_tag = []
pixiv_num = str(pixiv_json['illust']['page_count'])
pixiv_list = '[' + pixiv_json['illust']['title'] + '](https://www.pixiv.net/artworks/' + str(
pixiv_json['illust']['id']) + ')' + ' (' + str(pixiv_page) + '/' + pixiv_num + ')'
for nums in range(0, len(pixiv_json['illust']['tags'])):
pixiv_tag.extend(['#' + pixiv_json['illust']['tags'][nums]['name']])
try:
await context.edit('正在下载图片中 ...')
try:
r = requests.get('https://daidr.me/imageProxy/?url=' +
pixiv_json['illust']['meta_single_page']['original_image_url'])
except:
r = requests.get('https://daidr.me/imageProxy/?url=' +
pixiv_json['illust']['meta_pages'][pixiv_page - 1]['image_urls']['original'])
with open("pixiv.jpg", "wb") as code:
code.write(r.content)
await context.edit('正在上传图片中 ...')
await context.client.send_file(context.chat_id, 'pixiv.jpg',
caption=pixiv_list + '\nTags: ' + ' , '.join(pixiv_tag),
reply_to=reply.id)
await context.delete()
remove('pixiv.jpg')
except:
pass
return True
else:
try:
url = urlparse(context.arguments.split()[0])
if len(context.arguments.split()) == 1:
pixiv_page = 1
else:
try:
pixiv_page = int(context.arguments.split()[1])
except:
await context.edit('呜呜呜出错了...可能参数不是数字')
return True
except:
pass
try:
url = str(re.findall(r"\d+\.?\d*", url.path)[0])
pixiv_json = json.loads(requests.get(
"https://api.imjad.cn/pixiv/v2/?type=illust&id=" + url).content.decode(
"utf-8"))
except:
await context.edit('呜呜呜出错了...可能是链接不上 API 服务器')
try:
pixiv_tag = pixiv_json['error']['user_message']
await context.edit('没有找到要查询的 pixiv 作品...')
return True
except:
if pixiv_page > pixiv_json['illust']['page_count']:
await context.edit('呜呜呜出错了...可能是参数指定的页数大于插画页数')
return True
else:
pass
pixiv_tag = []
pixiv_num = str(pixiv_json['illust']['page_count'])
pixiv_list = '[' + pixiv_json['illust']['title'] + '](https://www.pixiv.net/artworks/' + str(
pixiv_json['illust']['id']) + ')' + ' (' + str(pixiv_page) + '/' + pixiv_num + ')'
for nums in range(0, len(pixiv_json['illust']['tags'])):
pixiv_tag.extend(['#' + pixiv_json['illust']['tags'][nums]['name']])
try:
await context.edit('正在下载图片中 ...')
try:
r = requests.get('https://daidr.me/imageProxy/?url=' +
pixiv_json['illust']['meta_single_page']['original_image_url'])
except:
r = requests.get('https://daidr.me/imageProxy/?url=' +
pixiv_json['illust']['meta_pages'][pixiv_page - 1]['image_urls']['original'])
with open("pixiv.jpg", "wb") as code:
code.write(r.content)
await context.edit('正在上传图片中 ...')
await context.client.send_file(context.chat_id, 'pixiv.jpg',
caption=pixiv_list + '\nTags: ' + ' , '.join(pixiv_tag))
await context.delete()
remove('pixiv.jpg')
except:
pass
return True
await context.edit('没有找到要查询的 pixiv 作品 ...')
except:
await context.edit('没有找到要查询的 pixiv 作品 ...')
@listener(outgoing=True, command="t",
description="通过腾讯AI开放平台将目标消息翻译成指定的语言。",
parameters="<文本>")
async def tx_t(context):
""" PagerMaid universal translator. """
reply = await context.get_reply_message()
message = context.arguments
lang = 'zh'
if message:
pass
elif reply:
message = reply.text
else:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
try:
await context.edit("正在生成翻译中 . . .")
tx_json = json.loads(requests.get(
"https://xtaolink.cn/git/m/t.php?lang=" + lang + '&text=' + clear_emojis(message)).content.decode(
"utf-8"))
if not tx_json['msg'] == 'ok':
context.edit("出错了呜呜呜 ~ 翻译出错")
return True
else:
result = '文本翻译:\n' + tx_json['data']['target_text']
except ValueError:
await context.edit("出错了呜呜呜 ~ 找不到目标语言,请更正配置文件中的错误。")
return
if len(result) > 4096:
await context.edit("输出超出 TG 限制,正在尝试上传文件。")
await attach_log(result, context.chat_id, "translation.txt", context.id)
return
await context.edit(result)

View File

@ -1,57 +0,0 @@
""" Pagermaid plugin base. """
from os import remove
from os.path import exists
from youtube_dl import YoutubeDL
from re import compile as regex_compile
from pagermaid import bot, log
from pagermaid.listener import listener
@listener(outgoing=True, command="ybdl",
description="上传 Youtube、Bilibili 视频到 telegram",
parameters="<url>.")
async def ybdl(context):
url = context.arguments
reply = await context.get_reply_message()
reply_id = None
await context.edit("获取视频中 . . .")
if reply:
reply_id = reply.id
if url is None:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
bilibili_pattern = regex_compile(r"^(http(s)?://)?((w){3}.)?bilibili(\.com)?/.+")
youtube_pattern = regex_compile(r"^(http(s)?://)?((w){3}.)?youtu(be|.be)?(\.com)?/.+")
if youtube_pattern.match(url):
if not await fetch_video(url, context.chat_id, reply_id):
await context.edit("出错了呜呜呜 ~ 视频下载失败。")
await log(f"已拉取UTB视频地址 {url}.")
await context.delete()
if bilibili_pattern.match(url):
if not await fetch_video(url, context.chat_id, reply_id):
await context.edit("出错了呜呜呜 ~ 视频下载失败。")
await log(f"已拉取 Bilibili 视频,地址: {url}.")
await context.delete()
async def fetch_video(url, chat_id, reply_id):
""" Extracts and uploads YouTube video. """
youtube_dl_options = {
'format': 'bestvideo+bestaudio/best',
'outtmpl': "video.%(ext)s",
'postprocessors': [{
'key': 'FFmpegVideoConvertor',
'preferedformat': 'mp4'
}]
}
YoutubeDL(youtube_dl_options).download([url])
if not exists("video.mp4"):
return False
await bot.send_file(
chat_id,
"video.mp4",
reply_to=reply_id
)
remove("video.mp4")
return True

View File

@ -1,52 +0,0 @@
""" Pagermaid plugin base. """
from os import remove
from os.path import exists
from youtube_dl import YoutubeDL
from re import compile as regex_compile
from pagermaid import bot, log
from pagermaid.listener import listener
@listener(outgoing=True, command="ytdl",
description="YouTube downloader.",
parameters="<url>.")
async def ytdl(context):
url = context.arguments
reply = await context.get_reply_message()
reply_id = None
await context.edit("获取视频中 . . .")
if reply:
reply_id = reply.id
if url is None:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
youtube_pattern = regex_compile(r"^(http(s)?://)?((w){3}.)?youtu(be|.be)?(\.com)?/.+")
if youtube_pattern.match(url):
if not await fetch_youtube_video(url, context.chat_id, reply_id):
await context.edit("出错了呜呜呜 ~ 视频下载失败。")
await log(f"已拉取UTB视频地址 {url}.")
await context.edit("视频获取成功!")
async def fetch_youtube_video(url, chat_id, reply_id):
""" Extracts and uploads YouTube video. """
youtube_dl_options = {
'format': 'bestvideo[height=720]+bestaudio/best',
'outtmpl': "video.%(ext)s",
'postprocessors': [{
'key': 'FFmpegVideoConvertor',
'preferedformat': 'mp4'
}]
}
YoutubeDL(youtube_dl_options).download([url])
if not exists("video.mp4"):
return False
await bot.send_file(
chat_id,
"video.mp4",
reply_to=reply_id
)
remove("video.mp4")
return True