# zulip/zerver/webhooks/splunk/tests.py
# (Repository-viewer header captured with this file: 156 lines, 7.3 KiB, Python.)

# -*- coding: utf-8 -*-
from zerver.lib.test_classes import WebhookTestCase
class SplunkHookTests(WebhookTestCase):
    """Tests for the Splunk webhook.

    Every test hands a fixture name, an expected topic and an expected
    message to ``send_and_test_stream_message``. Tests that want a fixed
    topic rebuild ``self.url`` with ``build_webhook_url(topic=...)`` first;
    the others expect the topic to come from the search name.
    """

    STREAM_NAME = 'splunk'
    URL_TEMPLATE = "/api/v1/external/splunk?api_key={api_key}&stream={stream}"
    FIXTURE_DIR_NAME = 'splunk'

    def _check_alert(self, fixture_name: str, topic: str, message: str) -> None:
        """Send ``fixture_name`` and check it against ``topic`` and ``message``.

        All tests in this class submit their payload with the same content
        type, so it is supplied here rather than repeated in each test.
        """
        self.send_and_test_stream_message(
            fixture_name,
            topic,
            message,
            content_type="application/x-www-form-urlencoded",
        )

    def test_splunk_search_one_result(self) -> None:
        """Expect a fully populated alert under the topic given in the URL."""
        topic = "New Search Alert"
        self.url = self.build_webhook_url(topic=topic)

        # The expected message contents, one list entry per line.
        message = "\n".join([
            "Splunk alert from saved search:",
            "* **Search**: [sudo](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)",
            "* **Host**: myserver",
            "* **Source**: `/var/log/auth.log`",
            "* **Raw**: `Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root`",
        ])

        # Run the check using the fixture named 'search_one_result'.
        self._check_alert('search_one_result', topic, message)

    def test_splunk_short_search_name(self) -> None:
        """Expect the search name as the topic when no topic is provided."""
        # No topic is put in the URL, so the search name is used instead.
        topic = "This search's name isn't that long"
        message = "\n".join([
            "Splunk alert from saved search:",
            "* **Search**: [This search's name isn't that long](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)",
            "* **Host**: myserver",
            "* **Source**: `/var/log/auth.log`",
            "* **Raw**: `Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root`",
        ])

        self._check_alert('short_search_name', topic, message)

    def test_splunk_long_search_name(self) -> None:
        """Expect a shortened topic ending in '...' for a long search name."""
        # No topic is put in the URL, so the search name is used instead;
        # the expected topic is the start of that name followed by "...",
        # while the message body still carries the name in full.
        topic = "this-search's-got-47-words-37-sentences-58-words-we-wanna..."
        message = "\n".join([
            "Splunk alert from saved search:",
            "* **Search**: [this-search's-got-47-words-37-sentences-58-words-we-wanna-know-details-of-the-search-time-of-the-search-and-any-other-kind-of-thing-you-gotta-say-pertaining-to-and-about-the-search-I-want-to-know-authenticated-user's-name-and-any-other-kind-of-thing-you-gotta-say](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)",
            "* **Host**: myserver",
            "* **Source**: `/var/log/auth.log`",
            "* **Raw**: `Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root`",
        ])

        self._check_alert('long_search_name', topic, message)

    def test_splunk_missing_results_link(self) -> None:
        """Expect the 'Missing results_link' placeholder as the link target."""
        topic = "New Search Alert"
        self.url = self.build_webhook_url(topic=topic)

        message = "\n".join([
            "Splunk alert from saved search:",
            "* **Search**: [sudo](Missing results_link)",
            "* **Host**: myserver",
            "* **Source**: `/var/log/auth.log`",
            "* **Raw**: `Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root`",
        ])

        self._check_alert('missing_results_link', topic, message)

    def test_splunk_missing_search_name(self) -> None:
        """Expect the 'Missing search_name' placeholder as the link text."""
        topic = "New Search Alert"
        self.url = self.build_webhook_url(topic=topic)

        message = "\n".join([
            "Splunk alert from saved search:",
            "* **Search**: [Missing search_name](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)",
            "* **Host**: myserver",
            "* **Source**: `/var/log/auth.log`",
            "* **Raw**: `Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root`",
        ])

        self._check_alert('missing_search_name', topic, message)

    def test_splunk_missing_host(self) -> None:
        """Expect the 'Missing host' placeholder on the Host line."""
        topic = "New Search Alert"
        self.url = self.build_webhook_url(topic=topic)

        message = "\n".join([
            "Splunk alert from saved search:",
            "* **Search**: [sudo](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)",
            "* **Host**: Missing host",
            "* **Source**: `/var/log/auth.log`",
            "* **Raw**: `Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root`",
        ])

        self._check_alert('missing_host', topic, message)

    def test_splunk_missing_source(self) -> None:
        """Expect the 'Missing source' placeholder on the Source line."""
        topic = "New Search Alert"
        self.url = self.build_webhook_url(topic=topic)

        message = "\n".join([
            "Splunk alert from saved search:",
            "* **Search**: [sudo](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)",
            "* **Host**: myserver",
            "* **Source**: `Missing source`",
            "* **Raw**: `Jan 4 11:14:32 myserver sudo: pam_unix(sudo:session): session closed for user root`",
        ])

        self._check_alert('missing_source', topic, message)

    def test_splunk_missing_raw(self) -> None:
        """Expect the 'Missing _raw' placeholder on the Raw line."""
        topic = "New Search Alert"
        self.url = self.build_webhook_url(topic=topic)

        message = "\n".join([
            "Splunk alert from saved search:",
            "* **Search**: [sudo](http://example.com:8000/app/search/search?q=%7Cloadjob%20rt_scheduler__admin__search__sudo_at_1483557185_2.2%20%7C%20head%201%20%7C%20tail%201&earliest=0&latest=now)",
            "* **Host**: myserver",
            "* **Source**: `/var/log/auth.log`",
            "* **Raw**: `Missing _raw`",
        ])

        self._check_alert('missing_raw', topic, message)

    def get_body(self, fixture_name: str) -> str:
        """Return the data for ``fixture_name``.

        The data is obtained from ``webhook_fixture_data`` for the
        "splunk" integration, requesting the "json" file type.
        """
        body = self.webhook_fixture_data("splunk", fixture_name, file_type="json")
        return body