Merge pull request #110 from TingHsi/master

钉钉机器人支持设置加签密钥
This commit is contained in:
Womsxd 2022-05-09 06:17:56 +08:00 committed by GitHub
commit e37c354733
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 16 deletions

View File

@ -98,3 +98,29 @@
`5`对应大别墅 `5`对应大别墅
`6`对应崩坏:星穹铁道 `6`对应崩坏:星穹铁道
# push.ini配置教程
* push_server 可选范围 cqhttp ftqq(sever酱) pushplus telegram dingrobot
## dingrobot
钉钉群机器人
**webhook**填写**Webhook**地址
**secret**填写**安全设置**中**加签**的密钥,此选项为可选项
填写示例
```ini
[setting]
enable=true
push_server=dingrobot
[dingrobot]
webhook=https://oapi.dingtalk.com/robot/send?access_token=XXX
secret=
```

View File

@ -27,4 +27,5 @@ api_url=https://api2.pushdeer.com
token= token=
[dingrobot] [dingrobot]
api_url=https://oapi.dingtalk.com/robot/send webhook=https://oapi.dingtalk.com/robot/send?access_token=XXX
secret=

52
push.py
View File

@ -1,4 +1,9 @@
import os import os
import time
import base64
import hashlib
import urllib
import hmac
from request import http from request import http
from loghelper import log from loghelper import log
from configparser import ConfigParser from configparser import ConfigParser
@ -98,36 +103,53 @@ def pushdeer(status, push_message):
} }
) )
# 钉钉群机器人
# dingding robot
def dingrobot(status, push_message): def dingrobot(status, push_message):
# print('dingrobot', title(status) + "\r\n" + push_message) api_url = cfg.get('dingrobot', 'webhook') # https://oapi.dingtalk.com/robot/send?access_token=XXX
access_token=cfg.get('setting', 'push_token') secret = cfg.get('dingrobot', 'secret') # 安全设置 -> 加签 -> 密钥 -> SEC*
# print(f"https://oapi.dingtalk.com/robot/send?access_token={access_token}")
if secret:
timestamp = str(round(time.time() * 1000))
sign_string = f"{timestamp}\n{secret}"
hmac_code = hmac.new(
key=secret.encode("utf-8"),
msg=sign_string.encode("utf-8"),
digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
api_url = f"{api_url}&timestamp={timestamp}&sign={sign}"
rep = http.post( rep = http.post(
url=f"https://oapi.dingtalk.com/robot/send?access_token={access_token}", url=api_url,
headers={"Content-Type": "application/json; charset=utf-8"}, headers={"Content-Type": "application/json; charset=utf-8"},
json={ json={
"msgtype": "text", "text": { "content": title(status) + "\r\n" + push_message } "msgtype": "text", "text": { "content": title(status) + "\r\n" + push_message }
} }
).json() ).json()
print(rep) log.info(f"推送结果:{rep.get('errmsg')}")
def push(status, push_message): def push(status, push_message):
if not load_config(): if not load_config():
return 0 return 0
if cfg.getboolean('setting', 'enable'): if cfg.getboolean('setting', 'enable'):
push_server = cfg.get('setting', 'push_server').lower()
log.info("正在执行推送......") log.info("正在执行推送......")
func_name = cfg.get('setting', 'push_server').lower()
func = globals().get(func_name)
# print(func)
if not func:
log.warning("推送服务名称错误请检查config/push.ini -> [setting] -> push_server")
return 0
log.debug(f"推送所用的服务为:{func_name}")
try: try:
log.debug(f"推送所用的服务为:{push_server}") # eval(push_server[:10] + "(status, push_message)")
eval(push_server[:10] + "(status, push_message)") # 与面代码等效 20220508
except NameError: func(status, push_message)
log.warning("推送服务名称错误") except:
log.warning("推送执行错误")
return 0
else: else:
log.info("推送完毕......") log.info("推送完毕......")
return 0 return 1
if __name__ == "__main__": if __name__ == "__main__":
push(0, '推送正文') push(0, f'推送验证{int(time.time())}')