mirror of https://github.com/zulip/zulip.git
536 lines
20 KiB
Python
536 lines
20 KiB
Python
import re
|
|
from typing import Dict, List, Optional, Protocol, Union
|
|
|
|
from django.http import HttpRequest, HttpResponse
|
|
from pydantic import Json
|
|
from returns.curry import partial
|
|
|
|
from zerver.decorator import webhook_view
|
|
from zerver.lib.exceptions import UnsupportedWebhookEventTypeError
|
|
from zerver.lib.response import json_success
|
|
from zerver.lib.typed_endpoint import JsonBodyPayload, typed_endpoint
|
|
from zerver.lib.validator import WildValue, check_int, check_none_or, check_string
|
|
from zerver.lib.webhooks.common import (
|
|
OptionalUserSpecifiedTopicStr,
|
|
check_send_webhook_message,
|
|
validate_extract_webhook_http_header,
|
|
)
|
|
from zerver.lib.webhooks.git import (
|
|
EMPTY_SHA,
|
|
TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE,
|
|
get_commits_comment_action_message,
|
|
get_issue_event_message,
|
|
get_pull_request_event_message,
|
|
get_push_commits_event_message,
|
|
get_push_tag_event_message,
|
|
get_remove_branch_event_message,
|
|
)
|
|
from zerver.models import UserProfile
|
|
|
|
|
|
def fixture_to_headers(fixture_name: str) -> Dict[str, str]:
|
|
if fixture_name.startswith("build"):
|
|
return {} # Since there are 2 possible event types.
|
|
|
|
# Map "push_hook__push_commits_more_than_limit.json" into GitLab's
|
|
# HTTP event title "Push Hook".
|
|
return {"HTTP_X_GITLAB_EVENT": fixture_name.split("__")[0].replace("_", " ").title()}
|
|
|
|
|
|
def get_push_event_body(payload: WildValue, include_title: bool) -> str:
|
|
after = payload.get("after")
|
|
if after:
|
|
stringified_after = after.tame(check_string)
|
|
if stringified_after == EMPTY_SHA:
|
|
return get_remove_branch_event_body(payload)
|
|
return get_normal_push_event_body(payload)
|
|
|
|
|
|
def get_normal_push_event_body(payload: WildValue) -> str:
|
|
compare_url = "{}/-/compare/{}...{}".format(
|
|
get_project_homepage(payload),
|
|
payload["before"].tame(check_string),
|
|
payload["after"].tame(check_string),
|
|
)
|
|
|
|
commits = [
|
|
{
|
|
"name": commit["author"]["name"].tame(check_string),
|
|
"sha": commit["id"].tame(check_string),
|
|
"message": commit["message"].tame(check_string),
|
|
"url": commit["url"].tame(check_string),
|
|
}
|
|
for commit in payload["commits"]
|
|
]
|
|
|
|
return get_push_commits_event_message(
|
|
get_user_name(payload),
|
|
compare_url,
|
|
get_branch_name(payload),
|
|
commits,
|
|
)
|
|
|
|
|
|
def get_remove_branch_event_body(payload: WildValue) -> str:
|
|
return get_remove_branch_event_message(
|
|
get_user_name(payload),
|
|
get_branch_name(payload),
|
|
)
|
|
|
|
|
|
def get_tag_push_event_body(payload: WildValue, include_title: bool) -> str:
|
|
return get_push_tag_event_message(
|
|
get_user_name(payload),
|
|
get_tag_name(payload),
|
|
action="pushed" if payload.get("checkout_sha") else "removed",
|
|
)
|
|
|
|
|
|
def get_issue_created_event_body(payload: WildValue, include_title: bool) -> str:
|
|
description = payload["object_attributes"].get("description")
|
|
# Filter out multiline hidden comments
|
|
if description:
|
|
stringified_description = description.tame(check_string)
|
|
stringified_description = re.sub(
|
|
"<!--.*?-->", "", stringified_description, count=0, flags=re.DOTALL
|
|
)
|
|
stringified_description = stringified_description.rstrip()
|
|
else:
|
|
stringified_description = None
|
|
|
|
return get_issue_event_message(
|
|
user_name=get_issue_user_name(payload),
|
|
action="created",
|
|
url=get_object_url(payload),
|
|
number=payload["object_attributes"]["iid"].tame(check_int),
|
|
message=stringified_description,
|
|
assignees=replace_assignees_username_with_name(get_assignees(payload)),
|
|
title=payload["object_attributes"]["title"].tame(check_string) if include_title else None,
|
|
)
|
|
|
|
|
|
def get_issue_event_body(payload: WildValue, action: str, include_title: bool) -> str:
|
|
return get_issue_event_message(
|
|
user_name=get_issue_user_name(payload),
|
|
action=action,
|
|
url=get_object_url(payload),
|
|
number=payload["object_attributes"]["iid"].tame(check_int),
|
|
title=payload["object_attributes"]["title"].tame(check_string) if include_title else None,
|
|
)
|
|
|
|
|
|
def get_merge_request_updated_event_body(payload: WildValue, include_title: bool) -> str:
|
|
if payload["object_attributes"].get("oldrev"):
|
|
return get_merge_request_event_body(
|
|
payload,
|
|
"added commit(s) to",
|
|
include_title=include_title,
|
|
)
|
|
|
|
return get_merge_request_open_or_updated_body(
|
|
payload,
|
|
"updated",
|
|
include_title=include_title,
|
|
)
|
|
|
|
|
|
def get_merge_request_event_body(payload: WildValue, action: str, include_title: bool) -> str:
|
|
pull_request = payload["object_attributes"]
|
|
target_branch = None
|
|
base_branch = None
|
|
if action == "merged":
|
|
target_branch = pull_request["source_branch"].tame(check_string)
|
|
base_branch = pull_request["target_branch"].tame(check_string)
|
|
|
|
return get_pull_request_event_message(
|
|
user_name=get_issue_user_name(payload),
|
|
action=action,
|
|
url=pull_request["url"].tame(check_string),
|
|
number=pull_request["iid"].tame(check_int),
|
|
target_branch=target_branch,
|
|
base_branch=base_branch,
|
|
type="MR",
|
|
title=payload["object_attributes"]["title"].tame(check_string) if include_title else None,
|
|
)
|
|
|
|
|
|
def get_merge_request_open_or_updated_body(
|
|
payload: WildValue, action: str, include_title: bool
|
|
) -> str:
|
|
pull_request = payload["object_attributes"]
|
|
return get_pull_request_event_message(
|
|
user_name=get_issue_user_name(payload),
|
|
action=action,
|
|
url=pull_request["url"].tame(check_string),
|
|
number=pull_request["iid"].tame(check_int),
|
|
target_branch=(
|
|
pull_request["source_branch"].tame(check_string) if action == "created" else None
|
|
),
|
|
base_branch=(
|
|
pull_request["target_branch"].tame(check_string) if action == "created" else None
|
|
),
|
|
message=pull_request["description"].tame(check_none_or(check_string)),
|
|
assignees=replace_assignees_username_with_name(get_assignees(payload)),
|
|
type="MR",
|
|
title=payload["object_attributes"]["title"].tame(check_string) if include_title else None,
|
|
)
|
|
|
|
|
|
def get_assignees(payload: WildValue) -> Union[List[WildValue], WildValue]:
|
|
assignee_details = payload.get("assignees")
|
|
if not assignee_details:
|
|
single_assignee_details = payload.get("assignee")
|
|
if not single_assignee_details:
|
|
transformed_assignee_details = []
|
|
else:
|
|
transformed_assignee_details = [single_assignee_details]
|
|
return transformed_assignee_details
|
|
return assignee_details
|
|
|
|
|
|
def replace_assignees_username_with_name(
|
|
assignees: Union[List[WildValue], WildValue]
|
|
) -> List[Dict[str, str]]:
|
|
"""Replace the username of each assignee with their (full) name.
|
|
|
|
This is a hack-like adaptor so that when assignees are passed to
|
|
`get_pull_request_event_message` we can use the assignee's name
|
|
and not their username (for more consistency).
|
|
"""
|
|
formatted_assignees = []
|
|
for assignee in assignees:
|
|
formatted_assignee = {}
|
|
formatted_assignee["username"] = assignee["name"].tame(check_string)
|
|
formatted_assignees.append(formatted_assignee)
|
|
return formatted_assignees
|
|
|
|
|
|
def get_commented_commit_event_body(payload: WildValue, include_title: bool) -> str:
|
|
comment = payload["object_attributes"]
|
|
action = "[commented]({})".format(comment["url"].tame(check_string))
|
|
return get_commits_comment_action_message(
|
|
get_issue_user_name(payload),
|
|
action,
|
|
payload["commit"]["url"].tame(check_string),
|
|
payload["commit"]["id"].tame(check_string),
|
|
comment["note"].tame(check_string),
|
|
)
|
|
|
|
|
|
def get_commented_merge_request_event_body(payload: WildValue, include_title: bool) -> str:
|
|
comment = payload["object_attributes"]
|
|
action = "[commented]({}) on".format(comment["url"].tame(check_string))
|
|
url = payload["merge_request"]["url"].tame(check_string)
|
|
|
|
return get_pull_request_event_message(
|
|
user_name=get_issue_user_name(payload),
|
|
action=action,
|
|
url=url,
|
|
number=payload["merge_request"]["iid"].tame(check_int),
|
|
message=comment["note"].tame(check_string),
|
|
type="MR",
|
|
title=payload["merge_request"]["title"].tame(check_string) if include_title else None,
|
|
)
|
|
|
|
|
|
def get_commented_issue_event_body(payload: WildValue, include_title: bool) -> str:
|
|
comment = payload["object_attributes"]
|
|
action = "[commented]({}) on".format(comment["url"].tame(check_string))
|
|
url = payload["issue"]["url"].tame(check_string)
|
|
|
|
return get_pull_request_event_message(
|
|
user_name=get_issue_user_name(payload),
|
|
action=action,
|
|
url=url,
|
|
number=payload["issue"]["iid"].tame(check_int),
|
|
message=comment["note"].tame(check_string),
|
|
type="issue",
|
|
title=payload["issue"]["title"].tame(check_string) if include_title else None,
|
|
)
|
|
|
|
|
|
def get_commented_snippet_event_body(payload: WildValue, include_title: bool) -> str:
|
|
comment = payload["object_attributes"]
|
|
action = "[commented]({}) on".format(comment["url"].tame(check_string))
|
|
# Snippet URL is only available in GitLab 16.1+
|
|
if "url" in payload["snippet"]:
|
|
url = payload["snippet"]["url"].tame(check_string)
|
|
else:
|
|
url = "{}/-/snippets/{}".format(
|
|
payload["project"]["web_url"].tame(check_string),
|
|
payload["snippet"]["id"].tame(check_int),
|
|
)
|
|
|
|
return get_pull_request_event_message(
|
|
user_name=get_issue_user_name(payload),
|
|
action=action,
|
|
url=url,
|
|
number=payload["snippet"]["id"].tame(check_int),
|
|
message=comment["note"].tame(check_string),
|
|
type="snippet",
|
|
title=payload["snippet"]["title"].tame(check_string) if include_title else None,
|
|
)
|
|
|
|
|
|
def get_wiki_page_event_body(payload: WildValue, action: str, include_title: bool) -> str:
|
|
return '{} {} [wiki page "{}"]({}).'.format(
|
|
get_issue_user_name(payload),
|
|
action,
|
|
payload["object_attributes"]["title"].tame(check_string),
|
|
payload["object_attributes"]["url"].tame(check_string),
|
|
)
|
|
|
|
|
|
def get_build_hook_event_body(payload: WildValue, include_title: bool) -> str:
|
|
build_status = payload["build_status"].tame(check_string)
|
|
if build_status == "created":
|
|
action = "was created"
|
|
elif build_status == "running":
|
|
action = "started"
|
|
else:
|
|
action = f"changed status to {build_status}"
|
|
return "Build {} from {} stage {}.".format(
|
|
payload["build_name"].tame(check_string),
|
|
payload["build_stage"].tame(check_string),
|
|
action,
|
|
)
|
|
|
|
|
|
def get_test_event_body(payload: WildValue, include_title: bool) -> str:
|
|
return f"Webhook for **{get_repo_name(payload)}** has been configured successfully! :tada:"
|
|
|
|
|
|
def get_pipeline_event_body(payload: WildValue, include_title: bool) -> str:
|
|
pipeline_status = payload["object_attributes"]["status"].tame(check_string)
|
|
if pipeline_status == "pending":
|
|
action = "was created"
|
|
elif pipeline_status == "running":
|
|
action = "started"
|
|
else:
|
|
action = f"changed status to {pipeline_status}"
|
|
|
|
project_homepage = get_project_homepage(payload)
|
|
pipeline_url = "{}/-/pipelines/{}".format(
|
|
project_homepage,
|
|
payload["object_attributes"]["id"].tame(check_int),
|
|
)
|
|
|
|
builds_status = ""
|
|
for build in payload["builds"]:
|
|
build_url = "{}/-/jobs/{}".format(
|
|
project_homepage,
|
|
build["id"].tame(check_int),
|
|
)
|
|
artifact_filename = build.get("artifacts_file", {}).get("filename", None)
|
|
if artifact_filename:
|
|
artifact_download_url = f"{build_url}/artifacts/download"
|
|
artifact_browse_url = f"{build_url}/artifacts/browse"
|
|
artifact_string = f" * built artifact: *{artifact_filename.tame(check_string)}* [[Browse]({artifact_browse_url})|[Download]({artifact_download_url})]\n"
|
|
else:
|
|
artifact_string = ""
|
|
builds_status += "* [{}]({}) - {}\n{}".format(
|
|
build["name"].tame(check_string),
|
|
build_url,
|
|
build["status"].tame(check_string),
|
|
artifact_string,
|
|
)
|
|
return "[Pipeline ({})]({}) {} with build(s):\n{}.".format(
|
|
payload["object_attributes"]["id"].tame(check_int),
|
|
pipeline_url,
|
|
action,
|
|
builds_status[:-1],
|
|
)
|
|
|
|
|
|
def get_repo_name(payload: WildValue) -> str:
|
|
if "project" in payload:
|
|
return payload["project"]["name"].tame(check_string)
|
|
|
|
# Apparently, Job Hook payloads don't have a `project` section,
|
|
# but the repository name is accessible from the `repository`
|
|
# section.
|
|
return payload["repository"]["name"].tame(check_string)
|
|
|
|
|
|
def get_user_name(payload: WildValue) -> str:
|
|
return payload["user_name"].tame(check_string)
|
|
|
|
|
|
def get_issue_user_name(payload: WildValue) -> str:
|
|
return payload["user"]["name"].tame(check_string)
|
|
|
|
|
|
def get_project_homepage(payload: WildValue) -> str:
|
|
if "project" in payload:
|
|
return payload["project"]["web_url"].tame(check_string)
|
|
return payload["repository"]["homepage"].tame(check_string)
|
|
|
|
|
|
def get_branch_name(payload: WildValue) -> str:
|
|
return payload["ref"].tame(check_string).replace("refs/heads/", "")
|
|
|
|
|
|
def get_tag_name(payload: WildValue) -> str:
|
|
return payload["ref"].tame(check_string).replace("refs/tags/", "")
|
|
|
|
|
|
def get_object_url(payload: WildValue) -> str:
|
|
return payload["object_attributes"]["url"].tame(check_string)
|
|
|
|
|
|
class EventFunction(Protocol):
|
|
def __call__(self, payload: WildValue, include_title: bool) -> str: ...
|
|
|
|
|
|
EVENT_FUNCTION_MAPPER: Dict[str, EventFunction] = {
|
|
"Push Hook": get_push_event_body,
|
|
"Tag Push Hook": get_tag_push_event_body,
|
|
"Test Hook": get_test_event_body,
|
|
"Issue Hook open": get_issue_created_event_body,
|
|
"Issue Hook close": partial(get_issue_event_body, action="closed"),
|
|
"Issue Hook reopen": partial(get_issue_event_body, action="reopened"),
|
|
"Issue Hook update": partial(get_issue_event_body, action="updated"),
|
|
"Confidential Issue Hook open": get_issue_created_event_body,
|
|
"Confidential Issue Hook close": partial(get_issue_event_body, action="closed"),
|
|
"Confidential Issue Hook reopen": partial(get_issue_event_body, action="reopened"),
|
|
"Confidential Issue Hook update": partial(get_issue_event_body, action="updated"),
|
|
"Note Hook Commit": get_commented_commit_event_body,
|
|
"Note Hook MergeRequest": get_commented_merge_request_event_body,
|
|
"Note Hook Issue": get_commented_issue_event_body,
|
|
"Confidential Note Hook Issue": get_commented_issue_event_body,
|
|
"Note Hook Snippet": get_commented_snippet_event_body,
|
|
"Merge Request Hook approved": partial(get_merge_request_event_body, action="approved"),
|
|
"Merge Request Hook unapproved": partial(get_merge_request_event_body, action="unapproved"),
|
|
"Merge Request Hook open": partial(get_merge_request_open_or_updated_body, action="created"),
|
|
"Merge Request Hook update": get_merge_request_updated_event_body,
|
|
"Merge Request Hook merge": partial(get_merge_request_event_body, action="merged"),
|
|
"Merge Request Hook close": partial(get_merge_request_event_body, action="closed"),
|
|
"Merge Request Hook reopen": partial(get_merge_request_event_body, action="reopened"),
|
|
"Wiki Page Hook create": partial(get_wiki_page_event_body, action="created"),
|
|
"Wiki Page Hook update": partial(get_wiki_page_event_body, action="updated"),
|
|
"Job Hook": get_build_hook_event_body,
|
|
"Build Hook": get_build_hook_event_body,
|
|
"Pipeline Hook": get_pipeline_event_body,
|
|
}
|
|
|
|
ALL_EVENT_TYPES = list(EVENT_FUNCTION_MAPPER.keys())
|
|
|
|
|
|
@webhook_view("GitLab", all_event_types=ALL_EVENT_TYPES)
|
|
@typed_endpoint
|
|
def api_gitlab_webhook(
|
|
request: HttpRequest,
|
|
user_profile: UserProfile,
|
|
*,
|
|
payload: JsonBodyPayload[WildValue],
|
|
branches: Optional[str] = None,
|
|
use_merge_request_title: Json[bool] = True,
|
|
user_specified_topic: OptionalUserSpecifiedTopicStr = None,
|
|
) -> HttpResponse:
|
|
event = get_event(request, payload, branches)
|
|
if event is not None:
|
|
event_body_function = get_body_based_on_event(event)
|
|
body = event_body_function(
|
|
payload,
|
|
include_title=user_specified_topic is not None,
|
|
)
|
|
|
|
# Add a link to the project if a custom topic is set
|
|
if user_specified_topic:
|
|
project_url = f"[{get_repo_name(payload)}]({get_project_homepage(payload)})"
|
|
body = f"[{project_url}] {body}"
|
|
|
|
topic_name = get_topic_based_on_event(event, payload, use_merge_request_title)
|
|
check_send_webhook_message(request, user_profile, topic_name, body, event)
|
|
return json_success(request)
|
|
|
|
|
|
def get_body_based_on_event(event: str) -> EventFunction:
|
|
return EVENT_FUNCTION_MAPPER[event]
|
|
|
|
|
|
def get_topic_based_on_event(event: str, payload: WildValue, use_merge_request_title: bool) -> str:
|
|
if event == "Push Hook":
|
|
return f"{get_repo_name(payload)} / {get_branch_name(payload)}"
|
|
elif event in ("Job Hook", "Build Hook"):
|
|
return "{} / {}".format(
|
|
payload["repository"]["name"].tame(check_string), get_branch_name(payload)
|
|
)
|
|
elif event == "Pipeline Hook":
|
|
return "{} / {}".format(
|
|
get_repo_name(payload),
|
|
payload["object_attributes"]["ref"].tame(check_string).replace("refs/heads/", ""),
|
|
)
|
|
elif event.startswith("Merge Request Hook"):
|
|
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
|
|
repo=get_repo_name(payload),
|
|
type="MR",
|
|
id=payload["object_attributes"]["iid"].tame(check_int),
|
|
title=(
|
|
payload["object_attributes"]["title"].tame(check_string)
|
|
if use_merge_request_title
|
|
else ""
|
|
),
|
|
)
|
|
elif event.startswith(("Issue Hook", "Confidential Issue Hook")):
|
|
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
|
|
repo=get_repo_name(payload),
|
|
type="issue",
|
|
id=payload["object_attributes"]["iid"].tame(check_int),
|
|
title=payload["object_attributes"]["title"].tame(check_string),
|
|
)
|
|
elif event in ("Note Hook Issue", "Confidential Note Hook Issue"):
|
|
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
|
|
repo=get_repo_name(payload),
|
|
type="issue",
|
|
id=payload["issue"]["iid"].tame(check_int),
|
|
title=payload["issue"]["title"].tame(check_string),
|
|
)
|
|
elif event == "Note Hook MergeRequest":
|
|
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
|
|
repo=get_repo_name(payload),
|
|
type="MR",
|
|
id=payload["merge_request"]["iid"].tame(check_int),
|
|
title=(
|
|
payload["merge_request"]["title"].tame(check_string)
|
|
if use_merge_request_title
|
|
else ""
|
|
),
|
|
)
|
|
|
|
elif event == "Note Hook Snippet":
|
|
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
|
|
repo=get_repo_name(payload),
|
|
type="snippet",
|
|
id=payload["snippet"]["id"].tame(check_int),
|
|
title=payload["snippet"]["title"].tame(check_string),
|
|
)
|
|
return get_repo_name(payload)
|
|
|
|
|
|
def get_event(request: HttpRequest, payload: WildValue, branches: Optional[str]) -> Optional[str]:
|
|
event = validate_extract_webhook_http_header(request, "X-GitLab-Event", "GitLab")
|
|
if event == "System Hook":
|
|
# Convert the event name to a GitLab event title
|
|
if "event_name" in payload:
|
|
event_name = payload["event_name"].tame(check_string)
|
|
else:
|
|
event_name = payload["object_kind"].tame(check_string)
|
|
event = event_name.split("__")[0].replace("_", " ").title()
|
|
event = f"{event} Hook"
|
|
if event in ["Confidential Issue Hook", "Issue Hook", "Merge Request Hook", "Wiki Page Hook"]:
|
|
action = payload["object_attributes"].get("action", "open").tame(check_string)
|
|
event = f"{event} {action}"
|
|
elif event in ["Confidential Note Hook", "Note Hook"]:
|
|
action = payload["object_attributes"]["noteable_type"].tame(check_string)
|
|
event = f"{event} {action}"
|
|
elif event == "Push Hook" and branches is not None:
|
|
branch = get_branch_name(payload)
|
|
if branches.find(branch) == -1:
|
|
return None
|
|
|
|
if event in EVENT_FUNCTION_MAPPER:
|
|
return event
|
|
|
|
raise UnsupportedWebhookEventTypeError(event)
|