From 4a245a3f62087ab67da3327b7ae183e1eb8bf0f7 Mon Sep 17 00:00:00 2001 From: Mateusz Mandera Date: Fri, 26 Jan 2024 17:06:12 +0100 Subject: [PATCH] register_remote_server: Add more validation of contact_email. --- tools/lib/capitalization.py | 1 + zerver/tests/test_push_notifications.py | 88 ++++++++++++++++++------- zilencer/views.py | 29 ++++++++ 3 files changed, 93 insertions(+), 25 deletions(-) diff --git a/tools/lib/capitalization.py b/tools/lib/capitalization.py index b92ac442a7..255f821c5a 100644 --- a/tools/lib/capitalization.py +++ b/tools/lib/capitalization.py @@ -75,6 +75,7 @@ IGNORED_PHRASES = [ r"keyword", r"streamname", r"user@example\.com", + r"example\.com", r"acme", # Fragments of larger strings r"is …", diff --git a/zerver/tests/test_push_notifications.py b/zerver/tests/test_push_notifications.py index 7a24efbf5f..924b838908 100644 --- a/zerver/tests/test_push_notifications.py +++ b/zerver/tests/test_push_notifications.py @@ -8,6 +8,7 @@ from typing import Any, Dict, Iterator, List, Mapping, Optional, Tuple, Union from unittest import mock, skipUnless import aioapns +import DNS import orjson import responses import time_machine @@ -4743,6 +4744,20 @@ class TestPushNotificationsContent(ZulipTestCase): @skipUnless(settings.ZILENCER_ENABLED, "requires zilencer") class PushBouncerSignupTest(ZulipTestCase): + @override + def setUp(self) -> None: + super().setUp() + + # Set up a MX lookup mock for all the tests, so that they don't have to + # worry about it failing, unless they intentionally want to set it up + # to happen. + self.mxlookup_patcher = mock.patch("DNS.mxlookup", return_value=[(0, "")]) + self.mxlookup_mock = self.mxlookup_patcher.start() + + @override + def tearDown(self) -> None: + self.mxlookup_patcher.stop() + def test_deactivate_remote_server(self) -> None: zulip_org_id = str(uuid.uuid4()) zulip_org_key = get_random_string(64) @@ -4750,13 +4765,13 @@ class PushBouncerSignupTest(ZulipTestCase): zulip_org_id=zulip_org_id, zulip_org_key=zulip_org_key, hostname="example.com", - contact_email="server-admin@example.com", + contact_email="server-admin@zulip.com", ) result = self.client_post("/api/v1/remotes/server/register", request) self.assert_json_success(result) server = RemoteZulipServer.objects.get(uuid=zulip_org_id) self.assertEqual(server.hostname, "example.com") - self.assertEqual(server.contact_email, "server-admin@example.com") + self.assertEqual(server.contact_email, "server-admin@zulip.com") result = self.uuid_post(zulip_org_id, "/api/v1/remotes/server/deactivate", subdomain="") self.assert_json_success(result) @@ -4795,23 +4810,11 @@ class PushBouncerSignupTest(ZulipTestCase): zulip_org_id=zulip_org_id, zulip_org_key=zulip_org_key, hostname="invalid-host", - contact_email="server-admin@example.com", + contact_email="server-admin@zulip.com", ) result = self.client_post("/api/v1/remotes/server/register", request) self.assert_json_error(result, "invalid-host is not a valid hostname") - def test_push_signup_invalid_email(self) -> None: - zulip_org_id = str(uuid.uuid4()) - zulip_org_key = get_random_string(64) - request = dict( - zulip_org_id=zulip_org_id, - zulip_org_key=zulip_org_key, - hostname="example.com", - contact_email="server-admin", - ) - result = self.client_post("/api/v1/remotes/server/register", request) - self.assert_json_error(result, "Enter a valid email address.") - def test_push_signup_invalid_zulip_org_id(self) -> None: zulip_org_id = "x" * RemoteZulipServer.UUID_LENGTH zulip_org_key = get_random_string(64) @@ -4819,7 +4822,7 @@ class PushBouncerSignupTest(ZulipTestCase): zulip_org_id=zulip_org_id, zulip_org_key=zulip_org_key, hostname="example.com", - contact_email="server-admin@example.com", + contact_email="server-admin@zulip.com", ) result = self.client_post("/api/v1/remotes/server/register", request) self.assert_json_error(result, "Invalid UUID") @@ -4839,7 +4842,7 @@ class PushBouncerSignupTest(ZulipTestCase): zulip_org_id=zulip_org_id, zulip_org_key=zulip_org_key, hostname="example.com", - contact_email="server-admin@example.com", + contact_email="server-admin@zulip.com", ) time_sent = now() @@ -4849,7 +4852,7 @@ class PushBouncerSignupTest(ZulipTestCase): self.assert_json_success(result) server = RemoteZulipServer.objects.get(uuid=zulip_org_id) self.assertEqual(server.hostname, "example.com") - self.assertEqual(server.contact_email, "server-admin@example.com") + self.assertEqual(server.contact_email, "server-admin@zulip.com") self.assertEqual(server.last_request_datetime, time_sent) # Update our hostname @@ -4857,7 +4860,7 @@ class PushBouncerSignupTest(ZulipTestCase): zulip_org_id=zulip_org_id, zulip_org_key=zulip_org_key, hostname="zulip.example.com", - contact_email="server-admin@example.com", + contact_email="server-admin@zulip.com", ) with time_machine.travel(time_sent + timedelta(minutes=1), tick=False): @@ -4865,7 +4868,7 @@ class PushBouncerSignupTest(ZulipTestCase): self.assert_json_success(result) server = RemoteZulipServer.objects.get(uuid=zulip_org_id) self.assertEqual(server.hostname, "zulip.example.com") - self.assertEqual(server.contact_email, "server-admin@example.com") + self.assertEqual(server.contact_email, "server-admin@zulip.com") self.assertEqual(server.last_request_datetime, time_sent + timedelta(minutes=1)) # Now test rotating our key @@ -4873,14 +4876,14 @@ class PushBouncerSignupTest(ZulipTestCase): zulip_org_id=zulip_org_id, zulip_org_key=zulip_org_key, hostname="example.com", - contact_email="server-admin@example.com", + contact_email="server-admin@zulip.com", new_org_key=get_random_string(64), ) result = self.client_post("/api/v1/remotes/server/register", request) self.assert_json_success(result) server = RemoteZulipServer.objects.get(uuid=zulip_org_id) self.assertEqual(server.hostname, "example.com") - self.assertEqual(server.contact_email, "server-admin@example.com") + self.assertEqual(server.contact_email, "server-admin@zulip.com") zulip_org_key = request["new_org_key"] self.assertEqual(server.api_key, zulip_org_key) @@ -4889,20 +4892,20 @@ class PushBouncerSignupTest(ZulipTestCase): zulip_org_id=zulip_org_id, zulip_org_key=zulip_org_key, hostname="zulip.example.com", - contact_email="new-server-admin@example.com", + contact_email="new-server-admin@zulip.com", ) result = self.client_post("/api/v1/remotes/server/register", request) self.assert_json_success(result) server = RemoteZulipServer.objects.get(uuid=zulip_org_id) self.assertEqual(server.hostname, "zulip.example.com") - self.assertEqual(server.contact_email, "new-server-admin@example.com") + self.assertEqual(server.contact_email, "new-server-admin@zulip.com") # Now test trying to double-create with a new random key fails request = dict( zulip_org_id=zulip_org_id, zulip_org_key=get_random_string(64), hostname="example.com", - contact_email="server-admin@example.com", + contact_email="server-admin@zulip.com", ) result = self.client_post("/api/v1/remotes/server/register", request) self.assert_json_error( @@ -4935,6 +4938,41 @@ class PushBouncerSignupTest(ZulipTestCase): result = self.client_post("/api/v1/remotes/server/register", request) self.assert_json_error(result, "A server with hostname example.com already exists") + def test_register_contact_email_validation_rules(self) -> None: + zulip_org_id = str(uuid.uuid4()) + zulip_org_key = get_random_string(64) + request = dict( + zulip_org_id=zulip_org_id, + zulip_org_key=zulip_org_key, + hostname="example.com", + ) + + request["contact_email"] = "server-admin" + result = self.client_post("/api/v1/remotes/server/register", request) + self.assert_json_error(result, "Enter a valid email address.") + + request["contact_email"] = "admin@example.com" + result = self.client_post("/api/v1/remotes/server/register", request) + self.assert_json_error(result, "Invalid address.") + + # An example disposable domain. + request["contact_email"] = "admin@mailnator.com" + result = self.client_post("/api/v1/remotes/server/register", request) + self.assert_json_error(result, "Please use your real email address.") + + request["contact_email"] = "admin@zulip.com" + with mock.patch("DNS.mxlookup", side_effect=DNS.Base.ServerError("test", 1)): + result = self.client_post("/api/v1/remotes/server/register", request) + self.assert_json_error( + result, "zulip.com does not exist or is not configured to accept email." + ) + + with mock.patch("DNS.mxlookup", return_value=[]): + result = self.client_post("/api/v1/remotes/server/register", request) + self.assert_json_error( + result, "zulip.com does not exist or is not configured to accept email." + ) + class TestUserPushIdentityCompat(ZulipTestCase): def test_filter_q(self) -> None: diff --git a/zilencer/views.py b/zilencer/views.py index ac728151ca..3dc7b1cb84 100644 --- a/zilencer/views.py +++ b/zilencer/views.py @@ -1,9 +1,11 @@ import logging from collections import Counter from datetime import datetime, timezone +from email.headerregistry import Address from typing import Any, Dict, List, Optional, Type, TypedDict, TypeVar, Union from uuid import UUID +import DNS import orjson from django.conf import settings from django.core.exceptions import ValidationError @@ -32,6 +34,7 @@ from corporate.lib.stripe import ( ) from corporate.models import CustomerPlan, get_current_plan_by_customer from zerver.decorator import require_post +from zerver.lib.email_validation import validate_disposable from zerver.lib.exceptions import ( ErrorCode, JsonableError, @@ -57,6 +60,7 @@ from zerver.lib.timestamp import timestamp_to_datetime from zerver.lib.typed_endpoint import JsonBodyPayload, typed_endpoint from zerver.lib.types import RemoteRealmDictValue from zerver.lib.validator import check_capped_string, check_int, check_string_fixed_length +from zerver.models.realms import DisposableEmailError from zerver.views.push_notifications import check_app_id, validate_token from zilencer.auth import InvalidZulipServerKeyError from zilencer.models import ( @@ -132,6 +136,31 @@ def register_remote_server( except ValidationError as e: raise JsonableError(e.message) + # We don't want to allow disposable domains for contact_email either + try: + validate_disposable(contact_email) + except DisposableEmailError: + raise JsonableError(_("Please use your real email address.")) + + contact_email_domain = Address(addr_spec=contact_email).domain.lower() + if contact_email_domain == "example.com": + raise JsonableError(_("Invalid address.")) + + # Check if the domain has an MX record + try: + records = DNS.mxlookup(contact_email_domain) + dns_ms_check_successful = True + if not records: + dns_ms_check_successful = False + except DNS.Base.ServerError: + dns_ms_check_successful = False + if not dns_ms_check_successful: + raise JsonableError( + _("{domain} does not exist or is not configured to accept email.").format( + domain=contact_email_domain + ) + ) + try: validate_uuid(zulip_org_id) except ValidationError as e: