"""Webhook endpoint that relays GitLab events to a chat through the bot client.

The single route reads the JSON payload of a GitLab webhook call, turns it
into a short text message and forwards that message with
``bot.send_message``.
"""
from aioflask import Flask
from aioflask import request
from aioflask import jsonify

from ci import app as bot

app = Flask(__name__)

# Chat that receives every notification produced by the webhook.
CHAT_ID = -1001441461877

# Headline templates for issue actions that mention the assignees.
# {0} is the project name, {1} the space-separated assignee names.
_ISSUE_ASSIGNEE_TEMPLATES = {
    'open': '*{0}* new issue for *{1}*:\n',
    'reopen': '*{0}* issue re-opened for *{1}*:\n',
    'update': '*{0}* issue assigned to *{1}*:\n',
}

# Message text for each known ``noteable_type`` of a comment event.
_NOTE_MESSAGES = {
    'Commit': 'note to commit',
    'MergeRequest': 'note to MergeRequest',
    'Issue': 'note to Issue',
    'Snippet': 'note on code snippet',
}


def generatePushMsg(data):
    """Build the message for a push (or tag push) event.

    Args:
        data: Decoded webhook payload. Reads ``total_commits_count``,
            ``project.name``, ``ref`` and, for every entry of ``commits``,
            its ``id`` and ``message``.

    Returns:
        A header line naming the commit count, project and ref, followed by
        one ``<short id>: <message>`` line per commit.
    """
    # Only branch refs are shortened; any other ref is shown unchanged.
    ref = data["ref"].replace("refs/heads/", "")
    header = (
        f'🔨 {data["total_commits_count"]} new commits to '
        f'{data["project"]["name"]}:{ref}:\n\n'
    )
    commit_lines = [
        f"{commit['id'][:7]}: {commit['message'].rstrip()}\n"
        for commit in data.get('commits') or []
    ]
    return header + ''.join(commit_lines)


def _assignee_names(data):
    """Return the names of all assignees in *data*, separated by spaces."""
    # Joining (instead of appending "name ") avoids a trailing space that
    # would otherwise end up inside the closing ``*`` of the bold markup.
    return ' '.join(
        assignee['name'] for assignee in data.get('assignees') or []
    )


def generateIssueMsg(data):
    """Build the message for an issue event.

    Args:
        data: Decoded webhook payload. Reads ``object_attributes`` (``action``,
            ``title``, ``url``), ``project.name`` and, depending on the
            action, ``assignees`` or ``user.name``.

    Returns:
        A headline that depends on the action (empty for unknown or missing
        actions) followed by a Markdown link to the issue.
    """
    attributes = data['object_attributes']
    action = attributes.get('action')
    project = data['project']['name']

    if action in _ISSUE_ASSIGNEE_TEMPLATES:
        headline = _ISSUE_ASSIGNEE_TEMPLATES[action].format(
            project, _assignee_names(data))
    elif action == 'close':
        headline = '*{0}* issue closed by *{1}*:\n'.format(
            project, data['user']['name'])
    else:
        headline = ""

    return headline + '[{0}]({1})'.format(attributes['title'],
                                          attributes['url'])


def generateCommentMsg(data):
    """Build the message for a comment (note) event.

    Returns:
        A short text chosen by ``object_attributes.noteable_type``, or an
        empty string when the type is not one of the known ones.
    """
    ntype = data['object_attributes']['noteable_type']
    return _NOTE_MESSAGES.get(ntype, "")


def generateMergeRequestMsg(data):
    """Return the fixed placeholder message for merge request events."""
    return 'new MergeRequest'


def generateWikiMsg(data):
    """Return the fixed placeholder message for wiki page events."""
    return 'new wiki stuff'


def generatePipelineMsg(data):
    """Return the fixed placeholder message for pipeline events."""
    return 'new pipeline stuff'


def generateBuildMsg(data):
    """Return the fixed placeholder message for build events."""
    return 'new build stuff'


# Maps the payload's ``object_kind`` to the function that formats it.
_FORMATTERS = {
    'push': generatePushMsg,
    'tag_push': generatePushMsg,  # TODO:Make own function for this
    'issue': generateIssueMsg,
    'note': generateCommentMsg,
    'merge_request': generateMergeRequestMsg,
    'wiki_page': generateWikiMsg,
    'pipeline': generatePipelineMsg,
    'build': generateBuildMsg,
}


@app.route("/", methods=['GET', 'POST'])
async def webhook():
    """Receive a webhook call and forward a summary of it to the chat.

    Returns:
        ``{'status': 'ok'}`` once the event was handled, or a 400 response
        when the request carries no JSON object with a string
        ``object_kind``.
    """
    # silent=True yields None instead of raising when the body is missing or
    # is not valid JSON (e.g. a plain GET on this route).
    data = request.get_json(silent=True)

    # json contains an attribute that differentiates between the types, see
    # https://docs.gitlab.com/ce/user/project/integrations/webhooks.html
    # for more infos
    kind = data.get('object_kind') if isinstance(data, dict) else None
    if not isinstance(kind, str):
        return jsonify({'status': 'error',
                        'reason': 'invalid payload'}), 400

    formatter = _FORMATTERS.get(kind)
    msg = formatter(data) if formatter is not None else "Unknown event."

    # Some formatters return an empty string for sub-types they do not know;
    # there is nothing to announce then (and the messaging backend presumably
    # rejects empty messages - TODO confirm).
    if msg:
        await bot.send_message(CHAT_ID, msg)
    return jsonify({'status': 'ok'})