🎉 autorm sendat weather xtao-some yb-dl

This commit is contained in:
xtaodada 2020-08-11 23:47:41 +08:00
parent 415fa306f9
commit ef231ea443
No known key found for this signature in database
GPG Key ID: EE4DC37B55E24736
7 changed files with 1032 additions and 2 deletions

View File

@ -14,7 +14,7 @@ all_time_emoji_name = ["clock12", "clock1230", "clock1", "clock130", "clock2", "
time_emoji_symb = [emojize(":%s:" %s, use_aliases=True) for s in all_time_emoji_name]
@listener(outgoing=True, command="autochangename",
@listener(is_plugin=True, outgoing=True, command="autochangename",
description="每 30 秒更新一次 last_name")
async def change_name_auto(context):
await context.delete()

200
autorm.py Normal file
View File

@ -0,0 +1,200 @@
""" Remove Sent Msg After A Specified Time. """
# By tg @fruitymelon
# extra requirements: dateparser
try:
import dateparser
imported = True
except ImportError:
imported = False
import asyncio, time, traceback
from pagermaid import log
from pagermaid.listener import listener
from datetime import datetime
# https://stackoverflow.com/questions/1111056/get-time-zone-information-of-the-system-in-python
def local_time_offset(t=None):
"""Return offset of local zone from GMT, either at present or at time t."""
# python2.3 localtime() can't take None
if t is None:
t = time.time()
if time.localtime(t).tm_isdst and time.daylight:
return -time.altzone
else:
return -time.timezone
offset = local_time_offset() // 3600
sign = "+" if offset >= 0 else "-"
offset = abs(offset)
offset_str = str(offset)
offset_str = offset_str if len(offset_str) == 2 else f"0{offset_str}"
settings = {'TIMEZONE': f'{sign}{offset_str}00'}
all_chat = False
all_chat_delta = None
chats = []
times = []
helpmsg = """
在指定的时间后删除自己的消息
默认在当前群聊中启用也可以设置为全部私聊群聊和频道通用重叠时前者优先级更高
PagerMaid 重启后将失效
i.e.
-autorm 4 seconds
-autorm 1 minutes
-autorm global 1 minutes
取消删除自己的消息
取消当前群 -autorm cancel
取消全局 -autorm global cancel
取消所有群和全局 -autorm cancelall
"""
@listener(outgoing=True, ignore_edited=True)
async def remove_message(context):
""" Event handler to infinitely remove messages. """
try:
text = context.message.text if context.message.text else ""
chat_id = context.chat_id
if chats.count(chat_id) != 0:
index = chats.index(chat_id)
delta = times[index]
if text.startswith("-autorm"):
context.arguments = text.lstrip("-autorm").lstrip()
await autorm(context)
await asyncio.sleep(delta)
await context.delete()
return
elif all_chat:
delta = all_chat_delta
if text.startswith("-autorm"):
context.arguments = text.lstrip("-autorm").lstrip()
await autorm(context)
await asyncio.sleep(delta)
await context.delete()
return
except Exception as e:
await sendmsg(context, await context.get_chat(), str(e))
@listener(is_plugin=True, outgoing=True, command="autorm", diagnostics=True, ignore_edited=False,
description=helpmsg,
parameters="<time>")
async def autorm_wrap(context):
return await autorm(context)
async def autorm(context):
try:
global all_chat, all_chat_delta, chats, times
chat = await context.get_chat()
chat_id = context.chat_id
args = context.arguments if context.arguments is not None else ""
args = args.strip()
if not imported:
await edit(context, "Please install dateparser first: python3 -m pip install dateparser")
return
if len(args) == 0:
await edit(context, "参数不能为空。使用 -help autorm 以查看帮助。")
return
if args.find("cancel") == -1 and not any(char.isdigit() for char in args):
await edit(context, "指定的参数中似乎没有时间长度。")
return
if args.find("cancel") == -1 and all(char.isdigit() for char in args):
await edit(context, "请指定时间长度的单位。")
return
if args.find(":") != -1 or args.find("-") != -1:
await edit(context, "请使用相对时间长度,而非绝对时间长度:不能含 : 或 -。")
return
if args.startswith("global"):
time_str = args[7:].strip()
if time_str.startswith("cancel"):
if all_chat == False:
await edit(context, "当前未开启全部群自动删除消息。")
else:
all_chat = False
all_chat_delta = None
await edit(context, "成功为所有群取消自动删除消息。")
return
dt = dateparser.parse(time_str, settings=settings)
if dt is None:
await edit(context, "无法解析所指定的时间长度。")
return
delta = time.time() - dt.timestamp()
if delta <= 3:
await edit(context, "所指定的时间长度过短。")
return
all_chat = True
all_chat_delta = delta
await edit(context, "已成功启用全局自动删除消息。")
return
if args.startswith("cancel"):
if args.startswith("cancelall"):
if all_chat == False and len(chats) == 0:
await edit(context, "似乎没有什么可以取消的。")
else:
all_chat = False
all_chat_delta = None
chats = []
times = []
await edit(context, "已取消全部先前创建的自动删除消息服务。")
return
if chats.count(chat_id) == 0:
await edit(context, "当前群未开启自动删除消息。")
return
index = chats.index(chat_id)
chats.pop(index)
times.pop(index)
await edit(context, "成功为当前群取消自动删除消息。")
return
dt = dateparser.parse(args, settings=settings)
if dt is None:
await edit(context, "无法解析所指定的时间长度。")
return
delta = time.time() - dt.timestamp()
if delta <= 3:
await edit(context, "所指定的时间长度过短。")
return
if chats.count(chat_id) != 0:
index = chats.index(chat_id)
chats.pop(index)
times.pop(index)
chats.append(chat_id)
times.append(delta)
await edit(context, "成功在当前群启用自动删除消息。")
return
except Exception as e:
await edit(context, f"Error: {str(e)}")
return
async def edit(context, text):
chat = await context.get_chat()
try:
return await context.edit(text)
except Exception as e:
if str(e).find("you can't do that operation") == -1:
stack = "\n".join(traceback.format_stack())
return await sendmsg(context, chat, f"{text} {str(e)} {stack}")
else:
return context
async def sendmsg(context, chat, origin_text):
text = origin_text.strip()
msg = await context.client.send_message(chat, text)
return msg

2
dme.py
View File

@ -7,7 +7,7 @@ from pagermaid import redis, log, redis_status
from pagermaid.listener import listener
@listener(outgoing=True, command="dme",
@listener(is_plugin=True, outgoing=True, command="dme",
description="编辑并删除当前对话您发送的特定数量的消息。限制:基于消息 ID 的 1000 条消息,大于 1000 条可能会触发删除消息过快限制。入群消息非管理员无法删除。(倒序)当数字足够大时即可实现删除所有消息。",
parameters="<数量> [文本]")
async def selfprune(context):

333
sendat.py Normal file
View File

@ -0,0 +1,333 @@
""" Send Msg At A Specified Time. """
# By tg @fruitymelon
# extra requirements: dateparser
imported = True
import os, sys, time, traceback
try:
import dateparser
except ImportError:
imported = False
import asyncio
from pagermaid import log
from pagermaid.listener import listener
from datetime import datetime
from dateutil import parser
def logsync(message):
sys.stdout.writelines(f"{message}\n")
logsync("sendat: loading... If failed, please install dateparser first.")
# https://stackoverflow.com/questions/1111056/get-time-zone-information-of-the-system-in-python
def local_time_offset(t=None):
"""Return offset of local zone from GMT, either at present or at time t."""
# python2.3 localtime() can't take None
if t is None:
t = time.time()
if time.localtime(t).tm_isdst and time.daylight:
return -time.altzone
else:
return -time.timezone
offset = local_time_offset() // 3600
sign = "+" if offset >= 0 else "-"
offset = abs(offset)
offset_str = str(offset)
offset_str = offset_str if len(offset_str) == 2 else f"0{offset_str}"
settings = {'TIMEZONE': f'{sign}{offset_str}00'}
logsync(f"sendat: local time zone offset is {sign}{abs(offset)}")
mem = []
helpmsg = """
定时发送消息
-sendat 时间 | 消息内容
i.e.
-sendat 16:00:00 | 投票截止
-sendat every 23:59:59 | 又是无所事事的一天呢
-sendat every 1 minutes | 又过去了一分钟
-sendat *3 1 minutes | 此消息将出现三次间隔均为一分钟
根据 Registered id删除已注册的 timer
-sendat rm 2
此命令可与 autorespond也是我写的插件, ghost, deny 命令串联
-sendat 10 minutes | -autorespond 我暂时有事离开一下
"""
@listener(is_plugin=True, outgoing=True, command="sendat", diagnostics=True, ignore_edited=True,
description=helpmsg,
parameters="<atmsg>")
async def sendatwrap(context):
await sendat(context)
async def sendat(context):
if not context.parameter:
await context.edit(helpmsg)
return
mem_id = len(mem)
mem.append("[NULL]")
chat = await context.get_chat()
args = " ".join(context.parameter).split("|")
if not imported:
await context.edit("Please install dateparser first: python3 -m pip install dateparser\nAnd restart your pagermaid process")
return
await context.edit(f"tz data: {time.timezone} {time.tzname} {sign}{offset}")
if len(args) != 2:
if args[0].find("rm ") == 0:
# clear timer
target_ids = args[0][3:].strip().split(" ")
for target_id in target_ids:
if target_id.isnumeric():
if len(mem) > int(target_id):
mem[int(target_id)] = ""
await context.edit(f"id {target_id} removed.")
else:
await context.edit("id out of range.")
return
else:
await context.edit("id should be a integer.")
return
else:
await context.edit("Invalid argument counts. Expecting: 2")
return
if offset is None:
await context.edit("Failed to get your server's timezone.")
return
try:
if args[0].find("every ") == 0:
# at this point, let's assume args[0] contains relative time
# i.e. -sendat every 3 minutes
time_str = args[0][6:]
if time_str.find(":") != -1:
# then it should be absolute time
sleep_times = [abs(dateparser.parse(time_str, settings=settings).timestamp() - time.time()), 24 * 60 * 60]
index = 0
mem[mem_id] = "|".join(args)
await context.edit(f"Registered: id {mem_id}. You can use this id to cancel the timer.")
while True and mem[mem_id] != "":
last_time = time.time()
while time.time() < last_time + sleep_times[index] and mem[mem_id] != "":
await asyncio.sleep(2)
if mem[mem_id] != "":
await sendmsg(context, chat, args[1])
index = 1
mem[mem_id] = ""
return
sleep_time = abs(dateparser.parse(time_str, settings=settings).timestamp() - time.time())
if sleep_time < 5:
await context.edit(f"Sleep time too short. Should be longer than 5 seconds. Got {sleep_time}")
return
mem[mem_id] = "|".join(args)
await context.edit(f"Registered: id {mem_id}. You can use this id to cancel the timer.")
while True and mem[mem_id] != "":
last_time = time.time()
while time.time() < last_time + sleep_time and mem[mem_id] != "":
await asyncio.sleep(2)
if mem[mem_id] != "":
await sendmsg(context, chat, args[1])
mem[mem_id] = ""
return
elif args[0].find("*") == 0:
times = int(args[0][1:].split(" ")[0])
rest = " ".join(args[0][1:].split(" ")[1:])
if rest.find(":") != -1:
# then it should be absolute time
sleep_times = [abs(dateparser.parse(rest, settings=settings).timestamp() - time.time()), 24 * 60 * 60]
count = 0
mem[mem_id] = "|".join(args)
await context.edit(f"Registered: id {mem_id}. You can use this id to cancel the timer.")
while count <= times and mem[mem_id] != "":
last_time = time.time()
while time.time() < last_time + sleep_times[0 if count == 0 else 1] and mem[mem_id] != "":
await asyncio.sleep(2)
if mem[mem_id] != "":
await sendmsg(context, chat, args[1])
count += 1
mem[mem_id] = ""
return
sleep_time = abs(dateparser.parse(rest, settings=settings).timestamp() - time.time())
if sleep_time < 5:
await context.edit("Sleep time too short. Should be longer than 5 seconds.")
return
count = 0
mem[mem_id] = "|".join(args)
await context.edit(f"Registered: id {mem_id}. You can use this id to cancel the timer.")
while count <= times and mem[mem_id] != "":
last_time = time.time()
while time.time() < last_time + sleep_time and mem[mem_id] != "":
await asyncio.sleep(2)
if mem[mem_id] != "":
await sendmsg(context, chat, args[1])
count += 1
mem[mem_id] = ""
return
if args[0].find(":") == -1:
# relative time
dt = dateparser.parse(args[0])
delta = time.time() - dt.timestamp()
if delta < 3:
await context.edit("Target time before now.")
return
mem[mem_id] = "|".join(args)
await context.edit(f"Registered: id {mem_id}. You can use this id to cancel the timer.")
while dt.timestamp() + 2*delta > time.time() and mem[mem_id] != "":
await asyncio.sleep(2)
if mem[mem_id] != "":
await sendmsg(context, chat, args[1])
mem[mem_id] = ""
return
# absolute time
dt = parser.parse(args[0])
delta = dt.timestamp() - time.time()
if delta < 3:
await context.edit("Target time before now.")
return
mem[mem_id] = "|".join(args)
await context.edit(f"Registered: id {mem_id}. You can use this id to cancel the timer.")
while delta > 0 and mem[mem_id] != "":
delta = dt.timestamp() - time.time()
await asyncio.sleep(2)
if mem[mem_id] != "":
await sendmsg(context, chat, args[1])
mem[mem_id] = ""
except Exception as e:
await log(str(e))
await log(str(traceback.format_stack()))
return
@listener(outgoing=True, command="sendatdump", diagnostics=True, ignore_edited=True,
description="导出并转储内存中的 sendat 配置")
async def sendatdump(context):
clean_mem = mem[:]
if clean_mem.count("[NULL]") != 0:
clean_mem.remove("[NULL]")
if clean_mem.count("") != 0:
clean_mem.remove("")
await context.edit(".\n-sendat " + "\n-sendat ".join(clean_mem))
@listener(outgoing=True, command="sendatparse", diagnostics=True, ignore_edited=True,
description="导入已导出的 sendat 配置")
async def sendatparse(context):
chat = await context.get_chat()
text = "\n".join(context.message.text.split("\n")[1:])
if text == "":
return
if text.find(".\n") == 0:
text = "\n".join(text.split("\n")[1:])
lines = text.split("\n")
for i in range(len(lines)):
line = lines[i]
sent = await sendmsg(context, chat, line)
sent.parameter = line.replace("-sendat ", "").split(" ")
await sendat(sent)
""" Modified pagermaid autorespond plugin. """
from telethon.events import StopPropagation
from pagermaid import persistent_vars
async def autorespond(context):
""" Enables the auto responder. """
message = "我还在睡觉... ZzZzZzZzZZz"
if context.arguments:
message = context.arguments
await context.edit("成功启用自动响应器。")
await log(f"启用自动响应器,将自动回复 `{message}`.")
persistent_vars.update({'autorespond': {'enabled': True, 'message': message, 'amount': 0}})
from pagermaid import redis, redis_status
async def ghost(context):
""" Toggles ghosting of a user. """
if not redis_status():
await context.edit("出错了呜呜呜 ~ Redis 好像离线了,无法执行命令。")
return
if len(context.parameter) != 1:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
myself = await context.client.get_me()
self_user_id = myself.id
if context.parameter[0] == "true":
if context.chat_id == self_user_id:
await context.edit("在?为什么要在收藏夹里面用?")
return
redis.set("ghosted.chat_id." + str(context.chat_id), "true")
await context.delete()
await log(f"已成功将 ChatID {str(context.chat_id)} 添加到自动已读对话列表中。")
elif context.parameter[0] == "false":
if context.chat_id == self_user_id:
await context.edit("在?为什么要在收藏夹里面用?")
return
redis.delete("ghosted.chat_id." + str(context.chat_id))
await context.delete()
await log(f"已成功将 ChatID {str(context.chat_id)} 从自动已读对话列表中移除。")
elif context.parameter[0] == "status":
if redis.get("ghosted.chat_id." + str(context.chat_id)):
await context.edit("emm...当前对话已被加入自动已读对话列表中。")
else:
await context.edit("emm...当前对话已从自动已读对话列表中移除。")
else:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
async def deny(context):
""" Toggles denying of a user. """
if not redis_status():
await context.edit("出错了呜呜呜 ~ Redis 离线,无法运行。")
return
if len(context.parameter) != 1:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
myself = await context.client.get_me()
self_user_id = myself.id
if context.parameter[0] == "true":
if context.chat_id == self_user_id:
await context.edit("在?为什么要在收藏夹里面用?")
return
redis.set("denied.chat_id." + str(context.chat_id), "true")
await context.delete()
await log(f"ChatID {str(context.chat_id)} 已被添加到自动拒绝对话列表中。")
elif context.parameter[0] == "false":
if context.chat_id == self_user_id:
await context.edit("在?为什么要在收藏夹里面用?")
return
redis.delete("denied.chat_id." + str(context.chat_id))
await context.delete()
await log(f"ChatID {str(context.chat_id)} 已从自动拒绝对话列表中移除。")
elif context.parameter[0] == "status":
if redis.get("denied.chat_id." + str(context.chat_id)):
await context.edit("emm...当前对话已被加入自动拒绝对话列表中。")
else:
await context.edit("emm...当前对话已从自动拒绝对话列表移除。")
else:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
async def sendmsg(context, chat, origin_text):
text = origin_text.strip()
msg = await context.client.send_message(chat, text)
if text.find("-autorespond") == 0:
msg.arguments = text.replace("-autorespond", "").lstrip()
await autorespond(msg)
elif text.find("-ghost ") == 0:
msg.parameter = [text[7:]]
await ghost(msg)
elif text.find("-deny ") == 0:
msg.parameter = [text[6:]]
await deny(msg)
return msg

50
weather.py Normal file
View File

@ -0,0 +1,50 @@
import json
from requests import get
from pagermaid.listener import listener
from pagermaid.utils import obtain_message
icons = {
"01d": "🌞",
"01n": "🌚",
"02d": "⛅️",
"02n": "⛅️",
"03d": "☁️",
"03n": "☁️",
"04d": "☁️",
"04n": "☁️",
"09d": "🌧",
"09n": "🌧",
"10d": "🌦",
"10n": "🌦",
"11d": "🌩",
"11n": "🌩",
"13d": "🌨",
"13n": "🌨",
"50d": "🌫",
"50n": "🌫",
}
@listener(is_plugin=True, outgoing=True, command="weather",
description="查询天气",
parameters="<城市>")
async def weather(context):
await context.edit("获取中 . . .")
try:
message = await obtain_message(context)
except ValueError:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
req = get("http://api.openweathermap.org/data/2.5/weather?appid=973e8a21e358ee9d30b47528b43a8746&units=metric&lang=zh_cn&q=" + message)
if req.status_code == 200:
data = json.loads(req.text)
cityName = "{}, {}".format(data["name"], data["sys"]["country"])
tempInC = round(data["main"]["temp"], 2)
tempInF = round((1.8 * tempInC) + 32, 2)
icon = data["weather"][0]["icon"]
desc = data["weather"][0]["description"]
res = "{}\n🌡{}℃ ({}F)\n{} {}".format(
cityName, tempInC, tempInF, icons[icon], desc
)
await context.edit(res)
else:
await context.edit("出错了呜呜呜 ~ 无法访问到 openweathermap.org 。")

390
xtao-some.py Normal file
View File

@ -0,0 +1,390 @@
""" Pagermaid plugin base. """
import json, requests, re
from googletrans import Translator
from urllib.parse import urlparse
from pagermaid import bot, log
from pagermaid.listener import listener, config
from pagermaid.utils import clear_emojis, obtain_message, attach_log
from telethon.tl.types import ChannelParticipantsAdmins
from os import remove
@listener(is_plugin=True, outgoing=True, command="guess",
description="能不能好好说话? - 拼音首字母缩写释义工具(需要回复一句话)")
async def guess(context):
reply = await context.get_reply_message()
await context.edit("获取中 . . .")
if reply:
pass
else:
context.edit("宁需要回复一句话")
return True
text = {'text': str(reply.message.replace("/guess ", "").replace(" ", ""))}
guess_json = json.loads(
requests.post("https://lab.magiconch.com/api/nbnhhsh/guess", data=text, verify=False).content.decode("utf-8"))
guess_res = []
if not len(guess_json) == 0:
for num in range(0, len(guess_json)):
guess_res1 = json.loads(json.dumps(guess_json[num]))
guess_res1_name = guess_res1['name']
try:
guess_res1_ans = ", ".join(guess_res1['trans'])
except:
try:
guess_res1_ans = ", ".join(guess_res1['inputting'])
except:
guess_res1_ans = "尚未录入"
guess_res.extend(["词组:" + guess_res1_name + "\n释义:" + guess_res1_ans])
await context.edit("\n\n".join(guess_res))
else:
await context.edit("没有匹配到拼音首字母缩写")
@listener(is_plugin=True, outgoing=True, command="admin",
description="一键 AT 本群管理员(仅在群组中有效)")
async def admin(context):
await context.edit('正在获取管理员列表中...')
chat = await context.get_chat()
try:
admins = await context.client.get_participants(chat, filter=ChannelParticipantsAdmins)
except:
await context.edit('请在群组中运行。')
return True
admin_list = []
for admin in admins:
if admin.first_name is not None:
admin_list.extend(['[' + admin.first_name + '](tg://user?id=' + str(admin.id) + ')'])
await context.edit(' , '.join(admin_list))
@listener(is_plugin=True, outgoing=True, command="wiki",
description="查询维基百科词条",
parameters="<词组>")
async def wiki(context):
translator = Translator()
lang = config['application_language']
await context.edit("获取中 . . .")
try:
message = await obtain_message(context)
except ValueError:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
try:
wiki_json = json.loads(requests.get("https://zh.wikipedia.org/w/api.php?action=query&list=search&format=json&formatversion=2&srsearch=" + message).content.decode(
"utf-8"))
except:
await context.edit("出错了呜呜呜 ~ 无法访问到维基百科。")
return
if not len(wiki_json['query']['search']) == 0:
wiki_title = wiki_json['query']['search'][0]['title']
wiki_content = wiki_json['query']['search'][0]['snippet'].replace('<span class=\"searchmatch\">', '**').replace('</span>', '**')
wiki_time = wiki_json['query']['search'][0]['timestamp'].replace('T', ' ').replace('Z', ' ')
try:
await context.edit("正在生成翻译中 . . .")
wiki_content = translator.translate(clear_emojis(wiki_content), dest=lang)
message = '词条: [' + wiki_title + '](https://zh.wikipedia.org/zh-cn/' + wiki_title + ')\n\n' + wiki_content.text + '...\n\n此词条最后修订于 ' + wiki_time
except ValueError:
await context.edit("出错了呜呜呜 ~ 找不到目标语言,请更正配置文件中的错误。")
return
await context.edit(message)
else:
await context.edit("没有匹配到相关词条")
@listener(is_plugin=True, outgoing=True, command="ip",
description="IPINFO (或者回复一句话)",
parameters="<ip/域名>")
async def ipinfo(context):
reply = await context.get_reply_message()
await context.edit('正在查询中...')
try:
if reply:
for num in range(0, len(reply.entities)):
url = reply.message[reply.entities[num].offset:reply.entities[num].offset + reply.entities[num].length]
url = urlparse(url)
if url.hostname:
url = url.hostname
else:
url = url.path
ipinfo_json = json.loads(requests.get(
"http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city,lat,lon,isp,org,as,mobile,proxy,hosting,query").content.decode(
"utf-8"))
if ipinfo_json['status'] == 'fail':
pass
elif ipinfo_json['status'] == 'success':
ipinfo_list = []
ipinfo_list.extend(["查询目标: `" + url + "`"])
if ipinfo_json['query'] == url:
pass
else:
ipinfo_list.extend(["解析地址: `" + ipinfo_json['query'] + "`"])
ipinfo_list.extend(["地区: `" + ipinfo_json['country'] + ' - ' + ipinfo_json['regionName'] + ' - ' +
ipinfo_json['city'] + "`"])
ipinfo_list.extend(["经纬度: `" + str(ipinfo_json['lat']) + ',' + str(ipinfo_json['lon']) + "`"])
ipinfo_list.extend(["ISP `" + ipinfo_json['isp'] + "`"])
if not ipinfo_json['org'] == '':
ipinfo_list.extend(["组织: `" + ipinfo_json['org'] + "`"])
try:
ipinfo_list.extend(
['[' + ipinfo_json['as'] + '](https://bgp.he.net/' + ipinfo_json['as'].split()[0] + ')'])
except:
pass
if ipinfo_json['mobile']:
ipinfo_list.extend(['此 IP 可能为**蜂窝移动数据 IP**'])
if ipinfo_json['proxy']:
ipinfo_list.extend(['此 IP 可能为**代理 IP**'])
if ipinfo_json['hosting']:
ipinfo_list.extend(['此 IP 可能为**数据中心 IP**'])
await context.edit('\n'.join(ipinfo_list))
return True
else:
url = urlparse(context.arguments)
if url.hostname:
url = url.hostname
else:
url = url.path
ipinfo_json = json.loads(requests.get(
"http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city,lat,lon,isp,org,as,mobile,proxy,hosting,query").content.decode(
"utf-8"))
if ipinfo_json['status'] == 'fail':
pass
elif ipinfo_json['status'] == 'success':
ipinfo_list = []
if url == '':
ipinfo_list.extend(["查询目标: `本机地址`"])
else:
ipinfo_list.extend(["查询目标: `" + url + "`"])
if ipinfo_json['query'] == url:
pass
else:
ipinfo_list.extend(["解析地址: `" + ipinfo_json['query'] + "`"])
ipinfo_list.extend(["地区: `" + ipinfo_json['country'] + ' - ' + ipinfo_json['regionName'] + ' - ' +
ipinfo_json['city'] + "`"])
ipinfo_list.extend(["经纬度: `" + str(ipinfo_json['lat']) + ',' + str(ipinfo_json['lon']) + "`"])
ipinfo_list.extend(["ISP `" + ipinfo_json['isp'] + "`"])
if not ipinfo_json['org'] == '':
ipinfo_list.extend(["组织: `" + ipinfo_json['org'] + "`"])
try:
ipinfo_list.extend(
['[' + ipinfo_json['as'] + '](https://bgp.he.net/' + ipinfo_json['as'].split()[0] + ')'])
except:
pass
if ipinfo_json['mobile']:
ipinfo_list.extend(['此 IP 可能为**蜂窝移动数据 IP**'])
if ipinfo_json['proxy']:
ipinfo_list.extend(['此 IP 可能为**代理 IP**'])
if ipinfo_json['hosting']:
ipinfo_list.extend(['此 IP 可能为**数据中心 IP**'])
await context.edit('\n'.join(ipinfo_list))
return True
await context.edit('没有找到要查询的 ip/域名 ...')
except:
await context.edit('没有找到要查询的 ip/域名 ...')
@listener(is_plugin=True, outgoing=True, command="ipping",
description="Ping (或者回复一句话)",
parameters="<ip/域名>")
async def ipping(context):
reply = await context.get_reply_message()
await context.edit('正在查询中...')
try:
if reply:
for num in range(0, len(reply.entities)):
url = reply.message[reply.entities[num].offset:reply.entities[num].offset + reply.entities[num].length]
url = urlparse(url)
if url.hostname:
url = url.hostname
else:
url = url.path
pinginfo = requests.get(
"https://helloacm.com/api/ping/?cached&host=" + url).content.decode(
"utf-8")
if pinginfo == 'null':
pass
elif not pinginfo == 'null':
pinginfo = pinginfo.replace('"', '').replace("\/", '/').replace('\\n', '\n', 7).replace('\\n', '')
await context.edit(pinginfo)
return True
else:
url = urlparse(context.arguments)
if url.hostname:
url = url.hostname
else:
url = url.path
if url == '':
await context.edit('没有找到要查询的 ip/域名 ...')
return True
pinginfo = requests.get(
"https://helloacm.com/api/ping/?cached&host=" + url).content.decode(
"utf-8")
if pinginfo == 'null':
pass
elif not pinginfo == 'null':
pinginfo = pinginfo.replace('"', '').replace("\/", '/').replace('\\n', '\n', 7).replace('\\n', '')
await context.edit(pinginfo)
return True
await context.edit('没有找到要查询的 ip/域名 ...')
except:
await context.edit('没有找到要查询的 ip/域名 ...')
@listener(is_plugin=True, outgoing=True, command="pixiv",
description="查询插画信息 (或者回复一条消息)",
parameters="[<图片链接>] <图片序号>")
async def pixiv(context):
reply = await context.get_reply_message()
await context.edit('正在查询中...')
try:
if reply:
try:
if context.arguments.strip() == '':
pixiv_page = 1
else:
try:
pixiv_page = int(context.arguments.strip())
except:
await context.edit('呜呜呜出错了...可能参数不是数字')
return True
except:
pass
for num in range(0, len(reply.entities)):
url = reply.message[reply.entities[num].offset:reply.entities[num].offset + reply.entities[num].length]
url = urlparse(url)
try:
url = str(re.findall(r"\d+\.?\d*", url.path)[0])
pixiv_json = json.loads(requests.get(
"https://api.imjad.cn/pixiv/v2/?type=illust&id=" + url).content.decode(
"utf-8"))
except:
await context.edit('呜呜呜出错了...可能是链接不上 API 服务器')
return True
try:
pixiv_tag = pixiv_json['error']['user_message']
await context.edit('没有找到要查询的 pixiv 作品...')
return True
except:
if pixiv_page > pixiv_json['illust']['page_count']:
await context.edit('呜呜呜出错了...可能是参数指定的页数大于插画页数')
return True
else:
pass
pixiv_tag = []
pixiv_num = str(pixiv_json['illust']['page_count'])
pixiv_list = '[' + pixiv_json['illust']['title'] + '](https://www.pixiv.net/artworks/' + str(
pixiv_json['illust']['id']) + ')' + ' (' + str(pixiv_page) + '/' + pixiv_num + ')'
for nums in range(0, len(pixiv_json['illust']['tags'])):
pixiv_tag.extend(['#' + pixiv_json['illust']['tags'][nums]['name']])
try:
await context.edit('正在下载图片中 ...')
try:
r = requests.get('https://daidr.me/imageProxy/?url=' +
pixiv_json['illust']['meta_single_page']['original_image_url'])
except:
r = requests.get('https://daidr.me/imageProxy/?url=' +
pixiv_json['illust']['meta_pages'][pixiv_page - 1]['image_urls']['original'])
with open("pixiv.jpg", "wb") as code:
code.write(r.content)
await context.edit('正在上传图片中 ...')
await context.client.send_file(context.chat_id, 'pixiv.jpg',
caption=pixiv_list + '\nTags: ' + ' , '.join(pixiv_tag),
reply_to=reply.id)
await context.delete()
remove('pixiv.jpg')
except:
pass
return True
else:
try:
url = urlparse(context.arguments.split()[0])
if len(context.arguments.split()) == 1:
pixiv_page = 1
else:
try:
pixiv_page = int(context.arguments.split()[1])
except:
await context.edit('呜呜呜出错了...可能参数不是数字')
return True
except:
pass
try:
url = str(re.findall(r"\d+\.?\d*", url.path)[0])
pixiv_json = json.loads(requests.get(
"https://api.imjad.cn/pixiv/v2/?type=illust&id=" + url).content.decode(
"utf-8"))
except:
await context.edit('呜呜呜出错了...可能是链接不上 API 服务器')
try:
pixiv_tag = pixiv_json['error']['user_message']
await context.edit('没有找到要查询的 pixiv 作品...')
return True
except:
if pixiv_page > pixiv_json['illust']['page_count']:
await context.edit('呜呜呜出错了...可能是参数指定的页数大于插画页数')
return True
else:
pass
pixiv_tag = []
pixiv_num = str(pixiv_json['illust']['page_count'])
pixiv_list = '[' + pixiv_json['illust']['title'] + '](https://www.pixiv.net/artworks/' + str(
pixiv_json['illust']['id']) + ')' + ' (' + str(pixiv_page) + '/' + pixiv_num + ')'
for nums in range(0, len(pixiv_json['illust']['tags'])):
pixiv_tag.extend(['#' + pixiv_json['illust']['tags'][nums]['name']])
try:
await context.edit('正在下载图片中 ...')
try:
r = requests.get('https://daidr.me/imageProxy/?url=' +
pixiv_json['illust']['meta_single_page']['original_image_url'])
except:
r = requests.get('https://daidr.me/imageProxy/?url=' +
pixiv_json['illust']['meta_pages'][pixiv_page - 1]['image_urls']['original'])
with open("pixiv.jpg", "wb") as code:
code.write(r.content)
await context.edit('正在上传图片中 ...')
await context.client.send_file(context.chat_id, 'pixiv.jpg',
caption=pixiv_list + '\nTags: ' + ' , '.join(pixiv_tag))
await context.delete()
remove('pixiv.jpg')
except:
pass
return True
await context.edit('没有找到要查询的 pixiv 作品 ...')
except:
await context.edit('没有找到要查询的 pixiv 作品 ...')
@listener(is_plugin=True, outgoing=True, command="t",
description="通过腾讯AI开放平台将目标消息翻译成指定的语言。",
parameters="<文本>")
async def tx_t(context):
""" PagerMaid universal translator. """
reply = await context.get_reply_message()
message = context.arguments
lang = 'zh'
if message:
pass
elif reply:
message = reply.text
else:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
try:
await context.edit("正在生成翻译中 . . .")
tx_json = json.loads(requests.get(
"https://xtaolink.cn/git/m/t.php?lang=" + lang + '&text=' + clear_emojis(message)).content.decode(
"utf-8"))
if not tx_json['msg'] == 'ok':
context.edit("出错了呜呜呜 ~ 翻译出错")
return True
else:
result = '文本翻译:\n' + tx_json['data']['target_text']
except ValueError:
await context.edit("出错了呜呜呜 ~ 找不到目标语言,请更正配置文件中的错误。")
return
if len(result) > 4096:
await context.edit("输出超出 TG 限制,正在尝试上传文件。")
await attach_log(result, context.chat_id, "translation.txt", context.id)
return
await context.edit(result)

57
yb-dl.py Normal file
View File

@ -0,0 +1,57 @@
""" Pagermaid plugin base. """
from os import remove
from os.path import exists
from youtube_dl import YoutubeDL
from re import compile as regex_compile
from pagermaid import bot, log
from pagermaid.listener import listener
@listener(outgoing=True, command="ybdl",
description="上传 Youtube、Bilibili 视频到 telegram",
parameters="<url>.")
async def ybdl(context):
url = context.arguments
reply = await context.get_reply_message()
reply_id = None
await context.edit("获取视频中 . . .")
if reply:
reply_id = reply.id
if url is None:
await context.edit("出错了呜呜呜 ~ 无效的参数。")
return
bilibili_pattern = regex_compile(r"^(http(s)?://)?((w){3}.)?bilibili(\.com)?/.+")
youtube_pattern = regex_compile(r"^(http(s)?://)?((w){3}.)?youtu(be|.be)?(\.com)?/.+")
if youtube_pattern.match(url):
if not await fetch_video(url, context.chat_id, reply_id):
await context.edit("出错了呜呜呜 ~ 视频下载失败。")
await log(f"已拉取UTB视频地址 {url}.")
await context.edit("视频获取成功!")
if bilibili_pattern.match(url):
if not await fetch_video(url, context.chat_id, reply_id):
await context.edit("出错了呜呜呜 ~ 视频下载失败。")
await log(f"已拉取 Bilibili 视频,地址: {url}.")
await context.edit("视频获取成功!")
async def fetch_video(url, chat_id, reply_id):
""" Extracts and uploads YouTube video. """
youtube_dl_options = {
'format': 'bestvideo+bestaudio/best',
'outtmpl': "video.%(ext)s",
'postprocessors': [{
'key': 'FFmpegVideoConvertor',
'preferedformat': 'mp4'
}]
}
YoutubeDL(youtube_dl_options).download([url])
if not exists("video.mp4"):
return False
await bot.send_file(
chat_id,
"video.mp4",
reply_to=reply_id
)
remove("video.mp4")
return True