zulip/zerver/webhooks/reviewboard/view.py

199 lines
6.8 KiB
Python

from typing import Any, Dict, Sequence
from django.http import HttpRequest, HttpResponse
from zerver.decorator import webhook_view
from zerver.lib.exceptions import UnsupportedWebhookEventType
from zerver.lib.request import REQ, has_request_variables
from zerver.lib.response import json_success
from zerver.lib.webhooks.common import (
check_send_webhook_message,
get_http_headers_from_filename,
validate_extract_webhook_http_header,
)
from zerver.models import UserProfile
REVIEW_REQUEST_PUBLISHED = """
**{user_name}** opened [#{id}: {review_request_title}]({review_request_url}):
"""
REVIEW_REQUEST_REOPENED = """
**{user_name}** reopened [#{id}: {review_request_title}]({review_request_url}):
"""
REVIEW_REQUEST_CLOSED = """
**{user_name}** closed [#{id}: {review_request_title}]({review_request_url}):
"""
REVIEW_PUBLISHED = """
**{user_name}** [reviewed]({review_url}) [#{id}: {review_request_title}]({review_request_url}):
**Review**:
``` quote
{review_body_top}
```
"""
REVIEW_REQUEST_DETAILS = """
``` quote
**Description**: {description}
**Status**: {status}
**Target people**: {target_people}
{extra_info}
```
"""
REPLY_PUBLISHED = """
**{user_name}** [replied]({reply_url}) to [#{id}: {review_request_title}]({review_request_url}):
**Reply**:
``` quote
{reply_body_top}
```
"""
BRANCH_TEMPLATE = "**Branch**: {branch_name}"
fixture_to_headers = get_http_headers_from_filename("HTTP_X_REVIEWBOARD_EVENT")
def get_target_people_string(payload: Dict[str, Any]) -> str:
result = ""
target_people = payload["review_request"]["target_people"]
if len(target_people) == 1:
result = "**{title}**".format(**target_people[0])
else:
for target_person in target_people[:-1]:
result += "**{title}**, ".format(**target_person)
result += "and **{title}**".format(**target_people[-1])
return result
def get_review_published_body(payload: Dict[str, Any]) -> str:
kwargs = {
"review_url": payload["review"]["absolute_url"],
"id": payload["review_request"]["id"],
"review_request_title": payload["review_request"]["summary"],
"review_request_url": payload["review_request"]["absolute_url"],
"user_name": payload["review"]["links"]["user"]["title"],
"review_body_top": payload["review"]["body_top"],
}
return REVIEW_PUBLISHED.format(**kwargs).strip()
def get_reply_published_body(payload: Dict[str, Any]) -> str:
kwargs = {
"reply_url": payload["reply"]["links"]["self"]["href"],
"id": payload["review_request"]["id"],
"review_request_title": payload["review_request"]["summary"],
"review_request_url": payload["review_request"]["links"]["self"]["href"],
"user_name": payload["reply"]["links"]["user"]["title"],
"user_url": payload["reply"]["links"]["user"]["href"],
"reply_body_top": payload["reply"]["body_top"],
}
return REPLY_PUBLISHED.format(**kwargs).strip()
def get_review_request_published_body(payload: Dict[str, Any]) -> str:
kwargs = {
"id": payload["review_request"]["id"],
"review_request_title": payload["review_request"]["summary"],
"review_request_url": payload["review_request"]["absolute_url"],
"user_name": payload["review_request"]["links"]["submitter"]["title"],
"description": payload["review_request"]["description"],
"status": payload["review_request"]["status"],
"target_people": get_target_people_string(payload),
"extra_info": "",
}
message = REVIEW_REQUEST_PUBLISHED + REVIEW_REQUEST_DETAILS
branch = payload["review_request"].get("branch")
if branch and branch is not None:
branch_info = BRANCH_TEMPLATE.format(branch_name=branch)
kwargs["extra_info"] = branch_info
return message.format(**kwargs).strip()
def get_review_request_reopened_body(payload: Dict[str, Any]) -> str:
kwargs = {
"id": payload["review_request"]["id"],
"review_request_title": payload["review_request"]["summary"],
"review_request_url": payload["review_request"]["absolute_url"],
"user_name": payload["reopened_by"]["username"],
"description": payload["review_request"]["description"],
"status": payload["review_request"]["status"],
"target_people": get_target_people_string(payload),
"extra_info": "",
}
message = REVIEW_REQUEST_REOPENED + REVIEW_REQUEST_DETAILS
branch = payload["review_request"].get("branch")
if branch and branch is not None:
branch_info = BRANCH_TEMPLATE.format(branch_name=branch)
kwargs["extra_info"] = branch_info
return message.format(**kwargs).strip()
def get_review_request_closed_body(payload: Dict[str, Any]) -> str:
kwargs = {
"id": payload["review_request"]["id"],
"review_request_title": payload["review_request"]["summary"],
"review_request_url": payload["review_request"]["absolute_url"],
"user_name": payload["closed_by"]["username"],
"description": payload["review_request"]["description"],
"status": payload["review_request"]["status"],
"target_people": get_target_people_string(payload),
"extra_info": "**Close type**: {}".format(payload["close_type"]),
}
message = REVIEW_REQUEST_CLOSED + REVIEW_REQUEST_DETAILS
branch = payload["review_request"].get("branch")
if branch and branch is not None:
branch_info = BRANCH_TEMPLATE.format(branch_name=branch)
kwargs["extra_info"] = "{}\n{}".format(kwargs["extra_info"], branch_info)
return message.format(**kwargs).strip()
def get_review_request_repo_title(payload: Dict[str, Any]) -> str:
return payload["review_request"]["links"]["repository"]["title"]
RB_MESSAGE_FUNCTIONS = {
"review_request_published": get_review_request_published_body,
"review_request_reopened": get_review_request_reopened_body,
"review_request_closed": get_review_request_closed_body,
"review_published": get_review_published_body,
"reply_published": get_reply_published_body,
}
ALL_EVENT_TYPES = list(RB_MESSAGE_FUNCTIONS.keys())
@webhook_view("ReviewBoard", all_event_types=ALL_EVENT_TYPES)
@has_request_variables
def api_reviewboard_webhook(
request: HttpRequest,
user_profile: UserProfile,
payload: Dict[str, Sequence[Dict[str, Any]]] = REQ(argument_type="body"),
) -> HttpResponse:
event_type = validate_extract_webhook_http_header(
request, "X_REVIEWBOARD_EVENT", "Review Board"
)
assert event_type is not None
body_function = RB_MESSAGE_FUNCTIONS.get(event_type)
if body_function is not None:
body = body_function(payload)
topic = get_review_request_repo_title(payload)
check_send_webhook_message(request, user_profile, topic, body, event_type)
else:
raise UnsupportedWebhookEventType(event_type)
return json_success()