register_remote_server: Add more validation of contact_email.

This commit is contained in:
Mateusz Mandera 2024-01-26 17:06:12 +01:00 committed by Tim Abbott
parent 25f47bd749
commit 4a245a3f62
3 changed files with 93 additions and 25 deletions

View File

@ -75,6 +75,7 @@ IGNORED_PHRASES = [
r"keyword", r"keyword",
r"streamname", r"streamname",
r"user@example\.com", r"user@example\.com",
r"example\.com",
r"acme", r"acme",
# Fragments of larger strings # Fragments of larger strings
r"is …", r"is …",

View File

@ -8,6 +8,7 @@ from typing import Any, Dict, Iterator, List, Mapping, Optional, Tuple, Union
from unittest import mock, skipUnless from unittest import mock, skipUnless
import aioapns import aioapns
import DNS
import orjson import orjson
import responses import responses
import time_machine import time_machine
@ -4743,6 +4744,20 @@ class TestPushNotificationsContent(ZulipTestCase):
@skipUnless(settings.ZILENCER_ENABLED, "requires zilencer") @skipUnless(settings.ZILENCER_ENABLED, "requires zilencer")
class PushBouncerSignupTest(ZulipTestCase): class PushBouncerSignupTest(ZulipTestCase):
@override
def setUp(self) -> None:
super().setUp()
# Set up a MX lookup mock for all the tests, so that they don't have to
# worry about it failing, unless they intentionally want to set it up
# to happen.
self.mxlookup_patcher = mock.patch("DNS.mxlookup", return_value=[(0, "")])
self.mxlookup_mock = self.mxlookup_patcher.start()
@override
def tearDown(self) -> None:
self.mxlookup_patcher.stop()
def test_deactivate_remote_server(self) -> None: def test_deactivate_remote_server(self) -> None:
zulip_org_id = str(uuid.uuid4()) zulip_org_id = str(uuid.uuid4())
zulip_org_key = get_random_string(64) zulip_org_key = get_random_string(64)
@ -4750,13 +4765,13 @@ class PushBouncerSignupTest(ZulipTestCase):
zulip_org_id=zulip_org_id, zulip_org_id=zulip_org_id,
zulip_org_key=zulip_org_key, zulip_org_key=zulip_org_key,
hostname="example.com", hostname="example.com",
contact_email="server-admin@example.com", contact_email="server-admin@zulip.com",
) )
result = self.client_post("/api/v1/remotes/server/register", request) result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_success(result) self.assert_json_success(result)
server = RemoteZulipServer.objects.get(uuid=zulip_org_id) server = RemoteZulipServer.objects.get(uuid=zulip_org_id)
self.assertEqual(server.hostname, "example.com") self.assertEqual(server.hostname, "example.com")
self.assertEqual(server.contact_email, "server-admin@example.com") self.assertEqual(server.contact_email, "server-admin@zulip.com")
result = self.uuid_post(zulip_org_id, "/api/v1/remotes/server/deactivate", subdomain="") result = self.uuid_post(zulip_org_id, "/api/v1/remotes/server/deactivate", subdomain="")
self.assert_json_success(result) self.assert_json_success(result)
@ -4795,23 +4810,11 @@ class PushBouncerSignupTest(ZulipTestCase):
zulip_org_id=zulip_org_id, zulip_org_id=zulip_org_id,
zulip_org_key=zulip_org_key, zulip_org_key=zulip_org_key,
hostname="invalid-host", hostname="invalid-host",
contact_email="server-admin@example.com", contact_email="server-admin@zulip.com",
) )
result = self.client_post("/api/v1/remotes/server/register", request) result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_error(result, "invalid-host is not a valid hostname") self.assert_json_error(result, "invalid-host is not a valid hostname")
def test_push_signup_invalid_email(self) -> None:
zulip_org_id = str(uuid.uuid4())
zulip_org_key = get_random_string(64)
request = dict(
zulip_org_id=zulip_org_id,
zulip_org_key=zulip_org_key,
hostname="example.com",
contact_email="server-admin",
)
result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_error(result, "Enter a valid email address.")
def test_push_signup_invalid_zulip_org_id(self) -> None: def test_push_signup_invalid_zulip_org_id(self) -> None:
zulip_org_id = "x" * RemoteZulipServer.UUID_LENGTH zulip_org_id = "x" * RemoteZulipServer.UUID_LENGTH
zulip_org_key = get_random_string(64) zulip_org_key = get_random_string(64)
@ -4819,7 +4822,7 @@ class PushBouncerSignupTest(ZulipTestCase):
zulip_org_id=zulip_org_id, zulip_org_id=zulip_org_id,
zulip_org_key=zulip_org_key, zulip_org_key=zulip_org_key,
hostname="example.com", hostname="example.com",
contact_email="server-admin@example.com", contact_email="server-admin@zulip.com",
) )
result = self.client_post("/api/v1/remotes/server/register", request) result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_error(result, "Invalid UUID") self.assert_json_error(result, "Invalid UUID")
@ -4839,7 +4842,7 @@ class PushBouncerSignupTest(ZulipTestCase):
zulip_org_id=zulip_org_id, zulip_org_id=zulip_org_id,
zulip_org_key=zulip_org_key, zulip_org_key=zulip_org_key,
hostname="example.com", hostname="example.com",
contact_email="server-admin@example.com", contact_email="server-admin@zulip.com",
) )
time_sent = now() time_sent = now()
@ -4849,7 +4852,7 @@ class PushBouncerSignupTest(ZulipTestCase):
self.assert_json_success(result) self.assert_json_success(result)
server = RemoteZulipServer.objects.get(uuid=zulip_org_id) server = RemoteZulipServer.objects.get(uuid=zulip_org_id)
self.assertEqual(server.hostname, "example.com") self.assertEqual(server.hostname, "example.com")
self.assertEqual(server.contact_email, "server-admin@example.com") self.assertEqual(server.contact_email, "server-admin@zulip.com")
self.assertEqual(server.last_request_datetime, time_sent) self.assertEqual(server.last_request_datetime, time_sent)
# Update our hostname # Update our hostname
@ -4857,7 +4860,7 @@ class PushBouncerSignupTest(ZulipTestCase):
zulip_org_id=zulip_org_id, zulip_org_id=zulip_org_id,
zulip_org_key=zulip_org_key, zulip_org_key=zulip_org_key,
hostname="zulip.example.com", hostname="zulip.example.com",
contact_email="server-admin@example.com", contact_email="server-admin@zulip.com",
) )
with time_machine.travel(time_sent + timedelta(minutes=1), tick=False): with time_machine.travel(time_sent + timedelta(minutes=1), tick=False):
@ -4865,7 +4868,7 @@ class PushBouncerSignupTest(ZulipTestCase):
self.assert_json_success(result) self.assert_json_success(result)
server = RemoteZulipServer.objects.get(uuid=zulip_org_id) server = RemoteZulipServer.objects.get(uuid=zulip_org_id)
self.assertEqual(server.hostname, "zulip.example.com") self.assertEqual(server.hostname, "zulip.example.com")
self.assertEqual(server.contact_email, "server-admin@example.com") self.assertEqual(server.contact_email, "server-admin@zulip.com")
self.assertEqual(server.last_request_datetime, time_sent + timedelta(minutes=1)) self.assertEqual(server.last_request_datetime, time_sent + timedelta(minutes=1))
# Now test rotating our key # Now test rotating our key
@ -4873,14 +4876,14 @@ class PushBouncerSignupTest(ZulipTestCase):
zulip_org_id=zulip_org_id, zulip_org_id=zulip_org_id,
zulip_org_key=zulip_org_key, zulip_org_key=zulip_org_key,
hostname="example.com", hostname="example.com",
contact_email="server-admin@example.com", contact_email="server-admin@zulip.com",
new_org_key=get_random_string(64), new_org_key=get_random_string(64),
) )
result = self.client_post("/api/v1/remotes/server/register", request) result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_success(result) self.assert_json_success(result)
server = RemoteZulipServer.objects.get(uuid=zulip_org_id) server = RemoteZulipServer.objects.get(uuid=zulip_org_id)
self.assertEqual(server.hostname, "example.com") self.assertEqual(server.hostname, "example.com")
self.assertEqual(server.contact_email, "server-admin@example.com") self.assertEqual(server.contact_email, "server-admin@zulip.com")
zulip_org_key = request["new_org_key"] zulip_org_key = request["new_org_key"]
self.assertEqual(server.api_key, zulip_org_key) self.assertEqual(server.api_key, zulip_org_key)
@ -4889,20 +4892,20 @@ class PushBouncerSignupTest(ZulipTestCase):
zulip_org_id=zulip_org_id, zulip_org_id=zulip_org_id,
zulip_org_key=zulip_org_key, zulip_org_key=zulip_org_key,
hostname="zulip.example.com", hostname="zulip.example.com",
contact_email="new-server-admin@example.com", contact_email="new-server-admin@zulip.com",
) )
result = self.client_post("/api/v1/remotes/server/register", request) result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_success(result) self.assert_json_success(result)
server = RemoteZulipServer.objects.get(uuid=zulip_org_id) server = RemoteZulipServer.objects.get(uuid=zulip_org_id)
self.assertEqual(server.hostname, "zulip.example.com") self.assertEqual(server.hostname, "zulip.example.com")
self.assertEqual(server.contact_email, "new-server-admin@example.com") self.assertEqual(server.contact_email, "new-server-admin@zulip.com")
# Now test trying to double-create with a new random key fails # Now test trying to double-create with a new random key fails
request = dict( request = dict(
zulip_org_id=zulip_org_id, zulip_org_id=zulip_org_id,
zulip_org_key=get_random_string(64), zulip_org_key=get_random_string(64),
hostname="example.com", hostname="example.com",
contact_email="server-admin@example.com", contact_email="server-admin@zulip.com",
) )
result = self.client_post("/api/v1/remotes/server/register", request) result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_error( self.assert_json_error(
@ -4935,6 +4938,41 @@ class PushBouncerSignupTest(ZulipTestCase):
result = self.client_post("/api/v1/remotes/server/register", request) result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_error(result, "A server with hostname example.com already exists") self.assert_json_error(result, "A server with hostname example.com already exists")
def test_register_contact_email_validation_rules(self) -> None:
zulip_org_id = str(uuid.uuid4())
zulip_org_key = get_random_string(64)
request = dict(
zulip_org_id=zulip_org_id,
zulip_org_key=zulip_org_key,
hostname="example.com",
)
request["contact_email"] = "server-admin"
result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_error(result, "Enter a valid email address.")
request["contact_email"] = "admin@example.com"
result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_error(result, "Invalid address.")
# An example disposable domain.
request["contact_email"] = "admin@mailnator.com"
result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_error(result, "Please use your real email address.")
request["contact_email"] = "admin@zulip.com"
with mock.patch("DNS.mxlookup", side_effect=DNS.Base.ServerError("test", 1)):
result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_error(
result, "zulip.com does not exist or is not configured to accept email."
)
with mock.patch("DNS.mxlookup", return_value=[]):
result = self.client_post("/api/v1/remotes/server/register", request)
self.assert_json_error(
result, "zulip.com does not exist or is not configured to accept email."
)
class TestUserPushIdentityCompat(ZulipTestCase): class TestUserPushIdentityCompat(ZulipTestCase):
def test_filter_q(self) -> None: def test_filter_q(self) -> None:

View File

@ -1,9 +1,11 @@
import logging import logging
from collections import Counter from collections import Counter
from datetime import datetime, timezone from datetime import datetime, timezone
from email.headerregistry import Address
from typing import Any, Dict, List, Optional, Type, TypedDict, TypeVar, Union from typing import Any, Dict, List, Optional, Type, TypedDict, TypeVar, Union
from uuid import UUID from uuid import UUID
import DNS
import orjson import orjson
from django.conf import settings from django.conf import settings
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
@ -32,6 +34,7 @@ from corporate.lib.stripe import (
) )
from corporate.models import CustomerPlan, get_current_plan_by_customer from corporate.models import CustomerPlan, get_current_plan_by_customer
from zerver.decorator import require_post from zerver.decorator import require_post
from zerver.lib.email_validation import validate_disposable
from zerver.lib.exceptions import ( from zerver.lib.exceptions import (
ErrorCode, ErrorCode,
JsonableError, JsonableError,
@ -57,6 +60,7 @@ from zerver.lib.timestamp import timestamp_to_datetime
from zerver.lib.typed_endpoint import JsonBodyPayload, typed_endpoint from zerver.lib.typed_endpoint import JsonBodyPayload, typed_endpoint
from zerver.lib.types import RemoteRealmDictValue from zerver.lib.types import RemoteRealmDictValue
from zerver.lib.validator import check_capped_string, check_int, check_string_fixed_length from zerver.lib.validator import check_capped_string, check_int, check_string_fixed_length
from zerver.models.realms import DisposableEmailError
from zerver.views.push_notifications import check_app_id, validate_token from zerver.views.push_notifications import check_app_id, validate_token
from zilencer.auth import InvalidZulipServerKeyError from zilencer.auth import InvalidZulipServerKeyError
from zilencer.models import ( from zilencer.models import (
@ -132,6 +136,31 @@ def register_remote_server(
except ValidationError as e: except ValidationError as e:
raise JsonableError(e.message) raise JsonableError(e.message)
# We don't want to allow disposable domains for contact_email either
try:
validate_disposable(contact_email)
except DisposableEmailError:
raise JsonableError(_("Please use your real email address."))
contact_email_domain = Address(addr_spec=contact_email).domain.lower()
if contact_email_domain == "example.com":
raise JsonableError(_("Invalid address."))
# Check if the domain has an MX record
try:
records = DNS.mxlookup(contact_email_domain)
dns_ms_check_successful = True
if not records:
dns_ms_check_successful = False
except DNS.Base.ServerError:
dns_ms_check_successful = False
if not dns_ms_check_successful:
raise JsonableError(
_("{domain} does not exist or is not configured to accept email.").format(
domain=contact_email_domain
)
)
try: try:
validate_uuid(zulip_org_id) validate_uuid(zulip_org_id)
except ValidationError as e: except ValidationError as e: