MihoyoBBSTools/push.py

191 lines
5.5 KiB
Python
Raw Normal View History

2022-01-09 09:31:44 +00:00
import os
import time
import base64
import hashlib
import urllib
import hmac
2022-01-09 09:31:44 +00:00
from request import http
2022-01-11 07:54:47 +00:00
from loghelper import log
2022-01-09 09:31:44 +00:00
from configparser import ConfigParser
# Module-level parser shared by every push backend in this file;
# load_config() fills it from config/push.ini.
cfg = ConfigParser()
def load_config():
    """Load ``config/push.ini`` (next to this file) into the shared ``cfg``.

    Returns:
        True when the ini file exists and was passed to ``cfg.read``,
        False when the file is missing.
    """
    base_dir = os.path.dirname(os.path.realpath(__file__))
    ini_file = os.path.join(base_dir, 'config', 'push.ini')
    if not os.path.exists(ini_file):
        return False
    cfg.read(ini_file, encoding='utf-8')
    return True
2022-01-09 09:31:44 +00:00
def title(status):
    """Return the notification title for a run status code.

    Args:
        status: 0 = success, 1 = failure, 2 = some accounts failed,
            3 = the Genshin sign-in triggered a captcha.

    Returns:
        The title string for ``status``. Unknown (or unhashable) codes get
        a generic "invalid StatusID" title instead of ``None``, so callers
        that concatenate the title with the message body do not fail with
        ``TypeError``.
    """
    titles = {
        0: "「米游社脚本」执行成功!",
        1: "「米游社脚本」执行失败!",
        2: "「米游社脚本」部分账号执行失败!",
        3: "「米游社脚本」原神签到触发验证码!",
    }
    try:
        return titles[status]
    except (KeyError, TypeError):
        # KeyError: unknown code; TypeError: unhashable value passed in.
        return "「米游社脚本」StatusID 错误"
2022-01-09 09:31:44 +00:00
2022-04-20 05:00:29 +00:00
# Telegram push
def telegram(status, push_message):
    """Send the notification through a Telegram bot ``sendMessage`` call.

    Reads ``api_url``, ``bot_token`` and ``chat_id`` from the ``[telegram]``
    config section. The text is the status title, a CRLF, then the message.
    """
    host = cfg.get('telegram', 'api_url')
    token = cfg.get('telegram', 'bot_token')
    endpoint = f"https://{host}/bot{token}/sendMessage"
    form = {
        "chat_id": cfg.get('telegram', 'chat_id'),
        "text": title(status) + "\r\n" + push_message,
    }
    http.post(url=endpoint, data=form)
2022-04-20 05:00:29 +00:00
# ServerChan (server酱)
def ftqq(status, push_message):
    """Send the notification through ServerChan (sctapi.ftqq.com).

    The send key comes from ``push_token`` in the ``[setting]`` section;
    the status title is posted as ``title`` and the message as ``desp``.
    """
    send_key = cfg.get('setting', 'push_token')
    endpoint = f"https://sctapi.ftqq.com/{send_key}.send"
    form = {"title": title(status), "desp": push_message}
    http.post(url=endpoint, data=form)
2022-04-20 05:00:29 +00:00
# pushplus
def pushplus(status, push_message):
    """Send the notification through pushplus (www.pushplus.plus).

    The token comes from ``push_token`` in the ``[setting]`` section; the
    status title is posted as ``title`` and the message as ``content``.
    """
    form = dict(
        token=cfg.get('setting', 'push_token'),
        title=title(status),
        content=push_message,
    )
    http.post(url="https://www.pushplus.plus/send", data=form)
2022-04-20 05:00:29 +00:00
# Push over the CQHTTP protocol
def cqhttp(status, push_message):
    """Send the notification as JSON to the configured CQHTTP endpoint.

    Reads ``cqhttp_url`` and the integer ``cqhttp_qq`` (sent as ``user_id``)
    from the ``[cqhttp]`` config section.
    """
    endpoint = cfg.get('cqhttp', 'cqhttp_url')
    recipient = cfg.getint('cqhttp', 'cqhttp_qq')
    text = title(status) + "\r\n" + push_message
    http.post(url=endpoint, json={"user_id": recipient, "message": text})
2022-04-20 05:00:29 +00:00
# WeCom (企业微信) -- thanks to linjie5492@github
def wecom(status, push_message):
    """Send the notification as a WeCom application text message.

    Reads ``secret``, ``wechat_id`` (used as the corp id), ``agentid`` and
    the optional ``touser`` from the ``[wecom]`` config section. An access
    token is requested first and then used to post the message.

    Raises:
        Whatever the config lookup, the HTTP helper or the JSON access
        raises (e.g. ``KeyError`` when the token response carries no
        ``access_token``); ``push()`` catches and logs these.
    """
    secret = cfg.get('wecom', 'secret')
    corpid = cfg.get('wecom', 'wechat_id')
    # ``touser`` is optional and defaults to '@all'. Using ``fallback``
    # instead of a bare ``except:`` means only a missing option/section
    # triggers the default -- KeyboardInterrupt, SystemExit and unrelated
    # errors are no longer silently swallowed.
    touser = cfg.get('wecom', 'touser', fallback='@all')
    push_token = http.post(
        url=f'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={secret}',
        data=""
    ).json()['access_token']
    push_data = {
        "agentid": cfg.get('wecom', 'agentid'),
        "msgtype": "text",
        "touser": touser,
        "text": {
            "content": title(status) + "\r\n" + push_message
        },
        "safe": 0
    }
    http.post(f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={push_token}', json=push_data)
2022-04-20 05:44:55 +00:00
# PushDeer
def pushdeer(status, push_message):
    """Send the notification through a PushDeer server as markdown.

    Reads ``api_url`` and ``token`` from the ``[pushdeer]`` config section.
    Every CRLF in the message is doubled before sending.
    """
    endpoint = f'{cfg.get("pushdeer", "api_url")}/message/push'
    push_key = cfg.get("pushdeer", "token")
    heading = title(status)
    # Splitting on CRLF and re-joining with two CRLFs doubles each line break.
    body = "\r\n\r\n".join(str(push_message).split("\r\n"))
    http.get(
        url=endpoint,
        params={"pushkey": push_key, "text": heading, "desp": body, "type": "markdown"},
    )
# DingTalk group robot
def _dingrobot_sign(secret, timestamp):
    """Return the URL-quoted, base64 HMAC-SHA256 signature of ``timestamp\\nsecret``."""
    # Imported here because the file's bare ``import urllib`` does not
    # guarantee that the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import quote_plus

    sign_string = f"{timestamp}\n{secret}"
    hmac_code = hmac.new(
        key=secret.encode("utf-8"),
        msg=sign_string.encode("utf-8"),
        digestmod=hashlib.sha256
    ).digest()
    return quote_plus(base64.b64encode(hmac_code))


def dingrobot(status, push_message):
    """Send the notification to a DingTalk group robot webhook.

    Reads ``webhook`` and ``secret`` from the ``[dingrobot]`` config
    section. When ``secret`` is non-empty, a millisecond timestamp and its
    signature are appended to the webhook URL as query parameters. The
    ``errmsg`` field of the JSON response is logged.
    """
    api_url = cfg.get('dingrobot', 'webhook')  # https://oapi.dingtalk.com/robot/send?access_token=XXX
    secret = cfg.get('dingrobot', 'secret')  # Security settings -> Sign -> secret -> SEC*
    if secret:
        timestamp = str(round(time.time() * 1000))
        sign = _dingrobot_sign(secret, timestamp)
        # The webhook already carries ``?access_token=...``, hence the "&".
        api_url = f"{api_url}&timestamp={timestamp}&sign={sign}"
    rep = http.post(
        url=api_url,
        headers={"Content-Type": "application/json; charset=utf-8"},
        json={
            "msgtype": "text", "text": {"content": title(status) + "\r\n" + push_message}
        }
    ).json()
    log.info(f"推送结果:{rep.get('errmsg')}")
2022-05-09 01:07:50 +00:00
# Bark
def bark(status, push_message):
    """Send the notification through a Bark server.

    Reads ``api_url`` and ``token`` from the ``[bark]`` config section and
    issues ``GET {api_url}/{token}/{title}/{message}``. The ``message``
    field of the JSON response is logged.
    """
    # Function-scope import: the file's bare ``import urllib`` does not
    # guarantee that ``urllib.parse`` is loaded.
    from urllib.parse import quote

    # Title and message travel as URL path segments, so percent-encode them
    # fully (``safe=''`` also encodes "/"). Left raw, a "/", "?", "#" or a
    # line break in the text would change or truncate the request path.
    title_segment = quote(str(title(status)), safe='')
    message_segment = quote(str(push_message), safe='')
    rep = http.get(
        url=f'{cfg.get("bark", "api_url")}/{cfg.get("bark", "token")}/{title_segment}/{message_segment}'
    ).json()
    log.info(f"推送结果:{rep.get('message')}")
2022-05-09 01:07:50 +00:00
2022-08-15 12:12:08 +00:00
# Gotify
def gotify(status, push_message):
    """Send the notification through a Gotify server.

    Reads ``api_url``, ``token`` and the integer ``priority`` from the
    ``[gotify]`` config section and posts the title/message as JSON to
    ``{api_url}/message``. The outcome is logged.
    """
    rep = http.post(
        url=f'{cfg.get("gotify", "api_url")}/message?token={cfg.get("gotify", "token")}',
        headers={"Content-Type": "application/json; charset=utf-8"},
        json={
            "title": title(status),
            "message": push_message,
            "priority": cfg.getint("gotify", "priority")
        }
    ).json()
    # Gotify responses have no ``errmsg`` key (that name belongs to the
    # DingTalk API), so the old lookup always logged None. Failures carry
    # ``errorDescription``; a successful call returns the created message.
    log.info(f"推送结果:{rep.get('errorDescription', 'OK')}")
2022-01-09 09:31:44 +00:00
# Backend names that ``[setting] push_server`` may select. Each one is the
# name of a ``(status, push_message)`` function defined in this module.
_PUSH_SERVERS = (
    "telegram", "ftqq", "pushplus", "cqhttp", "wecom",
    "pushdeer", "dingrobot", "bark", "gotify",
)


def push(status, push_message):
    """Dispatch a notification to the backend selected in ``config/push.ini``.

    Args:
        status: Run status code, forwarded to the backend (see ``title``).
        push_message: Message body, forwarded to the backend.

    Returns:
        0 when the config file is missing, the backend name is invalid, or
        the backend raised; 1 otherwise (including when pushing is
        disabled via ``[setting] enable``).
    """
    if not load_config():
        return 0
    if cfg.getboolean('setting', 'enable'):
        log.info("正在执行推送......")
        func_name = cfg.get('setting', 'push_server').lower()
        # Only whitelisted names are resolved. An unrestricted globals()
        # lookup would also return imported modules, helpers such as
        # title()/load_config(), or push() itself (endless recursion).
        func = globals().get(func_name) if func_name in _PUSH_SERVERS else None
        if not callable(func):
            log.warning("推送服务名称错误请检查config/push.ini -> [setting] -> push_server")
            return 0
        log.debug(f"推送所用的服务为:{func_name}")
        try:
            func(status, push_message)
        except Exception as r:
            # Best effort: a failing backend must not crash the caller.
            log.warning(f"推送执行错误:{str(r)}")
            return 0
        else:
            log.info("推送完毕......")
    # NOTE(review): the source view lost its indentation; this return is
    # treated as function-level (so a disabled push also yields 1) -- confirm.
    return 1
if __name__ == "__main__":
    # Manual check: send a "success" notification whose body carries the
    # current Unix timestamp.
    push(status=0, push_message=f'推送验证{int(time.time())}')