zulip/zerver/webhooks/gitlab/view.py

533 lines
20 KiB
Python

import re
from typing import Dict, List, Optional, Protocol, Union
from django.http import HttpRequest, HttpResponse
from pydantic import Json
from returns.curry import partial
from zerver.decorator import webhook_view
from zerver.lib.exceptions import UnsupportedWebhookEventTypeError
from zerver.lib.response import json_success
from zerver.lib.typed_endpoint import WebhookPayload, typed_endpoint
from zerver.lib.validator import WildValue, check_int, check_string
from zerver.lib.webhooks.common import (
OptionalUserSpecifiedTopicStr,
check_send_webhook_message,
validate_extract_webhook_http_header,
)
from zerver.lib.webhooks.git import (
EMPTY_SHA,
TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE,
get_commits_comment_action_message,
get_issue_event_message,
get_pull_request_event_message,
get_push_commits_event_message,
get_push_tag_event_message,
get_remove_branch_event_message,
)
from zerver.models import UserProfile
def fixture_to_headers(fixture_name: str) -> Dict[str, str]:
if fixture_name.startswith("build"):
return {} # Since there are 2 possible event types.
# Map "push_hook__push_commits_more_than_limit.json" into GitLab's
# HTTP event title "Push Hook".
return {"HTTP_X_GITLAB_EVENT": fixture_name.split("__")[0].replace("_", " ").title()}
def get_push_event_body(payload: WildValue, include_title: bool) -> str:
after = payload.get("after")
if after:
stringified_after = after.tame(check_string)
if stringified_after == EMPTY_SHA:
return get_remove_branch_event_body(payload)
return get_normal_push_event_body(payload)
def get_normal_push_event_body(payload: WildValue) -> str:
compare_url = "{}/-/compare/{}...{}".format(
get_project_homepage(payload),
payload["before"].tame(check_string),
payload["after"].tame(check_string),
)
commits = [
{
"name": commit["author"]["name"].tame(check_string),
"sha": commit["id"].tame(check_string),
"message": commit["message"].tame(check_string),
"url": commit["url"].tame(check_string),
}
for commit in payload["commits"]
]
return get_push_commits_event_message(
get_user_name(payload),
compare_url,
get_branch_name(payload),
commits,
)
def get_remove_branch_event_body(payload: WildValue) -> str:
return get_remove_branch_event_message(
get_user_name(payload),
get_branch_name(payload),
)
def get_tag_push_event_body(payload: WildValue, include_title: bool) -> str:
return get_push_tag_event_message(
get_user_name(payload),
get_tag_name(payload),
action="pushed" if payload.get("checkout_sha") else "removed",
)
def get_issue_created_event_body(payload: WildValue, include_title: bool) -> str:
description = payload["object_attributes"].get("description")
# Filter out multiline hidden comments
if description:
stringified_description = description.tame(check_string)
stringified_description = re.sub(
"<!--.*?-->", "", stringified_description, count=0, flags=re.DOTALL
)
stringified_description = stringified_description.rstrip()
else:
stringified_description = None
return get_issue_event_message(
user_name=get_issue_user_name(payload),
action="created",
url=get_object_url(payload),
number=payload["object_attributes"]["iid"].tame(check_int),
message=stringified_description,
assignees=replace_assignees_username_with_name(get_assignees(payload)),
title=payload["object_attributes"]["title"].tame(check_string) if include_title else None,
)
def get_issue_event_body(payload: WildValue, action: str, include_title: bool) -> str:
return get_issue_event_message(
user_name=get_issue_user_name(payload),
action=action,
url=get_object_url(payload),
number=payload["object_attributes"]["iid"].tame(check_int),
title=payload["object_attributes"]["title"].tame(check_string) if include_title else None,
)
def get_merge_request_updated_event_body(payload: WildValue, include_title: bool) -> str:
if payload["object_attributes"].get("oldrev"):
return get_merge_request_event_body(
payload,
"added commit(s) to",
include_title=include_title,
)
return get_merge_request_open_or_updated_body(
payload,
"updated",
include_title=include_title,
)
def get_merge_request_event_body(payload: WildValue, action: str, include_title: bool) -> str:
pull_request = payload["object_attributes"]
target_branch = None
base_branch = None
if action == "merged":
target_branch = pull_request["source_branch"].tame(check_string)
base_branch = pull_request["target_branch"].tame(check_string)
return get_pull_request_event_message(
user_name=get_issue_user_name(payload),
action=action,
url=pull_request["url"].tame(check_string),
number=pull_request["iid"].tame(check_int),
target_branch=target_branch,
base_branch=base_branch,
type="MR",
title=payload["object_attributes"]["title"].tame(check_string) if include_title else None,
)
def get_merge_request_open_or_updated_body(
payload: WildValue, action: str, include_title: bool
) -> str:
pull_request = payload["object_attributes"]
return get_pull_request_event_message(
user_name=get_issue_user_name(payload),
action=action,
url=pull_request["url"].tame(check_string),
number=pull_request["iid"].tame(check_int),
target_branch=pull_request["source_branch"].tame(check_string)
if action == "created"
else None,
base_branch=pull_request["target_branch"].tame(check_string)
if action == "created"
else None,
message=pull_request["description"].tame(check_string),
assignees=replace_assignees_username_with_name(get_assignees(payload)),
type="MR",
title=payload["object_attributes"]["title"].tame(check_string) if include_title else None,
)
def get_assignees(payload: WildValue) -> Union[List[WildValue], WildValue]:
assignee_details = payload.get("assignees")
if not assignee_details:
single_assignee_details = payload.get("assignee")
if not single_assignee_details:
transformed_assignee_details = []
else:
transformed_assignee_details = [single_assignee_details]
return transformed_assignee_details
return assignee_details
def replace_assignees_username_with_name(
assignees: Union[List[WildValue], WildValue]
) -> List[Dict[str, str]]:
"""Replace the username of each assignee with their (full) name.
This is a hack-like adaptor so that when assignees are passed to
`get_pull_request_event_message` we can use the assignee's name
and not their username (for more consistency).
"""
formatted_assignees = []
for assignee in assignees:
formatted_assignee = {}
formatted_assignee["username"] = assignee["name"].tame(check_string)
formatted_assignees.append(formatted_assignee)
return formatted_assignees
def get_commented_commit_event_body(payload: WildValue, include_title: bool) -> str:
comment = payload["object_attributes"]
action = "[commented]({})".format(comment["url"].tame(check_string))
return get_commits_comment_action_message(
get_issue_user_name(payload),
action,
payload["commit"]["url"].tame(check_string),
payload["commit"]["id"].tame(check_string),
comment["note"].tame(check_string),
)
def get_commented_merge_request_event_body(payload: WildValue, include_title: bool) -> str:
comment = payload["object_attributes"]
action = "[commented]({}) on".format(comment["url"].tame(check_string))
url = payload["merge_request"]["url"].tame(check_string)
return get_pull_request_event_message(
user_name=get_issue_user_name(payload),
action=action,
url=url,
number=payload["merge_request"]["iid"].tame(check_int),
message=comment["note"].tame(check_string),
type="MR",
title=payload["merge_request"]["title"].tame(check_string) if include_title else None,
)
def get_commented_issue_event_body(payload: WildValue, include_title: bool) -> str:
comment = payload["object_attributes"]
action = "[commented]({}) on".format(comment["url"].tame(check_string))
url = payload["issue"]["url"].tame(check_string)
return get_pull_request_event_message(
user_name=get_issue_user_name(payload),
action=action,
url=url,
number=payload["issue"]["iid"].tame(check_int),
message=comment["note"].tame(check_string),
type="issue",
title=payload["issue"]["title"].tame(check_string) if include_title else None,
)
def get_commented_snippet_event_body(payload: WildValue, include_title: bool) -> str:
comment = payload["object_attributes"]
action = "[commented]({}) on".format(comment["url"].tame(check_string))
# Snippet URL is only available in GitLab 16.1+
if "url" in payload["snippet"]:
url = payload["snippet"]["url"].tame(check_string)
else:
url = "{}/-/snippets/{}".format(
payload["project"]["web_url"].tame(check_string),
payload["snippet"]["id"].tame(check_int),
)
return get_pull_request_event_message(
user_name=get_issue_user_name(payload),
action=action,
url=url,
number=payload["snippet"]["id"].tame(check_int),
message=comment["note"].tame(check_string),
type="snippet",
title=payload["snippet"]["title"].tame(check_string) if include_title else None,
)
def get_wiki_page_event_body(payload: WildValue, action: str, include_title: bool) -> str:
return '{} {} [wiki page "{}"]({}).'.format(
get_issue_user_name(payload),
action,
payload["object_attributes"]["title"].tame(check_string),
payload["object_attributes"]["url"].tame(check_string),
)
def get_build_hook_event_body(payload: WildValue, include_title: bool) -> str:
build_status = payload["build_status"].tame(check_string)
if build_status == "created":
action = "was created"
elif build_status == "running":
action = "started"
else:
action = f"changed status to {build_status}"
return "Build {} from {} stage {}.".format(
payload["build_name"].tame(check_string),
payload["build_stage"].tame(check_string),
action,
)
def get_test_event_body(payload: WildValue, include_title: bool) -> str:
return f"Webhook for **{get_repo_name(payload)}** has been configured successfully! :tada:"
def get_pipeline_event_body(payload: WildValue, include_title: bool) -> str:
pipeline_status = payload["object_attributes"]["status"].tame(check_string)
if pipeline_status == "pending":
action = "was created"
elif pipeline_status == "running":
action = "started"
else:
action = f"changed status to {pipeline_status}"
project_homepage = get_project_homepage(payload)
pipeline_url = "{}/-/pipelines/{}".format(
project_homepage,
payload["object_attributes"]["id"].tame(check_int),
)
builds_status = ""
for build in payload["builds"]:
build_url = "{}/-/jobs/{}".format(
project_homepage,
build["id"].tame(check_int),
)
artifact_filename = build.get("artifacts_file", {}).get("filename", None)
if artifact_filename:
artifact_download_url = f"{build_url}/artifacts/download"
artifact_browse_url = f"{build_url}/artifacts/browse"
artifact_string = f" * built artifact: *{artifact_filename.tame(check_string)}* [[Browse]({artifact_browse_url})|[Download]({artifact_download_url})]\n"
else:
artifact_string = ""
builds_status += "* [{}]({}) - {}\n{}".format(
build["name"].tame(check_string),
build_url,
build["status"].tame(check_string),
artifact_string,
)
return "[Pipeline ({})]({}) {} with build(s):\n{}.".format(
payload["object_attributes"]["id"].tame(check_int),
pipeline_url,
action,
builds_status[:-1],
)
def get_repo_name(payload: WildValue) -> str:
if "project" in payload:
return payload["project"]["name"].tame(check_string)
# Apparently, Job Hook payloads don't have a `project` section,
# but the repository name is accessible from the `repository`
# section.
return payload["repository"]["name"].tame(check_string)
def get_user_name(payload: WildValue) -> str:
return payload["user_name"].tame(check_string)
def get_issue_user_name(payload: WildValue) -> str:
return payload["user"]["name"].tame(check_string)
def get_project_homepage(payload: WildValue) -> str:
if "project" in payload:
return payload["project"]["web_url"].tame(check_string)
return payload["repository"]["homepage"].tame(check_string)
def get_branch_name(payload: WildValue) -> str:
return payload["ref"].tame(check_string).replace("refs/heads/", "")
def get_tag_name(payload: WildValue) -> str:
return payload["ref"].tame(check_string).replace("refs/tags/", "")
def get_object_url(payload: WildValue) -> str:
return payload["object_attributes"]["url"].tame(check_string)
class EventFunction(Protocol):
def __call__(self, payload: WildValue, include_title: bool) -> str:
...
EVENT_FUNCTION_MAPPER: Dict[str, EventFunction] = {
"Push Hook": get_push_event_body,
"Tag Push Hook": get_tag_push_event_body,
"Test Hook": get_test_event_body,
"Issue Hook open": get_issue_created_event_body,
"Issue Hook close": partial(get_issue_event_body, action="closed"),
"Issue Hook reopen": partial(get_issue_event_body, action="reopened"),
"Issue Hook update": partial(get_issue_event_body, action="updated"),
"Confidential Issue Hook open": get_issue_created_event_body,
"Confidential Issue Hook close": partial(get_issue_event_body, action="closed"),
"Confidential Issue Hook reopen": partial(get_issue_event_body, action="reopened"),
"Confidential Issue Hook update": partial(get_issue_event_body, action="updated"),
"Note Hook Commit": get_commented_commit_event_body,
"Note Hook MergeRequest": get_commented_merge_request_event_body,
"Note Hook Issue": get_commented_issue_event_body,
"Confidential Note Hook Issue": get_commented_issue_event_body,
"Note Hook Snippet": get_commented_snippet_event_body,
"Merge Request Hook approved": partial(get_merge_request_event_body, action="approved"),
"Merge Request Hook unapproved": partial(get_merge_request_event_body, action="unapproved"),
"Merge Request Hook open": partial(get_merge_request_open_or_updated_body, action="created"),
"Merge Request Hook update": get_merge_request_updated_event_body,
"Merge Request Hook merge": partial(get_merge_request_event_body, action="merged"),
"Merge Request Hook close": partial(get_merge_request_event_body, action="closed"),
"Merge Request Hook reopen": partial(get_merge_request_event_body, action="reopened"),
"Wiki Page Hook create": partial(get_wiki_page_event_body, action="created"),
"Wiki Page Hook update": partial(get_wiki_page_event_body, action="updated"),
"Job Hook": get_build_hook_event_body,
"Build Hook": get_build_hook_event_body,
"Pipeline Hook": get_pipeline_event_body,
}
ALL_EVENT_TYPES = list(EVENT_FUNCTION_MAPPER.keys())
@webhook_view("GitLab", all_event_types=ALL_EVENT_TYPES)
@typed_endpoint
def api_gitlab_webhook(
request: HttpRequest,
user_profile: UserProfile,
*,
payload: WebhookPayload[WildValue],
branches: Optional[str] = None,
use_merge_request_title: Json[bool] = True,
user_specified_topic: OptionalUserSpecifiedTopicStr = None,
) -> HttpResponse:
event = get_event(request, payload, branches)
if event is not None:
event_body_function = get_body_based_on_event(event)
body = event_body_function(
payload,
include_title=user_specified_topic is not None,
)
# Add a link to the project if a custom topic is set
if user_specified_topic:
project_url = f"[{get_repo_name(payload)}]({get_project_homepage(payload)})"
body = f"[{project_url}] {body}"
topic = get_topic_based_on_event(event, payload, use_merge_request_title)
check_send_webhook_message(request, user_profile, topic, body, event)
return json_success(request)
def get_body_based_on_event(event: str) -> EventFunction:
return EVENT_FUNCTION_MAPPER[event]
def get_topic_based_on_event(event: str, payload: WildValue, use_merge_request_title: bool) -> str:
if event == "Push Hook":
return f"{get_repo_name(payload)} / {get_branch_name(payload)}"
elif event in ("Job Hook", "Build Hook"):
return "{} / {}".format(
payload["repository"]["name"].tame(check_string), get_branch_name(payload)
)
elif event == "Pipeline Hook":
return "{} / {}".format(
get_repo_name(payload),
payload["object_attributes"]["ref"].tame(check_string).replace("refs/heads/", ""),
)
elif event.startswith("Merge Request Hook"):
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
repo=get_repo_name(payload),
type="MR",
id=payload["object_attributes"]["iid"].tame(check_int),
title=payload["object_attributes"]["title"].tame(check_string)
if use_merge_request_title
else "",
)
elif event.startswith(("Issue Hook", "Confidential Issue Hook")):
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
repo=get_repo_name(payload),
type="issue",
id=payload["object_attributes"]["iid"].tame(check_int),
title=payload["object_attributes"]["title"].tame(check_string),
)
elif event in ("Note Hook Issue", "Confidential Note Hook Issue"):
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
repo=get_repo_name(payload),
type="issue",
id=payload["issue"]["iid"].tame(check_int),
title=payload["issue"]["title"].tame(check_string),
)
elif event == "Note Hook MergeRequest":
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
repo=get_repo_name(payload),
type="MR",
id=payload["merge_request"]["iid"].tame(check_int),
title=payload["merge_request"]["title"].tame(check_string)
if use_merge_request_title
else "",
)
elif event == "Note Hook Snippet":
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
repo=get_repo_name(payload),
type="snippet",
id=payload["snippet"]["id"].tame(check_int),
title=payload["snippet"]["title"].tame(check_string),
)
return get_repo_name(payload)
def get_event(request: HttpRequest, payload: WildValue, branches: Optional[str]) -> Optional[str]:
event = validate_extract_webhook_http_header(request, "X-GitLab-Event", "GitLab")
if event == "System Hook":
# Convert the event name to a GitLab event title
if "event_name" in payload:
event_name = payload["event_name"].tame(check_string)
else:
event_name = payload["object_kind"].tame(check_string)
event = event_name.split("__")[0].replace("_", " ").title()
event = f"{event} Hook"
if event in ["Confidential Issue Hook", "Issue Hook", "Merge Request Hook", "Wiki Page Hook"]:
action = payload["object_attributes"].get("action", "open").tame(check_string)
event = f"{event} {action}"
elif event in ["Confidential Note Hook", "Note Hook"]:
action = payload["object_attributes"]["noteable_type"].tame(check_string)
event = f"{event} {action}"
elif event == "Push Hook" and branches is not None:
branch = get_branch_name(payload)
if branches.find(branch) == -1:
return None
if event in EVENT_FUNCTION_MAPPER:
return event
raise UnsupportedWebhookEventTypeError(event)