zulip/zerver/webhooks/zabbix/view.py

47 lines
1.5 KiB
Python

from typing import Any, Dict
from django.utils.translation import ugettext as _
from django.http import HttpRequest, HttpResponse
from zerver.lib.webhooks.common import check_send_webhook_message
from zerver.lib.response import json_success, json_error
from zerver.decorator import REQ, has_request_variables, api_key_only_webhook_view
from zerver.models import UserProfile
import ujson
ZABBIX_TOPIC_TEMPLATE = '{hostname}'
ZABBIX_MESSAGE_TEMPLATE = '{status} ({severity}) alert on [{hostname}]({link}).\n{trigger}\n{item}'
@api_key_only_webhook_view('Zabbix')
@has_request_variables
def api_zabbix_webhook(request: HttpRequest, user_profile: UserProfile,
payload: Dict[str, Any]=REQ(argument_type='body')) -> HttpResponse:
body = get_body_for_http_request(payload)
subject = get_subject_for_http_request(payload)
check_send_webhook_message(request, user_profile, subject, body)
return json_success()
def get_subject_for_http_request(payload: Dict[str, Any]) -> str:
return ZABBIX_TOPIC_TEMPLATE.format(hostname=payload['hostname'])
def get_body_for_http_request(payload: Dict[str, Any]) -> str:
hostname = payload['hostname']
severity = payload['severity']
status = payload['status']
item = payload['item']
trigger = payload['trigger']
link = payload['link']
data = {
"hostname": hostname,
"severity": severity,
"status": status,
"item": item,
"trigger": trigger,
"link": link
}
return ZABBIX_MESSAGE_TEMPLATE.format(**data)