zulip/zerver/webhooks/bitbucket2/view.py

505 lines
17 KiB
Python
Raw Normal View History

# Webhooks for external integrations.
import re
import string
from functools import partial
from inspect import signature
from typing import Any, Dict, List, Optional
2017-11-16 00:43:10 +01:00
from django.http import HttpRequest, HttpResponse
2017-11-16 00:43:10 +01:00
from zerver.decorator import log_exception_to_webhook_logger, webhook_view
from zerver.lib.exceptions import UnsupportedWebhookEventType
from zerver.lib.request import REQ, has_request_variables
from zerver.lib.response import json_success
from zerver.lib.webhooks.common import (
check_send_webhook_message,
validate_extract_webhook_http_header,
)
from zerver.lib.webhooks.git import (
TOPIC_WITH_BRANCH_TEMPLATE,
TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE,
get_commits_comment_action_message,
get_force_push_commits_event_message,
get_issue_event_message,
get_pull_request_event_message,
get_push_commits_event_message,
get_push_tag_event_message,
get_remove_branch_event_message,
)
from zerver.models import UserProfile
BITBUCKET_TOPIC_TEMPLATE = "{repository_name}"
BITBUCKET_FORK_BODY = "{actor} forked the repository into [{fork_name}]({fork_url})."
BITBUCKET_COMMIT_STATUS_CHANGED_BODY = (
"[System {key}]({system_url}) changed status of {commit_info} to {status}."
)
BITBUCKET_REPO_UPDATED_CHANGED = (
"{actor} changed the {change} of the **{repo_name}** repo from **{old}** to **{new}**"
)
BITBUCKET_REPO_UPDATED_ADDED = (
"{actor} changed the {change} of the **{repo_name}** repo to **{new}**"
)
PULL_REQUEST_SUPPORTED_ACTIONS = [
"approved",
"unapproved",
"created",
"updated",
"rejected",
"fulfilled",
"comment_created",
"comment_updated",
"comment_deleted",
]
@webhook_view("Bitbucket2")
@has_request_variables
def api_bitbucket2_webhook(
request: HttpRequest,
user_profile: UserProfile,
payload: Dict[str, Any] = REQ(argument_type="body"),
branches: Optional[str] = REQ(default=None),
user_specified_topic: Optional[str] = REQ("topic", default=None),
) -> HttpResponse:
type = get_type(request, payload)
if type == "push":
# ignore push events with no changes
if not payload["push"]["changes"]:
return json_success()
branch = get_branch_name_for_push_event(payload)
if branch and branches:
if branches.find(branch) == -1:
return json_success()
subject = get_subject_based_on_type(payload, type)
body_function = get_body_based_on_type(type)
if "include_title" in signature(body_function).parameters:
body = body_function(
payload,
include_title=user_specified_topic is not None,
)
else:
body = body_function(payload)
if type != "push":
check_send_webhook_message(
request, user_profile, subject, body, unquote_url_parameters=True
)
else:
for b, s in zip(body, subject):
check_send_webhook_message(request, user_profile, s, b, unquote_url_parameters=True)
return json_success()
def get_subject_for_branch_specified_events(
payload: Dict[str, Any], branch_name: Optional[str] = None
) -> str:
return TOPIC_WITH_BRANCH_TEMPLATE.format(
repo=get_repository_name(payload["repository"]),
branch=get_branch_name_for_push_event(payload) if branch_name is None else branch_name,
)
def get_push_subjects(payload: Dict[str, Any]) -> List[str]:
subjects_list = []
for change in payload["push"]["changes"]:
potential_tag = (change["new"] or change["old"] or {}).get("type")
if potential_tag == "tag":
subjects_list.append(str(get_subject(payload)))
else:
if change.get("new"):
branch_name = change["new"]["name"]
else:
branch_name = change["old"]["name"]
subjects_list.append(str(get_subject_for_branch_specified_events(payload, branch_name)))
return subjects_list
def get_subject(payload: Dict[str, Any]) -> str:
assert payload["repository"] is not None
return BITBUCKET_TOPIC_TEMPLATE.format(
repository_name=get_repository_name(payload["repository"])
)
def get_subject_based_on_type(payload: Dict[str, Any], type: str) -> Any:
if type.startswith("pull_request"):
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
repo=get_repository_name(payload["repository"]),
type="PR",
id=payload["pullrequest"]["id"],
title=payload["pullrequest"]["title"],
)
if type.startswith("issue"):
return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
repo=get_repository_name(payload["repository"]),
type="Issue",
id=payload["issue"]["id"],
title=payload["issue"]["title"],
)
if type == "push":
return get_push_subjects(payload)
return get_subject(payload)
def get_type(request: HttpRequest, payload: Dict[str, Any]) -> str:
if payload.get("push"):
return "push"
elif payload.get("fork"):
return "fork"
elif payload.get("comment") and payload.get("commit"):
return "commit_comment"
elif payload.get("commit_status"):
return "change_commit_status"
elif payload.get("issue"):
if payload.get("changes"):
return "issue_updated"
if payload.get("comment"):
return "issue_commented"
return "issue_created"
elif payload.get("pullrequest"):
pull_request_template = "pull_request_{}"
# Note that we only need the HTTP header to determine pullrequest events.
# We rely on the payload itself to determine the other ones.
event_key = validate_extract_webhook_http_header(request, "X_EVENT_KEY", "BitBucket")
assert event_key is not None
action = re.match("pullrequest:(?P<action>.*)$", event_key)
if action:
action_group = action.group("action")
if action_group in PULL_REQUEST_SUPPORTED_ACTIONS:
return pull_request_template.format(action_group)
else:
event_key = validate_extract_webhook_http_header(request, "X_EVENT_KEY", "BitBucket")
if event_key == "repo:updated":
return event_key
raise UnsupportedWebhookEventType(event_key)
def get_body_based_on_type(type: str) -> Any:
fn = GET_SINGLE_MESSAGE_BODY_DEPENDING_ON_TYPE_MAPPER.get(type)
return fn
def get_push_bodies(payload: Dict[str, Any]) -> List[str]:
messages_list = []
for change in payload["push"]["changes"]:
potential_tag = (change["new"] or change["old"] or {}).get("type")
if potential_tag == "tag":
messages_list.append(get_push_tag_body(payload, change))
# if change['new'] is None, that means a branch was deleted
elif change.get("new") is None:
messages_list.append(get_remove_branch_push_body(payload, change))
elif change.get("forced"):
messages_list.append(get_force_push_body(payload, change))
else:
messages_list.append(get_normal_push_body(payload, change))
return messages_list
def get_remove_branch_push_body(payload: Dict[str, Any], change: Dict[str, Any]) -> str:
return get_remove_branch_event_message(
get_actor_info(payload),
change["old"]["name"],
)
def get_force_push_body(payload: Dict[str, Any], change: Dict[str, Any]) -> str:
return get_force_push_commits_event_message(
get_actor_info(payload),
change["links"]["html"]["href"],
change["new"]["name"],
change["new"]["target"]["hash"],
)
def get_commit_author_name(commit: Dict[str, Any]) -> str:
if commit["author"].get("user"):
return get_user_info(commit["author"]["user"])
return commit["author"]["raw"].split()[0]
def get_normal_push_body(payload: Dict[str, Any], change: Dict[str, Any]) -> str:
commits_data = [
{
"name": get_commit_author_name(commit),
"sha": commit.get("hash"),
"url": commit.get("links").get("html").get("href"),
"message": commit.get("message"),
}
for commit in change["commits"]
]
return get_push_commits_event_message(
get_actor_info(payload),
change["links"]["html"]["href"],
change["new"]["name"],
commits_data,
is_truncated=change["truncated"],
)
def get_fork_body(payload: Dict[str, Any]) -> str:
return BITBUCKET_FORK_BODY.format(
actor=get_user_info(payload["actor"]),
fork_name=get_repository_full_name(payload["fork"]),
fork_url=get_repository_url(payload["fork"]),
)
def get_commit_comment_body(payload: Dict[str, Any]) -> str:
comment = payload["comment"]
action = "[commented]({})".format(comment["links"]["html"]["href"])
return get_commits_comment_action_message(
get_actor_info(payload),
action,
comment["commit"]["links"]["html"]["href"],
comment["commit"]["hash"],
comment["content"]["raw"],
)
def get_commit_status_changed_body(payload: Dict[str, Any]) -> str:
commit_api_url = payload["commit_status"]["links"]["commit"]["href"]
commit_id = commit_api_url.split("/")[-1]
commit_info = "[{short_commit_id}]({repo_url}/commits/{commit_id})".format(
repo_url=get_repository_url(payload["repository"]),
short_commit_id=commit_id[:7],
commit_id=commit_id,
)
return BITBUCKET_COMMIT_STATUS_CHANGED_BODY.format(
key=payload["commit_status"]["key"],
system_url=payload["commit_status"]["url"],
commit_info=commit_info,
status=payload["commit_status"]["state"],
)
def get_issue_commented_body(payload: Dict[str, Any], include_title: bool = False) -> str:
action = "[commented]({}) on".format(payload["comment"]["links"]["html"]["href"])
return get_issue_action_body(payload, action, include_title)
def get_issue_action_body(payload: Dict[str, Any], action: str, include_title: bool = False) -> str:
issue = payload["issue"]
assignee = None
message = None
if action == "created":
if issue["assignee"]:
assignee = get_user_info(issue["assignee"])
message = issue["content"]["raw"]
return get_issue_event_message(
get_actor_info(payload),
action,
issue["links"]["html"]["href"],
issue["id"],
message,
assignee,
title=issue["title"] if include_title else None,
)
def get_pull_request_action_body(
payload: Dict[str, Any], action: str, include_title: bool = False
) -> str:
pull_request = payload["pullrequest"]
return get_pull_request_event_message(
get_actor_info(payload),
action,
get_pull_request_url(pull_request),
pull_request.get("id"),
title=pull_request["title"] if include_title else None,
)
def get_pull_request_created_or_updated_body(
payload: Dict[str, Any], action: str, include_title: bool = False
) -> str:
pull_request = payload["pullrequest"]
assignee = None
if pull_request.get("reviewers"):
assignee = get_user_info(pull_request.get("reviewers")[0])
return get_pull_request_event_message(
get_actor_info(payload),
action,
get_pull_request_url(pull_request),
pull_request.get("id"),
target_branch=pull_request["source"]["branch"]["name"],
base_branch=pull_request["destination"]["branch"]["name"],
message=pull_request["description"],
assignee=assignee,
title=pull_request["title"] if include_title else None,
)
def get_pull_request_comment_created_action_body(
payload: Dict[str, Any],
include_title: bool = False,
) -> str:
action = "[commented]({})".format(payload["comment"]["links"]["html"]["href"])
return get_pull_request_comment_action_body(payload, action, include_title)
def get_pull_request_deleted_or_updated_comment_action_body(
payload: Dict[str, Any],
action: str,
include_title: bool = False,
) -> str:
action = "{} a [comment]({})".format(action, payload["comment"]["links"]["html"]["href"])
return get_pull_request_comment_action_body(payload, action, include_title)
def get_pull_request_comment_action_body(
payload: Dict[str, Any],
action: str,
include_title: bool = False,
) -> str:
action += " on"
return get_pull_request_event_message(
get_actor_info(payload),
action,
payload["pullrequest"]["links"]["html"]["href"],
payload["pullrequest"]["id"],
message=payload["comment"]["content"]["raw"],
title=payload["pullrequest"]["title"] if include_title else None,
)
def get_push_tag_body(payload: Dict[str, Any], change: Dict[str, Any]) -> str:
if change.get("new"):
tag = change["new"]
action = "pushed"
elif change.get("old"):
tag = change["old"]
action = "removed"
return get_push_tag_event_message(
get_actor_info(payload),
tag.get("name"),
tag_url=tag["links"]["html"].get("href"),
action=action,
)
def append_punctuation(title: str, message: str) -> str:
if title[-1] not in string.punctuation:
message = f"{message}."
return message
def get_repo_updated_body(payload: Dict[str, Any]) -> str:
changes = ["website", "name", "links", "language", "full_name", "description"]
body = ""
repo_name = payload["repository"]["name"]
actor = get_actor_info(payload)
for change in changes:
new = payload["changes"][change]["new"]
old = payload["changes"][change]["old"]
if change == "full_name":
change = "full name"
if new and old:
message = BITBUCKET_REPO_UPDATED_CHANGED.format(
actor=actor,
change=change,
repo_name=repo_name,
old=old,
new=new,
)
message = append_punctuation(new, message) + "\n"
body += message
elif new and not old:
message = BITBUCKET_REPO_UPDATED_ADDED.format(
actor=actor,
change=change,
repo_name=repo_name,
new=new,
)
message = append_punctuation(new, message) + "\n"
body += message
return body
def get_pull_request_url(pullrequest_payload: Dict[str, Any]) -> str:
return pullrequest_payload["links"]["html"]["href"]
def get_repository_url(repository_payload: Dict[str, Any]) -> str:
return repository_payload["links"]["html"]["href"]
def get_repository_name(repository_payload: Dict[str, Any]) -> str:
return repository_payload["name"]
def get_repository_full_name(repository_payload: Dict[str, Any]) -> str:
return repository_payload["full_name"]
def get_user_info(dct: Dict[str, Any]) -> str:
# See https://developer.atlassian.com/cloud/bitbucket/bitbucket-api-changes-gdpr/
# Since GDPR, we don't get username; instead, we either get display_name
# or nickname.
if "display_name" in dct:
return dct["display_name"]
if "nickname" in dct:
return dct["nickname"]
log_exception_to_webhook_logger(
summary="Could not find display_name/nickname field",
# We call this an unsupported_event, even though we
# are technically still sending a message.
unsupported_event=True,
)
return "Unknown user"
def get_actor_info(payload: Dict[str, Any]) -> str:
actor = payload["actor"]
return get_user_info(actor)
def get_branch_name_for_push_event(payload: Dict[str, Any]) -> Optional[str]:
change = payload["push"]["changes"][-1]
potential_tag = (change["new"] or change["old"] or {}).get("type")
if potential_tag == "tag":
return None
else:
return (change["new"] or change["old"]).get("name")
GET_SINGLE_MESSAGE_BODY_DEPENDING_ON_TYPE_MAPPER = {
"fork": get_fork_body,
"commit_comment": get_commit_comment_body,
"change_commit_status": get_commit_status_changed_body,
"issue_updated": partial(get_issue_action_body, action="updated"),
"issue_created": partial(get_issue_action_body, action="created"),
"issue_commented": get_issue_commented_body,
"pull_request_created": partial(get_pull_request_created_or_updated_body, action="created"),
"pull_request_updated": partial(get_pull_request_created_or_updated_body, action="updated"),
"pull_request_approved": partial(get_pull_request_action_body, action="approved"),
"pull_request_unapproved": partial(get_pull_request_action_body, action="unapproved"),
"pull_request_fulfilled": partial(get_pull_request_action_body, action="merged"),
"pull_request_rejected": partial(get_pull_request_action_body, action="rejected"),
"pull_request_comment_created": get_pull_request_comment_created_action_body,
"pull_request_comment_updated": partial(
get_pull_request_deleted_or_updated_comment_action_body, action="updated"
),
"pull_request_comment_deleted": partial(
get_pull_request_deleted_or_updated_comment_action_body, action="deleted"
),
"push": get_push_bodies,
"repo:updated": get_repo_updated_body,
}