zulip/zerver/webhooks/bitbucket/view.py

66 lines
2.4 KiB
Python

from typing import Optional
from django.http import HttpRequest, HttpResponse
from zerver.decorator import authenticated_rest_api_view
from zerver.lib.response import json_success
from zerver.lib.typed_endpoint import JsonBodyPayload, typed_endpoint
from zerver.lib.validator import WildValue, check_string
from zerver.lib.webhooks.common import check_send_webhook_message
from zerver.lib.webhooks.git import TOPIC_WITH_BRANCH_TEMPLATE, get_push_commits_event_message
from zerver.models import UserProfile
@authenticated_rest_api_view(webhook_client_name="Bitbucket")
@typed_endpoint
def api_bitbucket_webhook(
request: HttpRequest,
user_profile: UserProfile,
*,
payload: JsonBodyPayload[WildValue],
branches: Optional[str] = None,
) -> HttpResponse:
repository = payload["repository"]
commits = [
{
"name": (
commit["author"].tame(check_string)
if "author" in commit
else payload.get("user", "Someone").tame(check_string)
),
"sha": commit["raw_node"].tame(check_string),
"message": commit["message"].tame(check_string),
"url": "{}{}commits/{}".format(
payload["canon_url"].tame(check_string),
repository["absolute_url"].tame(check_string),
commit["raw_node"].tame(check_string),
),
}
for commit in payload["commits"]
]
if len(commits) == 0:
# Bitbucket doesn't give us enough information to really give
# a useful message :/
topic_name = repository["name"].tame(check_string)
content = "{} [force pushed]({}).".format(
payload.get("user", "Someone").tame(check_string),
payload["canon_url"].tame(check_string) + repository["absolute_url"].tame(check_string),
)
else:
branch = payload["commits"][-1]["branch"].tame(check_string)
if branches is not None and branches.find(branch) == -1:
return json_success(request)
committer = payload.get("user", "Someone").tame(check_string)
content = get_push_commits_event_message(committer, None, branch, commits)
topic_name = TOPIC_WITH_BRANCH_TEMPLATE.format(
repo=repository["name"].tame(check_string), branch=branch
)
check_send_webhook_message(
request, user_profile, topic_name, content, unquote_url_parameters=True
)
return json_success(request)