import re from functools import partial from inspect import signature from typing import Any, Dict, List, Optional from django.http import HttpRequest, HttpResponse from zerver.decorator import webhook_view from zerver.lib.exceptions import UnsupportedWebhookEventType from zerver.lib.request import REQ, has_request_variables from zerver.lib.response import json_success from zerver.lib.validator import check_bool from zerver.lib.webhooks.common import ( check_send_webhook_message, validate_extract_webhook_http_header, ) from zerver.lib.webhooks.git import ( EMPTY_SHA, TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE, get_commits_comment_action_message, get_issue_event_message, get_pull_request_event_message, get_push_commits_event_message, get_push_tag_event_message, get_remove_branch_event_message, ) from zerver.models import UserProfile def fixture_to_headers(fixture_name: str) -> Dict[str, Any]: if fixture_name.startswith("build"): return {} # Since there are 2 possible event types. # Map "push_hook__push_commits_more_than_limit.json" into GitLab's # HTTP event title "Push Hook". return {"HTTP_X_GITLAB_EVENT": fixture_name.split("__")[0].replace("_", " ").title()} def get_push_event_body(payload: Dict[str, Any]) -> str: if payload.get("after") == EMPTY_SHA: return get_remove_branch_event_body(payload) return get_normal_push_event_body(payload) def get_normal_push_event_body(payload: Dict[str, Any]) -> str: compare_url = "{}/compare/{}...{}".format( get_project_homepage(payload), payload["before"], payload["after"], ) commits = [ { "name": commit.get("author").get("name"), "sha": commit.get("id"), "message": commit.get("message"), "url": commit.get("url"), } for commit in payload["commits"] ] return get_push_commits_event_message( get_user_name(payload), compare_url, get_branch_name(payload), commits, ) def get_remove_branch_event_body(payload: Dict[str, Any]) -> str: return get_remove_branch_event_message( get_user_name(payload), get_branch_name(payload), ) def get_tag_push_event_body(payload: Dict[str, Any]) -> str: return get_push_tag_event_message( get_user_name(payload), get_tag_name(payload), action="pushed" if payload.get("checkout_sha") else "removed", ) def get_issue_created_event_body(payload: Dict[str, Any], include_title: bool = False) -> str: description = payload["object_attributes"].get("description") # Filter out multiline hidden comments if description is not None: description = re.sub("", "", description, 0, re.DOTALL) description = description.rstrip() return get_issue_event_message( get_issue_user_name(payload), "created", get_object_url(payload), payload["object_attributes"].get("iid"), description, assignees=replace_assignees_username_with_name(get_assignees(payload)), title=payload["object_attributes"].get("title") if include_title else None, ) def get_issue_event_body(payload: Dict[str, Any], action: str, include_title: bool = False) -> str: return get_issue_event_message( get_issue_user_name(payload), action, get_object_url(payload), payload["object_attributes"].get("iid"), title=payload["object_attributes"].get("title") if include_title else None, ) def get_merge_request_updated_event_body( payload: Dict[str, Any], include_title: bool = False ) -> str: if payload["object_attributes"].get("oldrev"): return get_merge_request_event_body( payload, "added commit(s) to", include_title=include_title, ) return get_merge_request_open_or_updated_body( payload, "updated", include_title=include_title, ) def get_merge_request_event_body( payload: Dict[str, Any], action: str, include_title: bool = False ) -> str: pull_request = payload["object_attributes"] return get_pull_request_event_message( get_issue_user_name(payload), action, pull_request.get("url"), pull_request.get("iid"), type="MR", title=payload["object_attributes"].get("title") if include_title else None, ) def get_merge_request_open_or_updated_body( payload: Dict[str, Any], action: str, include_title: bool = False ) -> str: pull_request = payload["object_attributes"] return get_pull_request_event_message( get_issue_user_name(payload), action, pull_request.get("url"), pull_request.get("iid"), pull_request.get("source_branch"), pull_request.get("target_branch"), pull_request.get("description"), assignees=replace_assignees_username_with_name(get_assignees(payload)), type="MR", title=payload["object_attributes"].get("title") if include_title else None, ) def get_assignees(payload: Dict[str, Any]) -> List[Dict[str, str]]: assignee_details = payload.get("assignees") if assignee_details is None: single_assignee_details = payload.get("assignee") if single_assignee_details is None: assignee_details = [] else: assignee_details = [single_assignee_details] return assignee_details def replace_assignees_username_with_name(assignees: List[Dict[str, str]]) -> List[Dict[str, str]]: """Replace the username of each assignee with their (full) name. This is a hack-like adaptor so that when assignees are passed to `get_pull_request_event_message` we can use the assignee's name and not their username (for more consistency). """ for assignee in assignees: assignee["username"] = assignee["name"] return assignees def get_commented_commit_event_body(payload: Dict[str, Any]) -> str: comment = payload["object_attributes"] action = "[commented]({})".format(comment["url"]) return get_commits_comment_action_message( get_issue_user_name(payload), action, payload["commit"].get("url"), payload["commit"].get("id"), comment["note"], ) def get_commented_merge_request_event_body( payload: Dict[str, Any], include_title: bool = False ) -> str: comment = payload["object_attributes"] action = "[commented]({}) on".format(comment["url"]) url = "{}/merge_requests/{}".format( payload["project"].get("web_url"), payload["merge_request"].get("iid"), ) return get_pull_request_event_message( get_issue_user_name(payload), action, url, payload["merge_request"].get("iid"), message=comment["note"], type="MR", title=payload["merge_request"].get("title") if include_title else None, ) def get_commented_issue_event_body(payload: Dict[str, Any], include_title: bool = False) -> str: comment = payload["object_attributes"] action = "[commented]({}) on".format(comment["url"]) url = "{}/issues/{}".format( payload["project"].get("web_url"), payload["issue"].get("iid"), ) return get_pull_request_event_message( get_issue_user_name(payload), action, url, payload["issue"].get("iid"), message=comment["note"], type="issue", title=payload["issue"].get("title") if include_title else None, ) def get_commented_snippet_event_body(payload: Dict[str, Any], include_title: bool = False) -> str: comment = payload["object_attributes"] action = "[commented]({}) on".format(comment["url"]) url = "{}/snippets/{}".format( payload["project"].get("web_url"), payload["snippet"].get("id"), ) return get_pull_request_event_message( get_issue_user_name(payload), action, url, payload["snippet"].get("id"), message=comment["note"], type="snippet", title=payload["snippet"].get("title") if include_title else None, ) def get_wiki_page_event_body(payload: Dict[str, Any], action: str) -> str: return '{} {} [wiki page "{}"]({}).'.format( get_issue_user_name(payload), action, payload["object_attributes"].get("title"), payload["object_attributes"].get("url"), ) def get_build_hook_event_body(payload: Dict[str, Any]) -> str: build_status = payload.get("build_status") if build_status == "created": action = "was created" elif build_status == "running": action = "started" else: action = f"changed status to {build_status}" return "Build {} from {} stage {}.".format( payload.get("build_name"), payload.get("build_stage"), action, ) def get_test_event_body(payload: Dict[str, Any]) -> str: return f"Webhook for **{get_repo_name(payload)}** has been configured successfully! :tada:" def get_pipeline_event_body(payload: Dict[str, Any]) -> str: pipeline_status = payload["object_attributes"].get("status") if pipeline_status == "pending": action = "was created" elif pipeline_status == "running": action = "started" else: action = f"changed status to {pipeline_status}" project_homepage = get_project_homepage(payload) pipeline_url = "{}/pipelines/{}".format( project_homepage, payload["object_attributes"].get("id"), ) builds_status = "" for build in payload["builds"]: build_url = "{}/-/jobs/{}".format( project_homepage, build.get("id"), ) artifact_filename = build.get("artifacts_file", {}).get("filename", None) if artifact_filename: artifact_download_url = f"{build_url}/artifacts/download" artifact_browse_url = f"{build_url}/artifacts/browse" artifact_string = f" * built artifact: *{artifact_filename}* [[Browse]({artifact_browse_url})|[Download]({artifact_download_url})]\n" else: artifact_string = "" builds_status += "* [{}]({}) - {}\n{}".format( build.get("name"), build_url, build.get("status"), artifact_string, ) return "[Pipeline ({})]({}) {} with build(s):\n{}.".format( payload["object_attributes"].get("id"), pipeline_url, action, builds_status[:-1], ) def get_repo_name(payload: Dict[str, Any]) -> str: if "project" in payload: return payload["project"]["name"] # Apparently, Job Hook payloads don't have a `project` section, # but the repository name is accessible from the `repository` # section. return payload["repository"]["name"] def get_user_name(payload: Dict[str, Any]) -> str: return payload["user_name"] def get_issue_user_name(payload: Dict[str, Any]) -> str: return payload["user"]["name"] def get_project_homepage(payload: Dict[str, Any]) -> str: if "project" in payload: return payload["project"]["web_url"] return payload["repository"]["homepage"] def get_branch_name(payload: Dict[str, Any]) -> str: return payload["ref"].replace("refs/heads/", "") def get_tag_name(payload: Dict[str, Any]) -> str: return payload["ref"].replace("refs/tags/", "") def get_object_url(payload: Dict[str, Any]) -> str: return payload["object_attributes"]["url"] EVENT_FUNCTION_MAPPER = { "Push Hook": get_push_event_body, "Tag Push Hook": get_tag_push_event_body, "Test Hook": get_test_event_body, "Issue Hook open": get_issue_created_event_body, "Issue Hook close": partial(get_issue_event_body, action="closed"), "Issue Hook reopen": partial(get_issue_event_body, action="reopened"), "Issue Hook update": partial(get_issue_event_body, action="updated"), "Confidential Issue Hook open": get_issue_created_event_body, "Confidential Issue Hook close": partial(get_issue_event_body, action="closed"), "Confidential Issue Hook reopen": partial(get_issue_event_body, action="reopened"), "Confidential Issue Hook update": partial(get_issue_event_body, action="updated"), "Note Hook Commit": get_commented_commit_event_body, "Note Hook MergeRequest": get_commented_merge_request_event_body, "Note Hook Issue": get_commented_issue_event_body, "Confidential Note Hook Issue": get_commented_issue_event_body, "Note Hook Snippet": get_commented_snippet_event_body, "Merge Request Hook approved": partial(get_merge_request_event_body, action="approved"), "Merge Request Hook unapproved": partial(get_merge_request_event_body, action="unapproved"), "Merge Request Hook open": partial(get_merge_request_open_or_updated_body, action="created"), "Merge Request Hook update": get_merge_request_updated_event_body, "Merge Request Hook merge": partial(get_merge_request_event_body, action="merged"), "Merge Request Hook close": partial(get_merge_request_event_body, action="closed"), "Merge Request Hook reopen": partial(get_merge_request_event_body, action="reopened"), "Wiki Page Hook create": partial(get_wiki_page_event_body, action="created"), "Wiki Page Hook update": partial(get_wiki_page_event_body, action="updated"), "Job Hook": get_build_hook_event_body, "Build Hook": get_build_hook_event_body, "Pipeline Hook": get_pipeline_event_body, } ALL_EVENT_TYPES = list(EVENT_FUNCTION_MAPPER.keys()) @webhook_view("GitLab", all_event_types=ALL_EVENT_TYPES) @has_request_variables def api_gitlab_webhook( request: HttpRequest, user_profile: UserProfile, payload: Dict[str, Any] = REQ(argument_type="body"), branches: Optional[str] = REQ(default=None), use_merge_request_title: bool = REQ(default=True, json_validator=check_bool), user_specified_topic: Optional[str] = REQ("topic", default=None), ) -> HttpResponse: event = get_event(request, payload, branches) if event is not None: event_body_function = get_body_based_on_event(event) if "include_title" in signature(event_body_function).parameters: body = event_body_function( payload, include_title=user_specified_topic is not None, ) else: body = event_body_function(payload) # Add a link to the project if a custom topic is set if user_specified_topic: project_url = f"[{get_repo_name(payload)}]({get_project_homepage(payload)})" body = f"[{project_url}] {body}" topic = get_subject_based_on_event(event, payload, use_merge_request_title) check_send_webhook_message(request, user_profile, topic, body, event) return json_success() def get_body_based_on_event(event: str) -> Any: return EVENT_FUNCTION_MAPPER[event] def get_subject_based_on_event( event: str, payload: Dict[str, Any], use_merge_request_title: bool ) -> str: if event == "Push Hook": return f"{get_repo_name(payload)} / {get_branch_name(payload)}" elif event == "Job Hook" or event == "Build Hook": return "{} / {}".format(payload["repository"].get("name"), get_branch_name(payload)) elif event == "Pipeline Hook": return "{} / {}".format( get_repo_name(payload), payload["object_attributes"].get("ref").replace("refs/heads/", ""), ) elif event.startswith("Merge Request Hook"): return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format( repo=get_repo_name(payload), type="MR", id=payload["object_attributes"].get("iid"), title=payload["object_attributes"].get("title") if use_merge_request_title else "", ) elif event.startswith("Issue Hook") or event.startswith("Confidential Issue Hook"): return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format( repo=get_repo_name(payload), type="issue", id=payload["object_attributes"].get("iid"), title=payload["object_attributes"].get("title"), ) elif event == "Note Hook Issue" or event == "Confidential Note Hook Issue": return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format( repo=get_repo_name(payload), type="issue", id=payload["issue"].get("iid"), title=payload["issue"].get("title"), ) elif event == "Note Hook MergeRequest": return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format( repo=get_repo_name(payload), type="MR", id=payload["merge_request"].get("iid"), title=payload["merge_request"].get("title") if use_merge_request_title else "", ) elif event == "Note Hook Snippet": return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format( repo=get_repo_name(payload), type="snippet", id=payload["snippet"].get("id"), title=payload["snippet"].get("title"), ) return get_repo_name(payload) def get_event( request: HttpRequest, payload: Dict[str, Any], branches: Optional[str] ) -> Optional[str]: event = validate_extract_webhook_http_header(request, "X_GITLAB_EVENT", "GitLab") if event == "System Hook": # Convert the event name to a GitLab event title event_name = payload.get("event_name", payload.get("object_kind")) event = event_name.split("__")[0].replace("_", " ").title() event = f"{event} Hook" if event in ["Confidential Issue Hook", "Issue Hook", "Merge Request Hook", "Wiki Page Hook"]: action = payload["object_attributes"].get("action", "open") event = f"{event} {action}" elif event in ["Confidential Note Hook", "Note Hook"]: action = payload["object_attributes"].get("noteable_type") event = f"{event} {action}" elif event == "Push Hook": if branches is not None: branch = get_branch_name(payload) if branches.find(branch) == -1: return None if event in list(EVENT_FUNCTION_MAPPER.keys()): return event raise UnsupportedWebhookEventType(event)