# Webhooks pfor external integrations. from typing import Any, Dict from django.http import HttpRequest, HttpResponse from zerver.decorator import webhook_view from zerver.lib.exceptions import UnsupportedWebhookEventType from zerver.lib.request import REQ, has_request_variables from zerver.lib.response import json_success from zerver.lib.webhooks.common import check_send_webhook_message from zerver.models import UserProfile PINGDOM_TOPIC_TEMPLATE = "{name} status." MESSAGE_TEMPLATE = """ Service {service_url} changed its {type} status from {previous_state} to {current_state}: """.strip() DESC_TEMPLATE = """ ``` quote {description} ``` """.rstrip() SUPPORTED_CHECK_TYPES = ( "HTTP", "HTTP_CUSTOM", "HTTPS", "SMTP", "POP3", "IMAP", "PING", "DNS", "UDP", "PORT_TCP", ) @webhook_view("Pingdom") @has_request_variables def api_pingdom_webhook( request: HttpRequest, user_profile: UserProfile, payload: Dict[str, Any] = REQ(argument_type="body"), ) -> HttpResponse: check_type = get_check_type(payload) if check_type in SUPPORTED_CHECK_TYPES: subject = get_subject_for_http_request(payload) body = get_body_for_http_request(payload) else: raise UnsupportedWebhookEventType(check_type) check_send_webhook_message(request, user_profile, subject, body) return json_success() def get_subject_for_http_request(payload: Dict[str, Any]) -> str: return PINGDOM_TOPIC_TEMPLATE.format(name=payload["check_name"]) def get_body_for_http_request(payload: Dict[str, Any]) -> str: current_state = payload["current_state"] previous_state = payload["previous_state"] data = { "service_url": payload["check_params"]["hostname"], "previous_state": previous_state, "current_state": current_state, "type": get_check_type(payload), } body = MESSAGE_TEMPLATE.format(**data) if current_state == "DOWN" and previous_state == "UP": description = DESC_TEMPLATE.format(description=payload["long_description"]) body += description else: body = f"{body[:-1]}." return body def get_check_type(payload: Dict[str, Any]) -> str: return payload["check_type"]