gitlab-telegram-notice/app.py
2022-03-05 17:08:59 +08:00

108 lines
3.2 KiB
Python

import html

from aioflask import Flask
from aioflask import request
from aioflask import jsonify

from ci import app as bot
# aioflask application instance; the route decorator below registers the
# webhook handler on it.
app = Flask(__name__)
@app.route("/", methods=['GET', 'POST'])
async def webhook():
    """Handle a GitLab webhook call and forward a summary via ``bot``.

    The JSON body's ``object_kind`` attribute selects which message
    generator is used; see
    https://docs.gitlab.com/ce/user/project/integrations/webhooks.html
    for the payload formats.

    Returns:
        ``{'status': 'ok'}`` as JSON once the event has been processed, or a
        400 response with ``{'status': 'error', ...}`` when the request body
        is not a JSON object.

    Raises:
        KeyError: propagated from a generator when the payload lacks a field
            that generator reads.
    """
    # silent=True yields None instead of failing when the body is absent or
    # not JSON (e.g. a plain GET, which this route also accepts).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'status': 'error',
                        'reason': 'expected a JSON object'}), 400

    generators = {
        'push': generatePushMsg,
        'tag_push': generatePushMsg,  # TODO: Make own function for this
        'issue': generateIssueMsg,
        'note': generateCommentMsg,
        'merge_request': generateMergeRequestMsg,
        'wiki_page': generateWikiMsg,
        'pipeline': generatePipelineMsg,
        'build': generateBuildMsg,
    }
    kind = data.get('object_kind')
    # Only strings can name an event; anything else is treated as unknown
    # (and must not be used as a dict key, since it may be unhashable).
    generator = generators.get(kind) if isinstance(kind, str) else None
    msg = generator(data) if generator is not None else "Unknown event."

    # Some generators return "" for sub-types they do not describe.
    # NOTE(review): skipped on the assumption that the messaging backend
    # rejects empty text -- confirm against the `ci.app` client in use.
    if msg:
        chat_id = -1001441461877  # destination chat for all notifications
        await bot.send_message(chat_id, msg)
    return jsonify({'status': 'ok'})
def generatePushMsg(data):
    """Build the HTML-formatted message for a push (or tag push) event.

    Args:
        data: decoded webhook payload. Reads ``total_commits_count``,
            ``project.name``, ``ref`` and ``commits`` (each commit's ``url``,
            ``id`` and ``message``).

    Returns:
        A string with a bold header line followed by one line per commit,
        each made of a link labelled with the first 7 characters of the
        commit id and the right-stripped commit message.

    Raises:
        KeyError: if any of the fields listed above is missing.
    """
    # Strip the ref namespace only when it is a real prefix; tag pushes are
    # routed here too, so "refs/tags/" is handled alongside "refs/heads/".
    ref = data["ref"]
    for prefix in ("refs/heads/", "refs/tags/"):
        if ref.startswith(prefix):
            ref = ref[len(prefix):]
            break

    # Everything interpolated into the markup is user-controlled text, so it
    # is escaped to keep "<", ">" and "&" from breaking or injecting tags.
    project = html.escape(data["project"]["name"], quote=False)
    branch = html.escape(ref, quote=False)
    header = (f'<b>🔨 {data["total_commits_count"]} new commits to '
              f'{project}:{branch}</b>:\n\n')

    lines = []
    for commit in data['commits']:
        # quote=True here because the URL sits inside a quoted attribute.
        url = html.escape(commit["url"], quote=True)
        short_id = html.escape(commit["id"][:7], quote=False)
        summary = html.escape(commit['message'].rstrip(), quote=False)
        lines.append(f'<a href="{url}">{short_id}</a>: {summary}\n')
    return header + "".join(lines)
def generateIssueMsg(data):
    """Build the Markdown-style message for an issue event.

    Args:
        data: decoded webhook payload. Reads ``object_attributes`` (``action``,
            ``title``, ``url``), and depending on the action ``project.name``,
            ``assignees`` (each entry's ``name``) or ``user.name``.

    Returns:
        A header line describing the action (empty for actions other than
        ``open``, ``reopen``, ``close`` and ``update``) followed by a
        ``[title](url)`` link to the issue.

    Raises:
        KeyError: if a field needed for the given action is missing.
    """
    attributes = data['object_attributes']
    # A payload without "action" is handled like an unrecognised action
    # instead of failing with KeyError.
    action = attributes.get('action')

    # Actions whose header names the assignees share one code path.
    assignee_headers = {
        'open': '*{0}* new issue for *{1}*:\n',
        'reopen': '*{0}* issue re-opened for *{1}*:\n',
        'update': '*{0}* issue assigned to *{1}*:\n',
    }

    if action in assignee_headers:
        # Joined with ", " so that no trailing space ends up inside the
        # *...* markers and multi-word names stay distinguishable.
        assignees = ', '.join(
            person['name'] for person in data.get('assignees') or [])
        msg = assignee_headers[action].format(
            data['project']['name'], assignees)
    elif action == 'close':
        msg = '*{0}* issue closed by *{1}*:\n' \
            .format(data['project']['name'], data['user']['name'])
    else:
        msg = ""

    return msg + '[{0}]({1})'.format(attributes['title'], attributes['url'])
def generateCommentMsg(data):
    """Return a short label for a note (comment) event.

    Reads ``data['object_attributes']['noteable_type']`` and maps it to a
    fixed text; any type not listed below yields an empty string.
    """
    noteable = data['object_attributes']['noteable_type']
    labels = (
        ('Commit', 'note to commit'),
        ('MergeRequest', 'note to MergeRequest'),
        ('Issue', 'note to Issue'),
        ('Snippet', 'note on code snippet'),
    )
    # First matching entry wins; fall back to "" when nothing matches.
    return next((text for kind, text in labels if noteable == kind), "")
def generateMergeRequestMsg(data):
    """Return the fixed text used for merge request events.

    ``data`` is not read.
    """
    text = 'new MergeRequest'
    return text
def generateWikiMsg(data):
    """Return the fixed text used for wiki page events.

    ``data`` is not read.
    """
    text = 'new wiki stuff'
    return text
def generatePipelineMsg(data):
    """Return the fixed text used for pipeline events.

    ``data`` is not read.
    """
    text = 'new pipeline stuff'
    return text
def generateBuildMsg(data):
    """Return the fixed text used for build events.

    ``data`` is not read.
    """
    text = 'new build stuff'
    return text