2019-06-05 15:12:34 +02:00
from types import SimpleNamespace
from typing import Dict
2020-06-11 00:54:34 +02:00
from unittest . mock import MagicMock , patch
2019-06-05 15:12:34 +02:00
2018-11-15 05:31:34 +01:00
from django . http import HttpRequest
2021-08-19 10:45:20 +02:00
from django . http . response import HttpResponse
2018-11-15 05:31:34 +01:00
2022-04-14 23:51:16 +02:00
from zerver . actions . streams import do_rename_stream
2020-08-20 00:32:15 +02:00
from zerver . decorator import webhook_view
2018-11-15 05:31:34 +01:00
from zerver . lib . exceptions import InvalidJSONError , JsonableError
2018-04-24 20:22:38 +02:00
from zerver . lib . send_email import FromAddress
2020-06-11 00:54:34 +02:00
from zerver . lib . test_classes import WebhookTestCase , ZulipTestCase
2018-04-24 20:22:38 +02:00
from zerver . lib . test_helpers import HostRequestMock
2020-06-11 00:54:34 +02:00
from zerver . lib . users import get_api_key
from zerver . lib . webhooks . common import (
INVALID_JSON_MESSAGE ,
MISSING_EVENT_HEADER_MESSAGE ,
2022-11-17 09:30:48 +01:00
MissingHTTPEventHeaderError ,
2020-06-11 00:54:34 +02:00
get_fixture_http_headers ,
standardize_headers ,
validate_extract_webhook_http_header ,
)
from zerver . models import UserProfile , get_realm , get_user
2018-04-24 20:22:38 +02:00
class WebhooksCommonTestCase(ZulipTestCase):
    """Tests for the helpers imported from ``zerver.lib.webhooks.common``.

    Covers ``validate_extract_webhook_http_header``, the invalid-JSON
    notification option of ``webhook_view``, ``get_fixture_http_headers``
    and ``standardize_headers``.
    """

    def test_webhook_http_header_header_exists(self) -> None:
        """A header present in ``request.META`` is returned as-is."""
        webhook_bot = get_user("webhook-bot@zulip.com", get_realm("zulip"))
        request = HostRequestMock()
        request.META["HTTP_X_CUSTOM_HEADER"] = "custom_value"
        request.user = webhook_bot

        header_value = validate_extract_webhook_http_header(
            request, "X-Custom-Header", "test_webhook"
        )

        self.assertEqual(header_value, "custom_value")

    def test_webhook_http_header_header_does_not_exist(self) -> None:
        """A missing header raises and a notification message is sent.

        The last message is expected to come from the realm's
        notification bot and to match ``MISSING_EVENT_HEADER_MESSAGE``.
        """
        realm = get_realm("zulip")
        webhook_bot = get_user("webhook-bot@zulip.com", realm)
        # NOTE(review): only set on the in-memory object (no save() here);
        # confirm whether this assignment is still needed.
        webhook_bot.last_reminder = None
        notification_bot = self.notification_bot(realm)
        request = HostRequestMock()
        request.user = webhook_bot
        request.path = "some/random/path"

        exception_msg = "Missing the HTTP event header 'X-Custom-Header'"
        with self.assertRaisesRegex(MissingHTTPEventHeaderError, exception_msg):
            validate_extract_webhook_http_header(request, "X-Custom-Header", "test_webhook")

        msg = self.get_last_message()
        expected_message = MISSING_EVENT_HEADER_MESSAGE.format(
            bot_name=webhook_bot.full_name,
            request_path=request.path,
            header_name="X-Custom-Header",
            integration_name="test_webhook",
            support_email=FromAddress.SUPPORT,
        ).rstrip()
        self.assertEqual(msg.sender.id, notification_bot.id)
        self.assertEqual(msg.content, expected_message)

    def test_notify_bot_owner_on_invalid_json(self) -> None:
        """``notify_bot_owner_on_invalid_json`` controls the notification.

        Two views that always raise ``InvalidJSONError`` are wrapped with
        ``webhook_view``, once with the flag off and once with it on.
        """

        @webhook_view("ClientName", notify_bot_owner_on_invalid_json=False)
        def my_webhook_no_notify(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
            raise InvalidJSONError("Malformed JSON")

        @webhook_view("ClientName", notify_bot_owner_on_invalid_json=True)
        def my_webhook_notify(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
            raise InvalidJSONError("Malformed JSON")

        webhook_bot_email = "webhook-bot@zulip.com"
        webhook_bot_realm = get_realm("zulip")
        webhook_bot = get_user(webhook_bot_email, webhook_bot_realm)
        webhook_bot_api_key = get_api_key(webhook_bot)
        request = HostRequestMock()
        request.POST["api_key"] = webhook_bot_api_key
        request.host = "zulip.testserver"
        expected_msg = INVALID_JSON_MESSAGE.format(webhook_name="ClientName")

        last_message_id = self.get_last_message().id
        with self.assertRaisesRegex(JsonableError, "Malformed JSON"):
            my_webhook_no_notify(request)

        # First verify that without the setting, it doesn't send a direct
        # message to bot owner.
        msg = self.get_last_message()
        self.assertEqual(msg.id, last_message_id)
        self.assertNotEqual(msg.content, expected_msg.strip())

        # Then verify that with the setting, it does send such a message.
        # A fresh request object is built for the second view call.
        request = HostRequestMock()
        request.POST["api_key"] = webhook_bot_api_key
        request.host = "zulip.testserver"
        with self.assertRaisesRegex(JsonableError, "Malformed JSON"):
            my_webhook_notify(request)
        msg = self.get_last_message()
        self.assertNotEqual(msg.id, last_message_id)
        self.assertEqual(msg.sender.id, self.notification_bot(webhook_bot_realm).id)
        self.assertEqual(msg.content, expected_msg.strip())

    @patch("zerver.lib.webhooks.common.importlib.import_module")
    def test_get_fixture_http_headers_for_success(self, import_module_mock: MagicMock) -> None:
        """Headers come from the module's ``fixture_to_headers`` function."""

        def fixture_to_headers(fixture_name: str) -> Dict[str, str]:
            # A sample function which would normally perform some
            # extra operations before returning a dictionary
            # corresponding to the fixture name passed. For this test,
            # we just return a fixed dictionary.
            return {"key": "value"}

        fake_module = SimpleNamespace(fixture_to_headers=fixture_to_headers)
        import_module_mock.return_value = fake_module

        headers = get_fixture_http_headers("some_integration", "complex_fixture")
        self.assertEqual(headers, {"key": "value"})

    def test_get_fixture_http_headers_for_non_existent_integration(self) -> None:
        """An unknown integration name yields an empty header dict."""
        headers = get_fixture_http_headers("some_random_nonexistent_integration", "fixture_name")
        self.assertEqual(headers, {})

    @patch("zerver.lib.webhooks.common.importlib.import_module")
    def test_get_fixture_http_headers_with_no_fixtures_to_headers_function(
        self,
        import_module_mock: MagicMock,
    ) -> None:
        """A module lacking ``fixture_to_headers`` yields an empty dict."""
        # An empty namespace stands in for a module with no such function.
        fake_module = SimpleNamespace()
        import_module_mock.return_value = fake_module

        self.assertEqual(
            get_fixture_http_headers("some_integration", "simple_fixture"),
            {},
        )

    def test_standardize_headers(self) -> None:
        """Raw header names are converted to Django ``META``-style keys."""
        self.assertEqual(standardize_headers({}), {})

        raw_headers = {"Content-Type": "text/plain", "X-Event-Type": "ping"}
        djangoified_headers = standardize_headers(raw_headers)
        expected_djangoified_headers = {"CONTENT_TYPE": "text/plain", "HTTP_X_EVENT_TYPE": "ping"}
        self.assertEqual(djangoified_headers, expected_djangoified_headers)
2021-02-12 08:19:30 +01:00
2021-06-30 14:15:27 +02:00
class WebhookURLConfigurationTestCase(WebhookTestCase):
    """Checks that a webhook URL may address its stream by numeric id.

    The ``stream`` URL parameter is built from the stream's id, and the
    stream is renamed before the URL is built.
    """

    STREAM_NAME = "helloworld"
    WEBHOOK_DIR_NAME = "helloworld"
    URL_TEMPLATE = "/api/v1/external/helloworld?stream={stream}&api_key={api_key}"

    def setUp(self) -> None:
        """Subscribe the test user, rename the stream, and build the URL."""
        super().setUp()
        stream = self.subscribe(self.test_user, self.STREAM_NAME)

        # In actual webhook tests, we will not need to use stream id.
        # We assign stream id to STREAM_NAME for testing URL configuration only.
        self.STREAM_NAME = str(stream.id)
        do_rename_stream(stream, "helloworld_renamed", self.test_user)

        self.url = self.build_webhook_url()

    def test_trigger_stream_message_by_id(self) -> None:
        """The payload is delivered to the renamed stream."""
        # check_webhook cannot be used here as it
        # subscribes the test user to self.STREAM_NAME
        payload = self.get_body("hello")

        self.send_webhook_payload(
            self.test_user, self.url, payload, content_type="application/json"
        )

        expected_topic = "Hello World"
        expected_message = "Hello! I am happy to be here! :smile:\nThe Wikipedia featured article for today is **[Marilyn Monroe](https://en.wikipedia.org/wiki/Marilyn_Monroe)**"

        msg = self.get_last_message()
        self.assert_stream_message(
            message=msg,
            stream_name="helloworld_renamed",
            topic_name=expected_topic,
            content=expected_message,
        )
2018-04-24 20:22:38 +02:00
class MissingEventHeaderTestCase(WebhookTestCase):
    """Exercises the missing-event-header path through the Groove webhook."""

    STREAM_NAME = "groove"
    URL_TEMPLATE = "/api/v1/external/groove?stream={stream}&api_key={api_key}"

    # This tests the validate_extract_webhook_http_header function with
    # an actual webhook, instead of just making a mock
    def test_missing_event_header(self) -> None:
        """Posting without ``X-Groove-Event`` logs, errors, and notifies.

        Checks the anomalous-webhook log line, the JSON error response,
        and the message sent by the notification bot.
        """
        self.subscribe(self.test_user, self.STREAM_NAME)
        with self.assertLogs("zulip.zerver.webhooks.anomalous", level="INFO") as webhook_logs:
            result = self.client_post(
                self.url,
                self.get_body("ticket_state_changed"),
                content_type="application/x-www-form-urlencoded",
            )
        # assertIn reports both operands on failure, unlike assertTrue(a in b).
        self.assertIn(
            "Missing the HTTP event header 'X-Groove-Event'", webhook_logs.output[0]
        )
        self.assert_json_error(result, "Missing the HTTP event header 'X-Groove-Event'")

        realm = get_realm("zulip")
        webhook_bot = get_user("webhook-bot@zulip.com", realm)
        # NOTE(review): only set on the in-memory object (no save() here);
        # confirm whether this assignment is still needed.
        webhook_bot.last_reminder = None
        notification_bot = self.notification_bot(realm)
        msg = self.get_last_message()
        expected_message = MISSING_EVENT_HEADER_MESSAGE.format(
            bot_name=webhook_bot.full_name,
            request_path="/api/v1/external/groove",
            header_name="X-Groove-Event",
            integration_name="Groove",
            support_email=FromAddress.SUPPORT,
        ).rstrip()
        if msg.sender.id != notification_bot.id:  # nocoverage
            # This block seems to fire occasionally; debug output:
            print(msg)
            print(msg.content)
        self.assertEqual(msg.sender.id, notification_bot.id)
        self.assertEqual(msg.content, expected_message)

    def get_body(self, fixture_name: str) -> str:
        """Return the named Groove JSON fixture as a string."""
        return self.webhook_fixture_data("groove", fixture_name, file_type="json")