mirror of https://github.com/zulip/zulip.git
thinkst: Strengthen types using WildValue.
This commit is contained in:
parent
6ac9b63def
commit
b46d96af1a
|
@ -1,35 +1,36 @@
|
|||
# Webhooks for external integrations.
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
|
||||
from zerver.decorator import webhook_view
|
||||
from zerver.lib.request import REQ, has_request_variables
|
||||
from zerver.lib.response import json_success
|
||||
from zerver.lib.validator import WildValue, check_int, check_string, check_union, to_wild_value
|
||||
from zerver.lib.webhooks.common import check_send_webhook_message
|
||||
from zerver.models import UserProfile
|
||||
|
||||
|
||||
def is_canarytoken(message: Dict[str, Any]) -> bool:
|
||||
def is_canarytoken(message: WildValue) -> bool:
|
||||
"""
|
||||
Requests sent from Thinkst canaries are either from canarytokens or
|
||||
canaries, which can be differentiated by the value of the `AlertType`
|
||||
field.
|
||||
"""
|
||||
return message["AlertType"] == "CanarytokenIncident"
|
||||
return message["AlertType"].tame(check_string) == "CanarytokenIncident"
|
||||
|
||||
|
||||
def canary_name(message: Dict[str, Any]) -> str:
|
||||
def canary_name(message: WildValue) -> str:
|
||||
"""
|
||||
Returns the name of the canary or canarytoken.
|
||||
"""
|
||||
if is_canarytoken(message):
|
||||
return message["Reminder"]
|
||||
return message["Reminder"].tame(check_string)
|
||||
else:
|
||||
return message["CanaryName"]
|
||||
return message["CanaryName"].tame(check_string)
|
||||
|
||||
|
||||
def canary_kind(message: Dict[str, Any]) -> str:
|
||||
def canary_kind(message: WildValue) -> str:
|
||||
"""
|
||||
Returns a description of the kind of request - canary or canarytoken.
|
||||
"""
|
||||
|
@ -39,51 +40,51 @@ def canary_kind(message: Dict[str, Any]) -> str:
|
|||
return "canary"
|
||||
|
||||
|
||||
def source_ip_and_reverse_dns(message: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
|
||||
def source_ip_and_reverse_dns(message: WildValue) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""
|
||||
Extract the source IP and reverse DNS information from a canary request.
|
||||
"""
|
||||
reverse_dns, source_ip = (None, None)
|
||||
|
||||
if "SourceIP" in message:
|
||||
source_ip = message["SourceIP"]
|
||||
source_ip = message["SourceIP"].tame(check_string)
|
||||
# `ReverseDNS` can sometimes exist and still be empty.
|
||||
if "ReverseDNS" in message and message["ReverseDNS"]:
|
||||
reverse_dns = message["ReverseDNS"]
|
||||
if "ReverseDNS" in message and message["ReverseDNS"].tame(check_string) != "":
|
||||
reverse_dns = message["ReverseDNS"].tame(check_string)
|
||||
|
||||
return (source_ip, reverse_dns)
|
||||
|
||||
|
||||
def body(message: Dict[str, Any]) -> str:
|
||||
def body(message: WildValue) -> str:
|
||||
"""
|
||||
Construct the response to a canary or canarytoken request.
|
||||
"""
|
||||
|
||||
title = canary_kind(message).title()
|
||||
name = canary_name(message)
|
||||
body = f"**:alert: {title} *{name}* has been triggered!**\n\n{message['Intro']}\n\n"
|
||||
body = f"**:alert: {title} *{name}* has been triggered!**\n\n{message['Intro'].tame(check_string)}\n\n"
|
||||
|
||||
if "IncidentHash" in message:
|
||||
body += f"**Incident ID:** `{message['IncidentHash']}`\n"
|
||||
body += f"**Incident ID:** `{message['IncidentHash'].tame(check_string)}`\n"
|
||||
|
||||
if "Token" in message:
|
||||
body += f"**Token:** `{message['Token']}`\n"
|
||||
body += f"**Token:** `{message['Token'].tame(check_string)}`\n"
|
||||
|
||||
if "Description" in message:
|
||||
body += f"**Kind:** {message['Description']}\n"
|
||||
body += f"**Kind:** {message['Description'].tame(check_string)}\n"
|
||||
|
||||
if "Timestamp" in message:
|
||||
body += f"**Timestamp:** {message['Timestamp']}\n"
|
||||
body += f"**Timestamp:** {message['Timestamp'].tame(check_string)}\n"
|
||||
|
||||
if "CanaryIP" in message:
|
||||
body += f"**Canary IP:** `{message['CanaryIP']}`\n"
|
||||
body += f"**Canary IP:** `{message['CanaryIP'].tame(check_string)}`\n"
|
||||
|
||||
if "CanaryLocation" in message:
|
||||
body += f"**Canary location:** {message['CanaryLocation']}\n"
|
||||
body += f"**Canary location:** {message['CanaryLocation'].tame(check_string)}\n"
|
||||
|
||||
if "Triggered" in message:
|
||||
unit = "times" if message["Triggered"] > 1 else "time"
|
||||
body += f"**Triggered:** {message['Triggered']} {unit}\n"
|
||||
unit = "times" if message["Triggered"].tame(check_int) > 1 else "time"
|
||||
body += f"**Triggered:** {message['Triggered'].tame(check_int)} {unit}\n"
|
||||
|
||||
source_ip, reverse_dns = source_ip_and_reverse_dns(message)
|
||||
if source_ip:
|
||||
|
@ -93,12 +94,14 @@ def body(message: Dict[str, Any]) -> str:
|
|||
|
||||
if "AdditionalDetails" in message:
|
||||
for detail in message["AdditionalDetails"]:
|
||||
if isinstance(detail[1], str) and "*" in detail[1]:
|
||||
key = detail[0].tame(check_string)
|
||||
value = detail[1].tame(check_union([check_string, check_int]))
|
||||
if isinstance(value, str) and "*" in value:
|
||||
# Thinkst sends passwords as a series of stars which can mess with
|
||||
# formatting, so wrap these in backticks.
|
||||
body += f"**{detail[0]}:** `{detail[1]}`\n"
|
||||
body += f"**{key}:** `{value}`\n"
|
||||
else:
|
||||
body += f"**{detail[0]}:** {detail[1]}\n"
|
||||
body += f"**{key}:** {value}\n"
|
||||
|
||||
return body
|
||||
|
||||
|
@ -108,7 +111,7 @@ def body(message: Dict[str, Any]) -> str:
|
|||
def api_thinkst_webhook(
|
||||
request: HttpRequest,
|
||||
user_profile: UserProfile,
|
||||
message: Dict[str, Any] = REQ(argument_type="body"),
|
||||
message: WildValue = REQ(argument_type="body", converter=to_wild_value),
|
||||
user_specified_topic: Optional[str] = REQ("topic", default=None),
|
||||
) -> HttpResponse:
|
||||
"""
|
||||
|
|
Loading…
Reference in New Issue